You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by bl...@apache.org on 2015/04/28 01:12:24 UTC

[27/51] [partial] parquet-mr git commit: PARQUET-23: Rename to org.apache.parquet.

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/parquet/schema/Types.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/schema/Types.java b/parquet-column/src/main/java/parquet/schema/Types.java
deleted file mode 100644
index 7e95110..0000000
--- a/parquet-column/src/main/java/parquet/schema/Types.java
+++ /dev/null
@@ -1,668 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.schema;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import parquet.Preconditions;
-import parquet.schema.PrimitiveType.PrimitiveTypeName;
-import parquet.schema.Type.ID;
-
-/**
- * This class provides fluent builders that produce Parquet schema Types.
- * <p>
- * The most basic use is to build primitive types:
- * <pre>
- *   Types.required(INT64).named("id");
- *   Types.optional(INT32).named("number");
- * </pre>
- * <p>
- * The {@link #required(PrimitiveTypeName)} factory method produces a primitive
- * type builder, and the {@link PrimitiveBuilder#named(String)} builds the
- * {@link PrimitiveType}. Between {@code required} and {@code named}, other
- * builder methods can be used to add type annotations or other type metadata:
- * <pre>
- *   Types.required(BINARY).as(UTF8).named("username");
- *   Types.optional(FIXED_LEN_BYTE_ARRAY).length(20).named("sha1");
- * </pre>
- * <p>
- * Optional types are built using {@link #optional(PrimitiveTypeName)} to get
- * the builder.
- * <p>
- * Groups are built similarly, using {@code requiredGroup()} (or the optional
- * version) to return a group builder. Group builders provide {@code required}
- * and {@code optional} to add primitive types, which return primitive builders
- * like the versions above.
- * <pre>
- *   // This produces:
- *   // required group User {
- *   //   required int64 id;
- *   //   optional binary email (UTF8);
- *   // }
- *   Types.requiredGroup()
- *            .required(INT64).named("id")
- *            .optional(BINARY).as(UTF8).named("email")
- *        .named("User")
- * </pre>
- * <p>
- * When {@code required} is called on a group builder, the builder it returns
- * will add the type to the parent group when it is built and {@code named} will
- * return its parent group builder (instead of the type) so more fields can be
- * added.
- * <p>
- * Sub-groups can be created using {@code requiredGroup()} to get a group
- * builder that will create the group type, add it to the parent builder, and
- * return the parent builder for more fields.
- * <pre>
- *   // required group User {
- *   //   required int64 id;
- *   //   optional binary email (UTF8);
- *   //   optional group address {
- *   //     required binary street (UTF8);
- *   //     required int32 zipcode;
- *   //   }
- *   // }
- *   Types.requiredGroup()
- *            .required(INT64).named("id")
- *            .optional(BINARY).as(UTF8).named("email")
- *            .optionalGroup()
- *                .required(BINARY).as(UTF8).named("street")
- *                .required(INT32).named("zipcode")
- *            .named("address")
- *        .named("User")
- * </pre>
- * <p>
- * Message types are built using {@link #buildMessage()} and function just like
- * group builders.
- * <pre>
- *   // message User {
- *   //   required int64 id;
- *   //   optional binary email (UTF8);
- *   //   optional group address {
- *   //     required binary street (UTF8);
- *   //     required int32 zipcode;
- *   //   }
- *   // }
- *   Types.buildMessage()
- *            .required(INT64).named("id")
- *            .optional(BINARY).as(UTF8).named("email")
- *            .optionalGroup()
- *                .required(BINARY).as(UTF8).named("street")
- *                .required(INT32).named("zipcode")
- *            .named("address")
- *        .named("User")
- * </pre>
- * <p>
- * These builders enforce consistency checks based on the specifications in
- * the parquet-format documentation. For example, if DECIMAL is used to annotate
- * a FIXED_LEN_BYTE_ARRAY that is not long enough for its maximum precision,
- * these builders will reject the type when {@code named} is called:
- * <pre>
- *   // fails a Preconditions.checkState check with message:
- *   // "FIXED(4) cannot store 10 digits (max 9)"
- *   Types.required(FIXED_LEN_BYTE_ARRAY).length(4)
- *        .as(DECIMAL).precision(10)
- *        .named("badDecimal");
- * </pre>
- */
-public class Types {
-  private static final int NOT_SET = 0;
-
-  /**
-   * A base builder for {@link Type} objects.
-   *
-   * @param <T> The concrete builder type returned by the chaining methods.
-   * @param <P> The type returned from {@link #named(String)} when the type is built.
-   */
-  public abstract static class Builder<T extends Builder, P> {
-    protected final P parent;
-    protected final Class<? extends P> returnClass;
-
-    protected Type.Repetition repetition = null;
-    protected OriginalType originalType = null;
-    protected Type.ID id = null;
-    private boolean repetitionAlreadySet = false;
-
-    /**
-     * Construct a type builder that returns a "parent" object when the builder
-     * is finished. The {@code parent} will be returned by
-     * {@link #named(String)} so that builders can be chained.
-     *
-     * @param parent a non-null object to return from {@link #named(String)}
-     */
-    protected Builder(P parent) {
-      Preconditions.checkNotNull(parent, "Parent cannot be null");
-      this.parent = parent;
-      this.returnClass = null;
-    }
-
-    /**
-     * Construct a type builder that returns the {@link Type} that was built
-     * when the builder is finished. The {@code returnClass} must be the
-     * expected {@code Type} class.
-     *
-     * @param returnClass a {@code Type} to return from {@link #named(String)}
-     */
-    protected Builder(Class<P> returnClass) {
-      Preconditions.checkArgument(Type.class.isAssignableFrom(returnClass),
-          "The requested return class must extend Type");
-      this.returnClass = returnClass;
-      this.parent = null;
-    }
-
-    protected abstract T self();
-
-    protected final T repetition(Type.Repetition repetition) {
-      Preconditions.checkArgument(!repetitionAlreadySet,
-          "Repetition has already been set");
-      Preconditions.checkNotNull(repetition, "Repetition cannot be null");
-      this.repetition = repetition;
-      this.repetitionAlreadySet = true;
-      return self();
-    }
-
-    /**
-     * Adds a type annotation ({@link OriginalType}) to the type being built.
-     * <p>
-     * Type annotations are used to extend the types that parquet can store, by
-     * specifying how the primitive types should be interpreted. This keeps the
-     * set of primitive types to a minimum and reuses parquet's efficient
-     * encodings. For example, strings are stored as byte arrays (binary) with
-     * a UTF8 annotation.
-     *
-     * @param type an {@code OriginalType}
-     * @return this builder for method chaining
-     */
-    public T as(OriginalType type) {
-      this.originalType = type;
-      return self();
-    }
-
-    /**
-     * Adds an id annotation to the type being built.
-     * <p>
-     * Ids are used to capture the original id when converting from models using ids (thrift, protobufs).
-     *
-     * @param id the id of the field
-     * @return this builder for method chaining
-     */
-    public T id(int id) {
-      this.id = new ID(id);
-      return self();
-    }
-
-    abstract protected Type build(String name);
-
-    /**
-     * Builds a {@link Type} and returns the parent builder, if given, or the
-     * {@code Type} that was built. If returning a parent object that is a
-     * GroupBuilder, the constructed type will be added to it as a field.
-     * <p>
-     * <em>Note:</em> Any configuration for this type builder should be done
-     * before calling this method.
-     *
-     * @param name a name for the constructed type
-     * @return the parent {@code GroupBuilder} or the constructed {@code Type}
-     */
-    public P named(String name) {
-      Preconditions.checkNotNull(name, "Name is required");
-      Preconditions.checkNotNull(repetition, "Repetition is required");
-
-      Type type = build(name);
-      if (parent != null) {
-        // if the parent is a GroupBuilder, add type to it
-        if (GroupBuilder.class.isAssignableFrom(parent.getClass())) {
-          GroupBuilder.class.cast(parent).addField(type);
-        }
-        return parent;
-      } else {
-        // no parent indicates that the Type object should be returned
-        // the constructor check guarantees that returnClass is a Type
-        return returnClass.cast(type);
-      }
-    }
-
-  }
-
-  /**
-   * A builder for {@link PrimitiveType} objects.
-   *
-   * @param <P> The type that this builder will return from
-   *          {@link #named(String)} when the type is built.
-   */
-  public static class PrimitiveBuilder<P> extends Builder<PrimitiveBuilder<P>, P> {
-    private static final long MAX_PRECISION_INT32 = maxPrecision(4);
-    private static final long MAX_PRECISION_INT64 = maxPrecision(8);
-    private final PrimitiveTypeName primitiveType;
-    private int length = NOT_SET;
-    private int precision = NOT_SET;
-    private int scale = NOT_SET;
-
-    private PrimitiveBuilder(P parent, PrimitiveTypeName type) {
-      super(parent);
-      this.primitiveType = type;
-    }
-
-    private PrimitiveBuilder(Class<P> returnType, PrimitiveTypeName type) {
-      super(returnType);
-      this.primitiveType = type;
-    }
-
-    @Override
-    protected PrimitiveBuilder<P> self() {
-      return this;
-    }
-
-    /**
-     * Adds the length for a FIXED_LEN_BYTE_ARRAY.
-     *
-     * @param length an int length
-     * @return this builder for method chaining
-     */
-    public PrimitiveBuilder<P> length(int length) {
-      this.length = length;
-      return this;
-    }
-
-    /**
-     * Adds the precision for a DECIMAL.
-     * <p>
-     * This value is required for decimals and must be less than or equal to
-     * the maximum number of base-10 digits in the underlying type. A 4-byte
-     * fixed, for example, can store up to 9 base-10 digits.
-     *
-     * @param precision an int precision value for the DECIMAL
-     * @return this builder for method chaining
-     */
-    public PrimitiveBuilder<P> precision(int precision) {
-      this.precision = precision;
-      return this;
-    }
-
-    /**
-     * Adds the scale for a DECIMAL.
-     * <p>
-     * This value must be greater than or equal to 0 and must not exceed the
-     * precision of the DECIMAL. If not set, the default scale is 0.
-     * <p>
-     * The scale specifies the number of digits of the underlying unscaled
-     * value that are to the right of the decimal point. The decimal interpretation
-     * of values in this column is: {@code value*10^(-scale)}.
-     *
-     * @param scale an int scale value for the DECIMAL
-     * @return this builder for method chaining
-     */
-    public PrimitiveBuilder<P> scale(int scale) {
-      this.scale = scale;
-      return this;
-    }
-
-    @Override
-    protected PrimitiveType build(String name) {
-      if (PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY == primitiveType) {
-        Preconditions.checkArgument(length > 0,
-            "Invalid FIXED_LEN_BYTE_ARRAY length: " + length);
-      }
-
-      DecimalMetadata meta = decimalMetadata();
-
-      // validate type annotations and required metadata
-      if (originalType != null) {
-        switch (originalType) {
-          case UTF8:
-          case JSON:
-          case BSON:
-            Preconditions.checkState(
-                primitiveType == PrimitiveTypeName.BINARY,
-                originalType.toString() + " can only annotate binary fields");
-            break;
-          case DECIMAL:
-            Preconditions.checkState(
-                (primitiveType == PrimitiveTypeName.INT32) ||
-                (primitiveType == PrimitiveTypeName.INT64) ||
-                (primitiveType == PrimitiveTypeName.BINARY) ||
-                (primitiveType == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY),
-                "DECIMAL can only annotate INT32, INT64, BINARY, and FIXED"
-            );
-            if (primitiveType == PrimitiveTypeName.INT32) {
-              Preconditions.checkState(
-                  meta.getPrecision() <= MAX_PRECISION_INT32,
-                  "INT32 cannot store " + meta.getPrecision() + " digits " +
-                      "(max " + MAX_PRECISION_INT32 + ")");
-            } else if (primitiveType == PrimitiveTypeName.INT64) {
-              Preconditions.checkState(
-                  meta.getPrecision() <= MAX_PRECISION_INT64,
-                  "INT64 cannot store " + meta.getPrecision() + " digits " +
-                  "(max " + MAX_PRECISION_INT64 + ")");
-            } else if (primitiveType == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
-              Preconditions.checkState(
-                  meta.getPrecision() <= maxPrecision(length),
-                  "FIXED(" + length + ") cannot store " + meta.getPrecision() +
-                  " digits (max " + maxPrecision(length) + ")");
-            }
-            break;
-          case DATE:
-          case TIME_MILLIS:
-          case UINT_8:
-          case UINT_16:
-          case UINT_32:
-          case INT_8:
-          case INT_16:
-          case INT_32:
-            Preconditions.checkState(primitiveType == PrimitiveTypeName.INT32,
-                originalType.toString() + " can only annotate INT32");
-            break;
-          case TIMESTAMP_MILLIS:
-          case UINT_64:
-          case INT_64:
-            Preconditions.checkState(primitiveType == PrimitiveTypeName.INT64,
-                originalType.toString() + " can only annotate INT64");
-            break;
-          case INTERVAL:
-            Preconditions.checkState(
-                (primitiveType == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) &&
-                (length == 12),
-                "INTERVAL can only annotate FIXED_LEN_BYTE_ARRAY(12)");
-            break;
-          case ENUM:
-            Preconditions.checkState(
-                primitiveType == PrimitiveTypeName.BINARY,
-                "ENUM can only annotate binary fields");
-            break;
-          default:
-            throw new IllegalStateException(originalType + " can not be applied to a primitive type");
-        }
-      }
-
-      return new PrimitiveType(repetition, primitiveType, length, name, originalType, meta, id);
-    }
-
-    private static long maxPrecision(int numBytes) {
-      return Math.round(                      // convert double to long
-          Math.floor(Math.log10(              // number of base-10 digits
-              Math.pow(2, 8 * numBytes - 1) - 1)  // max value stored in numBytes
-          )
-      );
-    }
-
-    protected DecimalMetadata decimalMetadata() {
-      DecimalMetadata meta = null;
-      if (OriginalType.DECIMAL == originalType) {
-        Preconditions.checkArgument(precision > 0,
-            "Invalid DECIMAL precision: " + precision);
-        Preconditions.checkArgument(scale >= 0,
-            "Invalid DECIMAL scale: " + scale);
-        Preconditions.checkArgument(scale <= precision,
-            "Invalid DECIMAL scale: cannot be greater than precision");
-        meta = new DecimalMetadata(precision, scale);
-      }
-      return meta;
-    }
-  }
-
-  /**
-   * A builder for {@link GroupType} objects.
-   *
-   * @param <P> The type that this builder will return from
-   *          {@link #named(String)} when the type is built.
-   */
-  public static class GroupBuilder<P> extends Builder<GroupBuilder<P>, P> {
-    protected final List<Type> fields;
-
-    private GroupBuilder(P parent) {
-      super(parent);
-      this.fields = new ArrayList<Type>();
-    }
-
-    private GroupBuilder(Class<P> returnType) {
-      super(returnType);
-      this.fields = new ArrayList<Type>();
-    }
-
-    @Override
-    protected GroupBuilder<P> self() {
-      return this;
-    }
-
-    public PrimitiveBuilder<GroupBuilder<P>> primitive(
-        PrimitiveTypeName type, Type.Repetition repetition) {
-      return new PrimitiveBuilder<GroupBuilder<P>>(this, type)
-          .repetition(repetition);
-    }
-
-    /**
-     * Returns a {@link PrimitiveBuilder} for the required primitive type
-     * {@code type}.
-     *
-     * @param type a {@link PrimitiveTypeName}
-     * @return a primitive builder for {@code type} that will return this
-     *          builder for additional fields.
-     */
-    public PrimitiveBuilder<GroupBuilder<P>> required(
-        PrimitiveTypeName type) {
-      return new PrimitiveBuilder<GroupBuilder<P>>(this, type)
-          .repetition(Type.Repetition.REQUIRED);
-    }
-
-    /**
-     * Returns a {@link PrimitiveBuilder} for the optional primitive type
-     * {@code type}.
-     *
-     * @param type a {@link PrimitiveTypeName}
-     * @return a primitive builder for {@code type} that will return this
-     *          builder for additional fields.
-     */
-    public PrimitiveBuilder<GroupBuilder<P>> optional(
-        PrimitiveTypeName type) {
-      return new PrimitiveBuilder<GroupBuilder<P>>(this, type)
-          .repetition(Type.Repetition.OPTIONAL);
-    }
-
-    /**
-     * Returns a {@link PrimitiveBuilder} for the repeated primitive type
-     * {@code type}.
-     *
-     * @param type a {@link PrimitiveTypeName}
-     * @return a primitive builder for {@code type} that will return this
-     *          builder for additional fields.
-     */
-    public PrimitiveBuilder<GroupBuilder<P>> repeated(
-        PrimitiveTypeName type) {
-      return new PrimitiveBuilder<GroupBuilder<P>>(this, type)
-          .repetition(Type.Repetition.REPEATED);
-    }
-
-    public GroupBuilder<GroupBuilder<P>> group(Type.Repetition repetition) {
-      return new GroupBuilder<GroupBuilder<P>>(this)
-          .repetition(repetition);
-    }
-
-    /**
-     * Returns a {@link GroupBuilder} to build a required sub-group.
-     *
-     * @return a group builder that will return this builder for additional
-     *          fields.
-     */
-    public GroupBuilder<GroupBuilder<P>> requiredGroup() {
-      return new GroupBuilder<GroupBuilder<P>>(this)
-          .repetition(Type.Repetition.REQUIRED);
-    }
-
-    /**
-     * Returns a {@link GroupBuilder} to build an optional sub-group.
-     *
-     * @return a group builder that will return this builder for additional
-     *          fields.
-     */
-    public GroupBuilder<GroupBuilder<P>> optionalGroup() {
-      return new GroupBuilder<GroupBuilder<P>>(this)
-          .repetition(Type.Repetition.OPTIONAL);
-    }
-
-    /**
-     * Returns a {@link GroupBuilder} to build a repeated sub-group.
-     *
-     * @return a group builder that will return this builder for additional
-     *          fields.
-     */
-    public GroupBuilder<GroupBuilder<P>> repeatedGroup() {
-      return new GroupBuilder<GroupBuilder<P>>(this)
-          .repetition(Type.Repetition.REPEATED);
-    }
-
-    /**
-     * Adds {@code type} as a sub-field to the group configured by this builder.
-     *
-     * @return this builder for additional fields.
-     */
-    public GroupBuilder<P> addField(Type type) {
-      fields.add(type);
-      return this;
-    }
-
-    /**
-     * Adds {@code types} as sub-fields of the group configured by this builder.
-     *
-     * @return this builder for additional fields.
-     */
-    public GroupBuilder<P> addFields(Type... types) {
-      for (Type type : types) {
-        fields.add(type);
-      }
-      return this;
-    }
-
-    @Override
-    protected GroupType build(String name) {
-      Preconditions.checkState(!fields.isEmpty(),
-          "Cannot build an empty group");
-      return new GroupType(repetition, name, originalType, fields, id);
-    }
-  }
-
-  public static class MessageTypeBuilder extends GroupBuilder<MessageType> {
-    private MessageTypeBuilder() {
-      super(MessageType.class);
-      repetition(Type.Repetition.REQUIRED);
-    }
-
-    /**
-     * Builds and returns the {@link MessageType} configured by this builder.
-     * <p>
-     * <em>Note:</em> All primitive types and sub-groups should be added before
-     * calling this method.
-     *
-     * @param name a name for the constructed type
-     * @return the final {@code MessageType} configured by this builder.
-     */
-    @Override
-    public MessageType named(String name) {
-      Preconditions.checkNotNull(name, "Name is required");
-      return new MessageType(name, fields);
-    }
-  }
-
-  /**
-   * Returns a builder to construct a {@link MessageType}.
-   *
-   * @return a {@link MessageTypeBuilder}
-   */
-  public static MessageTypeBuilder buildMessage() {
-    return new MessageTypeBuilder();
-  }
-
-  public static GroupBuilder<GroupType> buildGroup(
-      Type.Repetition repetition) {
-    return new GroupBuilder<GroupType>(GroupType.class).repetition(repetition);
-  }
-
-  /**
-   * Returns a builder to construct a required {@link GroupType}.
-   *
-   * @return a {@link GroupBuilder}
-   */
-  public static GroupBuilder<GroupType> requiredGroup() {
-    return new GroupBuilder<GroupType>(GroupType.class)
-        .repetition(Type.Repetition.REQUIRED);
-  }
-
-  /**
-   * Returns a builder to construct an optional {@link GroupType}.
-   *
-   * @return a {@link GroupBuilder}
-   */
-  public static GroupBuilder<GroupType> optionalGroup() {
-    return new GroupBuilder<GroupType>(GroupType.class)
-        .repetition(Type.Repetition.OPTIONAL);
-  }
-
-  /**
-   * Returns a builder to construct a repeated {@link GroupType}.
-   *
-   * @return a {@link GroupBuilder}
-   */
-  public static GroupBuilder<GroupType> repeatedGroup() {
-    return new GroupBuilder<GroupType>(GroupType.class)
-        .repetition(Type.Repetition.REPEATED);
-  }
-
-  public static PrimitiveBuilder<PrimitiveType> primitive(
-      PrimitiveTypeName type, Type.Repetition repetition) {
-    return new PrimitiveBuilder<PrimitiveType>(PrimitiveType.class, type)
-        .repetition(repetition);
-  }
-
-  /**
-   * Returns a builder to construct a required {@link PrimitiveType}.
-   *
-   * @param type a {@link PrimitiveTypeName} for the constructed type
-   * @return a {@link PrimitiveBuilder}
-   */
-  public static PrimitiveBuilder<PrimitiveType> required(
-      PrimitiveTypeName type) {
-    return new PrimitiveBuilder<PrimitiveType>(PrimitiveType.class, type)
-        .repetition(Type.Repetition.REQUIRED);
-  }
-
-  /**
-   * Returns a builder to construct an optional {@link PrimitiveType}.
-   *
-   * @param type a {@link PrimitiveTypeName} for the constructed type
-   * @return a {@link PrimitiveBuilder}
-   */
-  public static PrimitiveBuilder<PrimitiveType> optional(
-      PrimitiveTypeName type) {
-    return new PrimitiveBuilder<PrimitiveType>(PrimitiveType.class, type)
-        .repetition(Type.Repetition.OPTIONAL);
-  }
-
-  /**
-   * Returns a builder to construct a repeated {@link PrimitiveType}.
-   *
-   * @param type a {@link PrimitiveTypeName} for the constructed type
-   * @return a {@link PrimitiveBuilder}
-   */
-  public static PrimitiveBuilder<PrimitiveType> repeated(
-      PrimitiveTypeName type) {
-    return new PrimitiveBuilder<PrimitiveType>(PrimitiveType.class, type)
-        .repetition(Type.Repetition.REPEATED);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java
new file mode 100644
index 0000000..ebdbdf8
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java
@@ -0,0 +1,123 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.impl;
+
+import static junit.framework.Assert.assertEquals;
+import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_2_0;
+
+import java.util.List;
+
+import org.junit.Test;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.mem.MemPageReader;
+import org.apache.parquet.column.page.mem.MemPageWriter;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+
+public class TestColumnReaderImpl {
+
+  private int rows = 13001;
+
+  private static final class ValidatingConverter extends PrimitiveConverter {
+    int count;
+
+    @Override
+    public void addBinary(Binary value) {
+      assertEquals("bar" + count % 10, value.toStringUsingUTF8());
+      ++ count;
+    }
+  }
+
+  @Test
+  public void test() {
+    MessageType schema = MessageTypeParser.parseMessageType("message test { required binary foo; }");
+    ColumnDescriptor col = schema.getColumns().get(0);
+    MemPageWriter pageWriter = new MemPageWriter();
+    ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter, new ParquetProperties(1024, PARQUET_2_0, true), 2048);
+    for (int i = 0; i < rows; i++) {
+      columnWriterV2.write(Binary.fromString("bar" + i % 10), 0, 0);
+      if ((i + 1) % 1000 == 0) {
+        columnWriterV2.writePage(i);
+      }
+    }
+    columnWriterV2.writePage(rows);
+    columnWriterV2.finalizeColumnChunk();
+    List<DataPage> pages = pageWriter.getPages();
+    int valueCount = 0;
+    int rowCount = 0;
+    for (DataPage dataPage : pages) {
+      valueCount += dataPage.getValueCount();
+      rowCount += ((DataPageV2)dataPage).getRowCount();
+    }
+    assertEquals(rows, rowCount);
+    assertEquals(rows, valueCount);
+    MemPageReader pageReader = new MemPageReader((long)rows, pages.iterator(), pageWriter.getDictionaryPage());
+    ValidatingConverter converter = new ValidatingConverter();
+    ColumnReader columnReader = new ColumnReaderImpl(col, pageReader, converter);
+    for (int i = 0; i < rows; i++) {
+      assertEquals(0, columnReader.getCurrentRepetitionLevel());
+      assertEquals(0, columnReader.getCurrentDefinitionLevel());
+      columnReader.writeCurrentValueToConverter();
+      columnReader.consume();
+    }
+    assertEquals(rows, converter.count);
+  }
+
+  @Test
+  public void testOptional() {
+    MessageType schema = MessageTypeParser.parseMessageType("message test { optional binary foo; }");
+    ColumnDescriptor col = schema.getColumns().get(0);
+    MemPageWriter pageWriter = new MemPageWriter();
+    ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter, new ParquetProperties(1024, PARQUET_2_0, true), 2048);
+    for (int i = 0; i < rows; i++) {
+      columnWriterV2.writeNull(0, 0);
+      if ((i + 1) % 1000 == 0) {
+        columnWriterV2.writePage(i);
+      }
+    }
+    columnWriterV2.writePage(rows);
+    columnWriterV2.finalizeColumnChunk();
+    List<DataPage> pages = pageWriter.getPages();
+    int valueCount = 0;
+    int rowCount = 0;
+    for (DataPage dataPage : pages) {
+      valueCount += dataPage.getValueCount();
+      rowCount += ((DataPageV2)dataPage).getRowCount();
+    }
+    assertEquals(rows, rowCount);
+    assertEquals(rows, valueCount);
+    MemPageReader pageReader = new MemPageReader((long)rows, pages.iterator(), pageWriter.getDictionaryPage());
+    ValidatingConverter converter = new ValidatingConverter();
+    ColumnReader columnReader = new ColumnReaderImpl(col, pageReader, converter);
+    for (int i = 0; i < rows; i++) {
+      assertEquals(0, columnReader.getCurrentRepetitionLevel());
+      assertEquals(0, columnReader.getCurrentDefinitionLevel());
+      columnReader.consume();
+    }
+    assertEquals(0, converter.count);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
new file mode 100644
index 0000000..d801442
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
@@ -0,0 +1,164 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.mem;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.column.ColumnWriter;
+import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.column.impl.ColumnReadStoreImpl;
+import org.apache.parquet.column.impl.ColumnWriteStoreV1;
+import org.apache.parquet.column.page.mem.MemPageStore;
+import org.apache.parquet.example.DummyRecordConverter;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+
+/**
+ * Round-trip tests that write column values through a {@link ColumnWriteStoreV1} backed by
+ * an in-memory {@link MemPageStore} and read them back with a {@link ColumnReader}.
+ */
+public class TestMemColumn {
+  private static final Log LOG = Log.getLog(TestMemColumn.class);
+
+  /**
+   * Writes a single required int64 value and reads it back.
+   */
+  @Test
+  public void testMemColumn() throws Exception {
+    MessageType schema = MessageTypeParser.parseMessageType("message msg { required group foo { required int64 bar; } }");
+    ColumnDescriptor path = schema.getColumnDescription(new String[] {"foo", "bar"});
+    MemPageStore memPageStore = new MemPageStore(10);
+    ColumnWriteStoreV1 memColumnsStore = newColumnWriteStoreImpl(memPageStore);
+    ColumnWriter columnWriter = memColumnsStore.getColumnWriter(path);
+    columnWriter.write(42L, 0, 0);
+    memColumnsStore.flush();
+
+    ColumnReader columnReader = getColumnReader(memPageStore, path, schema);
+    // The read loop is bounded by the stored count, so pin the count first;
+    // otherwise an empty store would make the test pass without checking anything.
+    assertEquals(1, columnReader.getTotalValueCount());
+    for (int i = 0; i < columnReader.getTotalValueCount(); i++) {
+      assertEquals(0, columnReader.getCurrentRepetitionLevel());
+      assertEquals(0, columnReader.getCurrentDefinitionLevel());
+      assertEquals(42L, columnReader.getLong());
+      columnReader.consume();
+    }
+  }
+
+  /**
+   * Builds a reader for {@code path} over the pages held by {@code memPageStore}.
+   *
+   * @param memPageStore the store the column was written to
+   * @param path the column to read
+   * @param schema the schema used to build the dummy record converter
+   * @return a reader for the requested column
+   */
+  private ColumnReader getColumnReader(MemPageStore memPageStore, ColumnDescriptor path, MessageType schema) {
+    return new ColumnReadStoreImpl(
+        memPageStore,
+        new DummyRecordConverter(schema).getRootConverter(),
+        schema
+        ).getColumnReader(path);
+  }
+
+  /**
+   * Writes a single required binary value and reads it back as a UTF-8 string.
+   */
+  @Test
+  public void testMemColumnBinary() throws Exception {
+    MessageType mt = MessageTypeParser.parseMessageType("message msg { required group foo { required binary bar; } }");
+    String[] col = new String[]{"foo", "bar"};
+    MemPageStore memPageStore = new MemPageStore(10);
+
+    ColumnWriteStoreV1 memColumnsStore = newColumnWriteStoreImpl(memPageStore);
+    ColumnDescriptor path = mt.getColumnDescription(col);
+
+    ColumnWriter columnWriter = memColumnsStore.getColumnWriter(path);
+    columnWriter.write(Binary.fromString("42"), 0, 0);
+    memColumnsStore.flush();
+
+    ColumnReader columnReader = getColumnReader(memPageStore, path, mt);
+    assertEquals(1, columnReader.getTotalValueCount());
+    for (int i = 0; i < columnReader.getTotalValueCount(); i++) {
+      assertEquals(0, columnReader.getCurrentRepetitionLevel());
+      assertEquals(0, columnReader.getCurrentDefinitionLevel());
+      assertEquals("42", columnReader.getBinary().toStringUsingUTF8());
+      columnReader.consume();
+    }
+  }
+
+  /**
+   * Writes 2000 required int64 values and reads all of them back.
+   */
+  @Test
+  public void testMemColumnSeveralPages() throws Exception {
+    final int valueCount = 2000;
+    MessageType mt = MessageTypeParser.parseMessageType("message msg { required group foo { required int64 bar; } }");
+    String[] col = new String[]{"foo", "bar"};
+    MemPageStore memPageStore = new MemPageStore(10);
+    ColumnWriteStoreV1 memColumnsStore = newColumnWriteStoreImpl(memPageStore);
+    ColumnDescriptor path = mt.getColumnDescription(col);
+
+    ColumnWriter columnWriter = memColumnsStore.getColumnWriter(path);
+    for (int i = 0; i < valueCount; i++) {
+      columnWriter.write(42L, 0, 0);
+    }
+    memColumnsStore.flush();
+
+    ColumnReader columnReader = getColumnReader(memPageStore, path, mt);
+    assertEquals(valueCount, columnReader.getTotalValueCount());
+    for (int i = 0; i < columnReader.getTotalValueCount(); i++) {
+      assertEquals(0, columnReader.getCurrentRepetitionLevel());
+      assertEquals(0, columnReader.getCurrentDefinitionLevel());
+      assertEquals(42L, columnReader.getLong());
+      columnReader.consume();
+    }
+  }
+
+  /**
+   * Writes 837 entries for a repeated column, cycling through every (repetition, definition)
+   * level pair in {@code rs}/{@code ds}, and checks that levels and values read back in the
+   * same order. Only entries with definition level 2 carry a value; the rest are nulls.
+   */
+  @Test
+  public void testMemColumnSeveralPagesRepeated() throws Exception {
+    final int entryCount = 837;
+    MessageType mt = MessageTypeParser.parseMessageType("message msg { repeated group foo { repeated int64 bar; } }");
+    String[] col = new String[]{"foo", "bar"};
+    MemPageStore memPageStore = new MemPageStore(10);
+    ColumnWriteStoreV1 memColumnsStore = newColumnWriteStoreImpl(memPageStore);
+    ColumnDescriptor path = mt.getColumnDescription(col);
+
+    ColumnWriter columnWriter = memColumnsStore.getColumnWriter(path);
+    int[] rs = { 0, 0, 0, 1, 1, 1, 2, 2, 2};
+    int[] ds = { 0, 1, 2, 0, 1, 2, 0, 1, 2};
+    for (int i = 0; i < entryCount; i++) {
+      int r = rs[i % rs.length];
+      int d = ds[i % ds.length];
+      LOG.debug("write i: " + i);
+      if (d == 2) {
+        columnWriter.write((long)i, r, d);
+      } else {
+        columnWriter.writeNull(r, d);
+      }
+    }
+    memColumnsStore.flush();
+
+    ColumnReader columnReader = getColumnReader(memPageStore, path, mt);
+    // Nulls are counted as entries too, so every write above must be visible to the reader.
+    assertEquals(entryCount, columnReader.getTotalValueCount());
+    for (int i = 0; i < columnReader.getTotalValueCount(); i++) {
+      int r = rs[i % rs.length];
+      int d = ds[i % ds.length];
+      LOG.debug("read i: " + i);
+      assertEquals("r row " + i, r, columnReader.getCurrentRepetitionLevel());
+      assertEquals("d row " + i, d, columnReader.getCurrentDefinitionLevel());
+      if (d == 2) {
+        assertEquals("data row " + i, (long)i, columnReader.getLong());
+      }
+      columnReader.consume();
+    }
+  }
+
+  /**
+   * Creates the V1 write store used by every test in this class, backed by {@code memPageStore}.
+   */
+  private ColumnWriteStoreV1 newColumnWriteStoreImpl(MemPageStore memPageStore) {
+    return new ColumnWriteStoreV1(memPageStore, 2048, 2048, false, WriterVersion.PARQUET_1_0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemPageStore.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemPageStore.java b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemPageStore.java
new file mode 100644
index 0000000..8fcada6
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemPageStore.java
@@ -0,0 +1,61 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.mem;
+
+import static org.apache.parquet.column.Encoding.*;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.page.PageWriter;
+import org.apache.parquet.column.page.mem.MemPageStore;
+import org.apache.parquet.column.statistics.LongStatistics;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+/**
+ * Checks that pages written to a {@link MemPageStore} come back from its page reader
+ * with the value counts they were written with.
+ */
+public class TestMemPageStore {
+
+  private String[] path = { "foo", "bar"};
+
+  /**
+   * Writes four pages of 209 values each for one column, then verifies the reader's
+   * total value count, the value count of every page, and the number of pages read.
+   */
+  @Test
+  public void test() throws IOException {
+    final int valuesPerPage = 209;
+    final int pageCount = 4;
+    MemPageStore memPageStore = new MemPageStore(10);
+    ColumnDescriptor col = new ColumnDescriptor(path , PrimitiveTypeName.INT64, 2, 2);
+    LongStatistics stats = new LongStatistics();
+    PageWriter pageWriter = memPageStore.getPageWriter(col);
+    pageWriter.writePage(BytesInput.from(new byte[735]), valuesPerPage, stats, BIT_PACKED, BIT_PACKED, PLAIN);
+    pageWriter.writePage(BytesInput.from(new byte[743]), valuesPerPage, stats, BIT_PACKED, BIT_PACKED, PLAIN);
+    pageWriter.writePage(BytesInput.from(new byte[743]), valuesPerPage, stats, BIT_PACKED, BIT_PACKED, PLAIN);
+    pageWriter.writePage(BytesInput.from(new byte[735]), valuesPerPage, stats, BIT_PACKED, BIT_PACKED, PLAIN);
+
+    PageReader pageReader = memPageStore.getPageReader(col);
+    long totalValueCount = pageReader.getTotalValueCount();
+    // Assert is referenced by its qualified name so this class needs no extra import.
+    org.junit.Assert.assertEquals(pageCount * valuesPerPage, totalValueCount);
+
+    int total = 0;
+    int pagesRead = 0;
+    do {
+      DataPage readPage = pageReader.readPage();
+      org.junit.Assert.assertEquals(valuesPerPage, readPage.getValueCount());
+      total += readPage.getValueCount();
+      pagesRead++;
+    } while (total < totalValueCount);
+
+    org.junit.Assert.assertEquals(pageCount, pagesRead);
+    org.junit.Assert.assertEquals(totalValueCount, total);
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageReader.java b/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageReader.java
new file mode 100644
index 0000000..a6e8910
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageReader.java
@@ -0,0 +1,69 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.page.mem;
+
+import static org.apache.parquet.Log.DEBUG;
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+import java.util.Iterator;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.io.ParquetDecodingException;
+
+
+/**
+ * {@link PageReader} that serves data pages from an iterator supplied at construction,
+ * together with a fixed total value count and an optional dictionary page.
+ */
+public class MemPageReader implements PageReader {
+  private static final Log LOG = Log.getLog(MemPageReader.class);
+
+  private final long totalValueCount;
+  private final Iterator<DataPage> pages;
+  private final DictionaryPage dictionaryPage;
+
+  /**
+   * @param totalValueCount the value returned by {@link #getTotalValueCount()}
+   * @param pages the pages to hand out, in order; checked with {@code checkNotNull}
+   * @param dictionaryPage the value returned by {@link #readDictionaryPage()}
+   */
+  public MemPageReader(long totalValueCount, Iterator<DataPage> pages, DictionaryPage dictionaryPage) {
+    checkNotNull(pages, "pages");
+    this.dictionaryPage = dictionaryPage;
+    this.pages = pages;
+    this.totalValueCount = totalValueCount;
+  }
+
+  @Override
+  public long getTotalValueCount() {
+    return this.totalValueCount;
+  }
+
+  /**
+   * Returns the next page from the iterator.
+   *
+   * @throws ParquetDecodingException if the iterator has no more pages
+   */
+  @Override
+  public DataPage readPage() {
+    if (!pages.hasNext()) {
+      throw new ParquetDecodingException("after last page");
+    }
+    DataPage page = pages.next();
+    if (DEBUG) {
+      LOG.debug("read page " + page);
+    }
+    return page;
+  }
+
+  @Override
+  public DictionaryPage readDictionaryPage() {
+    return this.dictionaryPage;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageStore.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageStore.java b/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageStore.java
new file mode 100644
index 0000000..219e5cd
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageStore.java
@@ -0,0 +1,77 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.page.mem;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.UnknownColumnException;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.page.PageWriteStore;
+import org.apache.parquet.column.page.PageWriter;
+
+
+/**
+ * Page store that keeps one {@link MemPageWriter} per column descriptor in a map and
+ * builds {@link MemPageReader}s over whatever those writers have collected.
+ */
+public class MemPageStore implements PageReadStore, PageWriteStore {
+  private static final Log LOG = Log.getLog(MemPageStore.class);
+
+  private Map<ColumnDescriptor, MemPageWriter> pageWriters = new HashMap<ColumnDescriptor, MemPageWriter>();
+
+  private long rowCount;
+
+  /**
+   * @param rowCount the initial value reported by {@link #getRowCount()}
+   */
+  public MemPageStore(long rowCount) {
+    this.rowCount = rowCount;
+  }
+
+  /**
+   * Returns the writer registered for {@code path}, creating and registering one on first use.
+   */
+  @Override
+  public PageWriter getPageWriter(ColumnDescriptor path) {
+    MemPageWriter existing = pageWriters.get(path);
+    if (existing != null) {
+      return existing;
+    }
+    MemPageWriter created = new MemPageWriter();
+    pageWriters.put(path, created);
+    return created;
+  }
+
+  /**
+   * Builds a reader over a copy of the page list currently held by the column's writer.
+   *
+   * @throws UnknownColumnException if no writer was ever requested for {@code descriptor}
+   */
+  @Override
+  public PageReader getPageReader(ColumnDescriptor descriptor) {
+    MemPageWriter source = pageWriters.get(descriptor);
+    if (source == null) {
+      throw new UnknownColumnException(descriptor);
+    }
+    List<DataPage> snapshot = new ArrayList<DataPage>(source.getPages());
+    long valueCount = source.getTotalValueCount();
+    if (Log.DEBUG) {
+      LOG.debug("initialize page reader with "+ valueCount + " values and " + snapshot.size() + " pages");
+    }
+    return new MemPageReader(valueCount, snapshot.iterator(), source.getDictionaryPage());
+  }
+
+  @Override
+  public long getRowCount() {
+    return this.rowCount;
+  }
+
+  /**
+   * Adds {@code count} to the row count reported by {@link #getRowCount()}.
+   */
+  public void addRowCount(long count) {
+    this.rowCount = this.rowCount + count;
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageWriter.java b/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageWriter.java
new file mode 100644
index 0000000..d5bfe22
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageWriter.java
@@ -0,0 +1,113 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.page.mem;
+
+import static org.apache.parquet.Log.DEBUG;
+import static org.apache.parquet.bytes.BytesInput.copy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.PageWriter;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.io.ParquetEncodingException;
+
+/**
+ * {@link PageWriter} that copies every page it is given into an in-memory list and keeps
+ * running totals of the bytes and values written.
+ */
+public class MemPageWriter implements PageWriter {
+  private static final Log LOG = Log.getLog(MemPageWriter.class);
+
+  private final List<DataPage> pages = new ArrayList<DataPage>();
+  private DictionaryPage dictionaryPage;
+  private long memSize = 0;
+  private long totalValueCount = 0;
+
+  /**
+   * Stores a copy of the given bytes as a {@link DataPageV1}.
+   *
+   * @throws ParquetEncodingException if {@code valueCount} is 0
+   */
+  @Override
+  public void writePage(BytesInput bytesInput, int valueCount, Statistics statistics, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding)
+      throws IOException {
+    if (valueCount == 0) {
+      throw new ParquetEncodingException("illegal page of 0 values");
+    }
+    long pageSize = bytesInput.size();
+    memSize += pageSize;
+    DataPage page = new DataPageV1(copy(bytesInput), valueCount, (int) pageSize, statistics, rlEncoding, dlEncoding, valuesEncoding);
+    pages.add(page);
+    totalValueCount += valueCount;
+    if (DEBUG) {
+      LOG.debug("page written for " + pageSize + " bytes and " + valueCount + " records");
+    }
+  }
+
+  /**
+   * Stores copies of the level and data bytes as an uncompressed {@link DataPageV2}.
+   *
+   * @throws ParquetEncodingException if {@code valueCount} is 0
+   */
+  @Override
+  public void writePageV2(int rowCount, int nullCount, int valueCount,
+      BytesInput repetitionLevels, BytesInput definitionLevels,
+      Encoding dataEncoding, BytesInput data, Statistics<?> statistics) throws IOException {
+    if (valueCount == 0) {
+      throw new ParquetEncodingException("illegal page of 0 values");
+    }
+    long pageSize = repetitionLevels.size() + definitionLevels.size() + data.size();
+    memSize += pageSize;
+    DataPage page = DataPageV2.uncompressed(
+        rowCount, nullCount, valueCount,
+        copy(repetitionLevels), copy(definitionLevels),
+        dataEncoding, copy(data), statistics);
+    pages.add(page);
+    totalValueCount += valueCount;
+    if (DEBUG) {
+      LOG.debug("page written for " + pageSize + " bytes and " + valueCount + " records");
+    }
+  }
+
+  @Override
+  public long getMemSize() {
+    return this.memSize;
+  }
+
+  /** Returns the internal list of pages written so far (not a copy). */
+  public List<DataPage> getPages() {
+    return this.pages;
+  }
+
+  /** Returns the dictionary page written so far, or {@code null} if there is none. */
+  public DictionaryPage getDictionaryPage() {
+    return this.dictionaryPage;
+  }
+
+  /** Returns the sum of the value counts of all data pages written so far. */
+  public long getTotalValueCount() {
+    return this.totalValueCount;
+  }
+
+  @Override
+  public long allocatedSize() {
+    // only the written bytes are tracked, so this is the same figure as getMemSize()
+    return this.memSize;
+  }
+
+  /**
+   * Stores a copy of the dictionary page.
+   *
+   * @throws ParquetEncodingException if a dictionary page has already been written
+   */
+  @Override
+  public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException {
+    if (this.dictionaryPage != null) {
+      throw new ParquetEncodingException("Only one dictionary page per block");
+    }
+    long dictionaryBytes = dictionaryPage.getBytes().size();
+    this.memSize += dictionaryBytes;
+    this.dictionaryPage = dictionaryPage.copy();
+    if (DEBUG) {
+      LOG.debug("dictionary page written for " + dictionaryBytes + " bytes and " + dictionaryPage.getDictionarySize() + " records");
+    }
+  }
+
+  @Override
+  public String memUsageString(String prefix) {
+    String usage = String.format("%s %,d bytes", prefix, memSize);
+    return usage;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/column/statistics/TestStatistics.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/statistics/TestStatistics.java b/parquet-column/src/test/java/org/apache/parquet/column/statistics/TestStatistics.java
new file mode 100644
index 0000000..e0c6a10
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/column/statistics/TestStatistics.java
@@ -0,0 +1,569 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.statistics;
+
+import static org.junit.Assert.*;
+
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+
+import org.apache.parquet.io.api.Binary;
+
+public class TestStatistics {
+  private int[] integerArray;
+  private long[] longArray;
+  private float[] floatArray;
+  private double[] doubleArray;
+  private String[] stringArray;
+  private boolean[] booleanArray;
+
+  /**
+   * Checks the null counter of {@link IntStatistics}: it starts at 0, grows by one per
+   * {@code incrementNumNulls()}, grows by the given amount for {@code incrementNumNulls(n)},
+   * and is overwritten by {@code setNumNulls(n)}.
+   */
+  @Test
+  public void testNumNulls() {
+    IntStatistics stats = new IntStatistics();
+    // JUnit's assertEquals takes (expected, actual); keeping that order makes failure messages correct.
+    assertEquals(0, stats.getNumNulls());
+
+    stats.incrementNumNulls();
+    stats.incrementNumNulls();
+    stats.incrementNumNulls();
+    stats.incrementNumNulls();
+    assertEquals(4, stats.getNumNulls());
+
+    stats.incrementNumNulls(5);
+    assertEquals(9, stats.getNumNulls());
+
+    stats.setNumNulls(22);
+    assertEquals(22, stats.getNumNulls());
+  }
+
+  /**
+   * Checks {@link IntStatistics} min/max tracking for positive, negative and extreme values,
+   * the round trip of min/max through their byte[] form (read back here as little-endian
+   * ints), and the {@code toString()} format.
+   * <p>
+   * All assertions use JUnit's (expected, actual) argument order so failure messages are correct.
+   */
+  @Test
+  public void testIntMinMax() {
+    // Test basic max/min
+    integerArray = new int[] {1, 3, 14, 54, 66, 8, 0, 23, 54};
+    IntStatistics stats = new IntStatistics();
+
+    for (int i: integerArray) {
+      stats.updateStats(i);
+    }
+    assertEquals(66, stats.getMax());
+    assertEquals(0, stats.getMin());
+
+    // Test negative values
+    integerArray = new int[] {-11, 3, -14, 54, -66, 8, 0, -23, 54};
+    IntStatistics statsNeg = new IntStatistics();
+
+    for (int i: integerArray) {
+      statsNeg.updateStats(i);
+    }
+    assertEquals(54, statsNeg.getMax());
+    assertEquals(-66, statsNeg.getMin());
+
+    // Test converting to and from byte[]
+    byte[] intMaxBytes = statsNeg.getMaxBytes();
+    byte[] intMinBytes = statsNeg.getMinBytes();
+
+    assertEquals(54, ByteBuffer.wrap(intMaxBytes).order(java.nio.ByteOrder.LITTLE_ENDIAN).getInt());
+    assertEquals(-66, ByteBuffer.wrap(intMinBytes).order(java.nio.ByteOrder.LITTLE_ENDIAN).getInt());
+
+    IntStatistics statsFromBytes = new IntStatistics();
+    statsFromBytes.setMinMaxFromBytes(intMinBytes, intMaxBytes);
+
+    assertEquals(54, statsFromBytes.getMax());
+    assertEquals(-66, statsFromBytes.getMin());
+
+    integerArray = new int[] {Integer.MAX_VALUE, Integer.MIN_VALUE};
+    IntStatistics minMaxValues = new IntStatistics();
+
+    for (int i: integerArray) {
+      minMaxValues.updateStats(i);
+    }
+    assertEquals(Integer.MAX_VALUE, minMaxValues.getMax());
+    assertEquals(Integer.MIN_VALUE, minMaxValues.getMin());
+
+    // Test converting to and from byte[] for large and small values
+    byte[] intMaxBytesMinMax = minMaxValues.getMaxBytes();
+    byte[] intMinBytesMinMax = minMaxValues.getMinBytes();
+
+    assertEquals(Integer.MAX_VALUE, ByteBuffer.wrap(intMaxBytesMinMax).order(java.nio.ByteOrder.LITTLE_ENDIAN).getInt());
+    assertEquals(Integer.MIN_VALUE, ByteBuffer.wrap(intMinBytesMinMax).order(java.nio.ByteOrder.LITTLE_ENDIAN).getInt());
+
+    IntStatistics statsFromBytesMinMax= new IntStatistics();
+    statsFromBytesMinMax.setMinMaxFromBytes(intMinBytesMinMax, intMaxBytesMinMax);
+
+    assertEquals(Integer.MAX_VALUE, statsFromBytesMinMax.getMax());
+    assertEquals(Integer.MIN_VALUE, statsFromBytesMinMax.getMin());
+
+    // Test print formatting
+    assertEquals("min: 0, max: 66, num_nulls: 0", stats.toString());
+  }
+
+  /**
+   * Checks {@link LongStatistics} min/max tracking for positive, negative and extreme values,
+   * the round trip of min/max through their byte[] form (read back here as little-endian
+   * longs), and the {@code toString()} format.
+   * <p>
+   * All assertions use JUnit's (expected, actual) argument order so failure messages are correct.
+   */
+  @Test
+  public void testLongMinMax() {
+    // Test basic max/min
+    longArray = new long[] {9, 39, 99, 3, 0, 12, 1000, 65, 542};
+    LongStatistics stats = new LongStatistics();
+
+    for (long l: longArray) {
+      stats.updateStats(l);
+    }
+    assertEquals(1000, stats.getMax());
+    assertEquals(0, stats.getMin());
+
+    // Test negative values
+    longArray = new long[] {-101, 993, -9914, 54, -9, 89, 0, -23, 90};
+    LongStatistics statsNeg = new LongStatistics();
+
+    for (long l: longArray) {
+      statsNeg.updateStats(l);
+    }
+    assertEquals(993, statsNeg.getMax());
+    assertEquals(-9914, statsNeg.getMin());
+
+    // Test converting to and from byte[]
+    byte[] longMaxBytes = statsNeg.getMaxBytes();
+    byte[] longMinBytes = statsNeg.getMinBytes();
+
+    assertEquals(993, ByteBuffer.wrap(longMaxBytes).order(java.nio.ByteOrder.LITTLE_ENDIAN).getLong());
+    assertEquals(-9914, ByteBuffer.wrap(longMinBytes).order(java.nio.ByteOrder.LITTLE_ENDIAN).getLong());
+
+    LongStatistics statsFromBytes = new LongStatistics();
+    statsFromBytes.setMinMaxFromBytes(longMinBytes, longMaxBytes);
+
+    assertEquals(993, statsFromBytes.getMax());
+    assertEquals(-9914, statsFromBytes.getMin());
+
+    longArray = new long[] {Long.MAX_VALUE, Long.MIN_VALUE};
+    LongStatistics minMaxValues = new LongStatistics();
+
+    for (long l: longArray) {
+      minMaxValues.updateStats(l);
+    }
+    assertEquals(Long.MAX_VALUE, minMaxValues.getMax());
+    assertEquals(Long.MIN_VALUE, minMaxValues.getMin());
+
+    // Test converting to and from byte[] for large and small values
+    byte[] longMaxBytesMinMax = minMaxValues.getMaxBytes();
+    byte[] longMinBytesMinMax = minMaxValues.getMinBytes();
+
+    assertEquals(Long.MAX_VALUE, ByteBuffer.wrap(longMaxBytesMinMax).order(java.nio.ByteOrder.LITTLE_ENDIAN).getLong());
+    assertEquals(Long.MIN_VALUE, ByteBuffer.wrap(longMinBytesMinMax).order(java.nio.ByteOrder.LITTLE_ENDIAN).getLong());
+
+    LongStatistics statsFromBytesMinMax= new LongStatistics();
+    statsFromBytesMinMax.setMinMaxFromBytes(longMinBytesMinMax, longMaxBytesMinMax);
+
+    assertEquals(Long.MAX_VALUE, statsFromBytesMinMax.getMax());
+    assertEquals(Long.MIN_VALUE, statsFromBytesMinMax.getMin());
+
+    // Test print formatting
+    assertEquals("min: 0, max: 1000, num_nulls: 0", stats.toString());
+  }
+
+  /**
+   * Checks {@link FloatStatistics} min/max tracking for positive, negative and extreme values,
+   * the round trip of min/max through their byte[] form (read back here as little-endian
+   * floats), and the {@code toString()} format.
+   * <p>
+   * All assertions use JUnit's (expected, actual, delta) argument order so failure messages
+   * are correct.
+   */
+  @Test
+  public void testFloatMinMax() {
+    // Test basic max/min
+    floatArray = new float[] {1.5f, 44.5f, 412.99f, 0.65f, 5.6f, 100.6f, 0.0001f, 23.0f, 553.6f};
+    FloatStatistics stats = new FloatStatistics();
+
+    for (float f: floatArray) {
+      stats.updateStats(f);
+    }
+    assertEquals(553.6f, stats.getMax(), 1e-10);
+    assertEquals(0.0001f, stats.getMin(), 1e-10);
+
+    // Test negative values
+    floatArray = new float[] {-1.5f, -44.5f, -412.99f, 0.65f, -5.6f, -100.6f, 0.0001f, -23.0f, -3.6f};
+    FloatStatistics statsNeg = new FloatStatistics();
+
+    for (float f: floatArray) {
+      statsNeg.updateStats(f);
+    }
+    assertEquals(0.65f, statsNeg.getMax(), 1e-10);
+    assertEquals(-412.99f, statsNeg.getMin(), 1e-10);
+
+    // Test converting to and from byte[]
+    byte[] floatMaxBytes = statsNeg.getMaxBytes();
+    byte[] floatMinBytes = statsNeg.getMinBytes();
+
+    assertEquals(0.65f, ByteBuffer.wrap(floatMaxBytes).order(java.nio.ByteOrder.LITTLE_ENDIAN).getFloat(), 1e-10);
+    assertEquals(-412.99f, ByteBuffer.wrap(floatMinBytes).order(java.nio.ByteOrder.LITTLE_ENDIAN).getFloat(), 1e-10);
+
+    FloatStatistics statsFromBytes = new FloatStatistics();
+    statsFromBytes.setMinMaxFromBytes(floatMinBytes, floatMaxBytes);
+
+    assertEquals(0.65f, statsFromBytes.getMax(), 1e-10);
+    assertEquals(-412.99f, statsFromBytes.getMin(), 1e-10);
+
+    // Float.MIN_VALUE is the smallest positive float, not the most negative one,
+    // so it is still the minimum of this pair.
+    floatArray = new float[] {Float.MAX_VALUE, Float.MIN_VALUE};
+    FloatStatistics minMaxValues = new FloatStatistics();
+
+    for (float f: floatArray) {
+      minMaxValues.updateStats(f);
+    }
+    assertEquals(Float.MAX_VALUE, minMaxValues.getMax(), 1e-10);
+    assertEquals(Float.MIN_VALUE, minMaxValues.getMin(), 1e-10);
+
+    // Test converting to and from byte[] for large and small values
+    byte[] floatMaxBytesMinMax = minMaxValues.getMaxBytes();
+    byte[] floatMinBytesMinMax = minMaxValues.getMinBytes();
+
+    assertEquals(Float.MAX_VALUE, ByteBuffer.wrap(floatMaxBytesMinMax).order(java.nio.ByteOrder.LITTLE_ENDIAN).getFloat(), 1e-10);
+    assertEquals(Float.MIN_VALUE, ByteBuffer.wrap(floatMinBytesMinMax).order(java.nio.ByteOrder.LITTLE_ENDIAN).getFloat(), 1e-10);
+
+    FloatStatistics statsFromBytesMinMax= new FloatStatistics();
+    statsFromBytesMinMax.setMinMaxFromBytes(floatMinBytesMinMax, floatMaxBytesMinMax);
+
+    assertEquals(Float.MAX_VALUE, statsFromBytesMinMax.getMax(), 1e-10);
+    assertEquals(Float.MIN_VALUE, statsFromBytesMinMax.getMin(), 1e-10);
+
+    // Test print formatting
+    assertEquals("min: 0.00010, max: 553.59998, num_nulls: 0", stats.toString());
+  }
+
+  /**
+   * Checks {@link DoubleStatistics} min/max tracking for positive, negative and extreme values,
+   * the round trip of min/max through their byte[] form (read back here as little-endian
+   * doubles), and the {@code toString()} format.
+   * <p>
+   * All assertions use JUnit's (expected, actual, delta) argument order so failure messages
+   * are correct.
+   */
+  @Test
+  public void testDoubleMinMax() {
+    // Test basic max/min
+    // 944.5 is exactly representable, so the double literal equals the former float literal.
+    doubleArray = new double[] {81.5d, 944.5d, 2.002d, 334.5d, 5.6d, 0.001d, 0.00001d, 23.0d, 553.6d};
+    DoubleStatistics stats = new DoubleStatistics();
+
+    for (double d: doubleArray) {
+      stats.updateStats(d);
+    }
+    assertEquals(944.5d, stats.getMax(), 1e-10);
+    assertEquals(0.00001d, stats.getMin(), 1e-10);
+
+    // Test negative values
+    doubleArray = new double[] {-81.5d, -944.5d, 2.002d, -334.5d, -5.6d, -0.001d, -0.00001d, 23.0d, -3.6d};
+    DoubleStatistics statsNeg = new DoubleStatistics();
+
+    for (double d: doubleArray) {
+      statsNeg.updateStats(d);
+    }
+    assertEquals(23.0d, statsNeg.getMax(), 1e-10);
+    assertEquals(-944.5d, statsNeg.getMin(), 1e-10);
+
+    // Test converting to and from byte[]
+    byte[] doubleMaxBytes = statsNeg.getMaxBytes();
+    byte[] doubleMinBytes = statsNeg.getMinBytes();
+
+    assertEquals(23.0d, ByteBuffer.wrap(doubleMaxBytes).order(java.nio.ByteOrder.LITTLE_ENDIAN).getDouble(), 1e-10);
+    assertEquals(-944.5d, ByteBuffer.wrap(doubleMinBytes).order(java.nio.ByteOrder.LITTLE_ENDIAN).getDouble(), 1e-10);
+
+    DoubleStatistics statsFromBytes = new DoubleStatistics();
+    statsFromBytes.setMinMaxFromBytes(doubleMinBytes, doubleMaxBytes);
+
+    assertEquals(23.0d, statsFromBytes.getMax(), 1e-10);
+    assertEquals(-944.5d, statsFromBytes.getMin(), 1e-10);
+
+    // Double.MIN_VALUE is the smallest positive double, not the most negative one,
+    // so it is still the minimum of this pair.
+    doubleArray = new double[] {Double.MAX_VALUE, Double.MIN_VALUE};
+    DoubleStatistics minMaxValues = new DoubleStatistics();
+
+    for (double d: doubleArray) {
+      minMaxValues.updateStats(d);
+    }
+    assertEquals(Double.MAX_VALUE, minMaxValues.getMax(), 1e-10);
+    assertEquals(Double.MIN_VALUE, minMaxValues.getMin(), 1e-10);
+
+    // Test converting to and from byte[] for large and small values
+    byte[] doubleMaxBytesMinMax = minMaxValues.getMaxBytes();
+    byte[] doubleMinBytesMinMax = minMaxValues.getMinBytes();
+
+    assertEquals(Double.MAX_VALUE, ByteBuffer.wrap(doubleMaxBytesMinMax).order(java.nio.ByteOrder.LITTLE_ENDIAN).getDouble(), 1e-10);
+    assertEquals(Double.MIN_VALUE, ByteBuffer.wrap(doubleMinBytesMinMax).order(java.nio.ByteOrder.LITTLE_ENDIAN).getDouble(), 1e-10);
+
+    DoubleStatistics statsFromBytesMinMax= new DoubleStatistics();
+    statsFromBytesMinMax.setMinMaxFromBytes(doubleMinBytesMinMax, doubleMaxBytesMinMax);
+
+    assertEquals(Double.MAX_VALUE, statsFromBytesMinMax.getMax(), 1e-10);
+    assertEquals(Double.MIN_VALUE, statsFromBytesMinMax.getMin(), 1e-10);
+
+    // Test print formatting
+    assertEquals("min: 0.00001, max: 944.50000, num_nulls: 0", stats.toString());
+  }
+
+  @Test
+  public void testBooleanMinMax() { // min/max tracking, byte[] round trip and toString of BooleanStatistics
+    // Test all true
+    booleanArray = new boolean[] {true, true, true};
+    BooleanStatistics statsTrue = new BooleanStatistics();
+
+    for (boolean i: booleanArray) {
+      statsTrue.updateStats(i);
+    }
+    assertTrue(statsTrue.getMax());
+    assertTrue(statsTrue.getMin());
+
+    // Test all false
+    booleanArray = new boolean[] {false, false, false};
+    BooleanStatistics statsFalse = new BooleanStatistics();
+
+    for (boolean i: booleanArray) {
+      statsFalse.updateStats(i);
+    }
+    assertFalse(statsFalse.getMax());
+    assertFalse(statsFalse.getMin());
+
+    booleanArray = new boolean[] {false, true, false}; // mixed input: max is expected to be true, min false
+    BooleanStatistics statsBoth = new BooleanStatistics();
+
+    for (boolean i: booleanArray) {
+      statsBoth.updateStats(i);
+    }
+    assertTrue(statsBoth.getMax());
+    assertFalse(statsBoth.getMin());
+
+    // Test converting to and from byte[]
+    byte[] boolMaxBytes = statsBoth.getMaxBytes();
+    byte[] boolMinBytes = statsBoth.getMinBytes();
+
+    assertEquals((int)(boolMaxBytes[0] & 255), 1); // first byte, read as unsigned, is 1 for true
+    assertEquals((int)(boolMinBytes[0] & 255), 0); // and 0 for false
+
+    BooleanStatistics statsFromBytes = new BooleanStatistics();
+    statsFromBytes.setMinMaxFromBytes(boolMinBytes, boolMaxBytes);
+
+    assertTrue(statsFromBytes.getMax());
+    assertFalse(statsFromBytes.getMin());
+
+    // Test print formatting
+    assertEquals(statsBoth.toString(), "min: false, max: true, num_nulls: 0");
+  }
+
+  @Test
+  public void testBinaryMinMax() { // min/max tracking, byte[] round trip and toString of BinaryStatistics
+    // Test basic max/min
+    stringArray = new String[] {"hello", "world", "this", "is", "a", "test", "of", "the", "stats", "class"};
+    BinaryStatistics stats = new BinaryStatistics();
+
+    for (String s: stringArray) {
+      stats.updateStats(Binary.fromString(s));
+    }
+    assertEquals(stats.getMax(), Binary.fromString("world"));
+    assertEquals(stats.getMin(), Binary.fromString("a"));
+
+    // Test empty string
+    stringArray = new String[] {"", "", "", "", ""};
+    BinaryStatistics statsEmpty = new BinaryStatistics();
+
+    for (String s: stringArray) {
+      statsEmpty.updateStats(Binary.fromString(s));
+    }
+    assertEquals(statsEmpty.getMax(), Binary.fromString(""));
+    assertEquals(statsEmpty.getMin(), Binary.fromString(""));
+
+    // Test converting to and from byte[]
+    byte[] stringMaxBytes = stats.getMaxBytes();
+    byte[] stringMinBytes = stats.getMinBytes();
+
+    assertEquals(new String(stringMaxBytes), "world"); // NOTE(review): decodes with the platform default charset; the values here are ASCII only
+    assertEquals(new String(stringMinBytes), "a");
+
+    BinaryStatistics statsFromBytes = new BinaryStatistics();
+    statsFromBytes.setMinMaxFromBytes(stringMinBytes, stringMaxBytes);
+
+    assertEquals(statsFromBytes.getMax(), Binary.fromString("world"));
+    assertEquals(statsFromBytes.getMin(), Binary.fromString("a"));
+
+    // Test print formatting
+    assertEquals(stats.toString(), "min: a, max: world, num_nulls: 0");
+  }
+
+  @Test
+  public void testMergingStatistics() { // single JUnit entry point that runs each per-type merge check in turn
+    testMergingIntStats();
+    testMergingLongStats();
+    testMergingFloatStats();
+    testMergingDoubleStats();
+    testMergingBooleanStats();
+    testMergingStringStats();
+  }
+
+  private void testMergingIntStats() { // checks that merging IntStatistics yields the min/max over all merged inputs
+    integerArray = new int[] {1, 2, 3, 4, 5};
+    IntStatistics intStats = new IntStatistics();
+
+    for (int s: integerArray) {
+      intStats.updateStats(s);
+    }
+
+    integerArray = new int[] {0, 3, 3}; // second batch lowers the expected min to 0; the max stays 5
+    IntStatistics intStats2 = new IntStatistics();
+
+    for (int s: integerArray) {
+      intStats2.updateStats(s);
+    }
+    intStats.mergeStatistics(intStats2);
+    assertEquals(intStats.getMax(), 5);
+    assertEquals(intStats.getMin(), 0);
+
+    integerArray = new int[] {-1, -100, 100}; // third batch extends the expected range on both ends
+    IntStatistics intStats3 = new IntStatistics();
+    for (int s: integerArray) {
+      intStats3.updateStats(s);
+    }
+    intStats.mergeStatistics(intStats3);
+
+    assertEquals(intStats.getMax(), 100);
+    assertEquals(intStats.getMin(), -100);
+  }
+
+  private void testMergingLongStats() { // checks that merging LongStatistics yields the min/max over all merged inputs
+    longArray = new long[] {1l, 2l, 3l, 4l, 5l};
+    LongStatistics longStats = new LongStatistics();
+
+    for (long s: longArray) {
+      longStats.updateStats(s);
+    }
+
+    longArray = new long[] {0l, 3l, 3l}; // second batch lowers the expected min to 0; the max stays 5
+    LongStatistics longStats2 = new LongStatistics();
+
+    for (long s: longArray) {
+      longStats2.updateStats(s);
+    }
+    longStats.mergeStatistics(longStats2);
+    assertEquals(longStats.getMax(), 5l);
+    assertEquals(longStats.getMin(), 0l);
+
+    longArray = new long[] {-1l, -100l, 100l}; // third batch extends the expected range on both ends
+    LongStatistics longStats3 = new LongStatistics();
+    for (long s: longArray) {
+      longStats3.updateStats(s);
+    }
+    longStats.mergeStatistics(longStats3);
+
+    assertEquals(longStats.getMax(), 100l);
+    assertEquals(longStats.getMin(), -100l);
+  }
+
+  private void testMergingFloatStats() { // checks that merging FloatStatistics yields the min/max over all merged inputs
+    floatArray = new float[] {1.44f, 12.2f, 98.3f, 1.4f, 0.05f};
+    FloatStatistics floatStats = new FloatStatistics();
+
+    for (float s: floatArray) {
+      floatStats.updateStats(s);
+    }
+
+    floatArray = new float[] {0.0001f, 9.9f, 3.1f}; // second batch lowers the expected min to 0.0001; the max stays 98.3
+    FloatStatistics floatStats2 = new FloatStatistics();
+
+    for (float s: floatArray) {
+      floatStats2.updateStats(s);
+    }
+    floatStats.mergeStatistics(floatStats2);
+    assertEquals(floatStats.getMax(), 98.3f, 1e-10);
+    assertEquals(floatStats.getMin(), 0.0001f, 1e-10);
+
+    floatArray = new float[] {-1.91f, -100.9f, 100.54f}; // third batch extends the expected range on both ends
+    FloatStatistics floatStats3 = new FloatStatistics();
+    for (float s: floatArray) {
+      floatStats3.updateStats(s);
+    }
+    floatStats.mergeStatistics(floatStats3);
+
+    assertEquals(floatStats.getMax(), 100.54f, 1e-10);
+    assertEquals(floatStats.getMin(), -100.9f, 1e-10);
+  }
+
+  private void testMergingDoubleStats() { // checks that merging DoubleStatistics yields the min/max over all merged inputs
+    doubleArray = new double[] {1.44d, 12.2d, 98.3d, 1.4d, 0.05d};
+    DoubleStatistics doubleStats = new DoubleStatistics();
+
+    for (double s: doubleArray) {
+      doubleStats.updateStats(s);
+    }
+
+    doubleArray = new double[] {0.0001d, 9.9d, 3.1d}; // second batch lowers the expected min to 0.0001; the max stays 98.3
+    DoubleStatistics doubleStats2 = new DoubleStatistics();
+
+    for (double s: doubleArray) {
+      doubleStats2.updateStats(s);
+    }
+    doubleStats.mergeStatistics(doubleStats2);
+    assertEquals(doubleStats.getMax(), 98.3d, 1e-10);
+    assertEquals(doubleStats.getMin(), 0.0001d, 1e-10);
+
+    doubleArray = new double[] {-1.91d, -100.9d, 100.54d}; // third batch extends the expected range on both ends
+    DoubleStatistics doubleStats3 = new DoubleStatistics();
+    for (double s: doubleArray) {
+      doubleStats3.updateStats(s);
+    }
+    doubleStats.mergeStatistics(doubleStats3);
+
+    assertEquals(doubleStats.getMax(), 100.54d, 1e-10);
+    assertEquals(doubleStats.getMin(), -100.9d, 1e-10);
+  }
+
+  private void testMergingBooleanStats() { // checks that merging BooleanStatistics yields the min/max over all merged inputs
+    booleanArray = new boolean[] {true, true, true};
+    BooleanStatistics booleanStats = new BooleanStatistics();
+
+    for (boolean s: booleanArray) {
+      booleanStats.updateStats(s);
+    }
+
+    booleanArray = new boolean[] {true, false}; // second batch introduces false, so the expected min becomes false
+    BooleanStatistics booleanStats2 = new BooleanStatistics();
+
+    for (boolean s: booleanArray) {
+      booleanStats2.updateStats(s);
+    }
+    booleanStats.mergeStatistics(booleanStats2);
+    assertEquals(booleanStats.getMax(), true);
+    assertEquals(booleanStats.getMin(), false);
+
+    booleanArray = new boolean[] {false, false, false, false}; // all-false batch: expected min/max are unchanged
+    BooleanStatistics booleanStats3 = new BooleanStatistics();
+    for (boolean s: booleanArray) {
+      booleanStats3.updateStats(s);
+    }
+    booleanStats.mergeStatistics(booleanStats3);
+
+    assertEquals(booleanStats.getMax(), true);
+    assertEquals(booleanStats.getMin(), false);
+  }
+
+  private void testMergingStringStats() { // checks that merging BinaryStatistics yields the min/max over all merged inputs
+    stringArray = new String[] {"hello", "world", "this", "is", "a", "test", "of", "the", "stats", "class"};
+    BinaryStatistics stats = new BinaryStatistics();
+
+    for (String s: stringArray) {
+      stats.updateStats(Binary.fromString(s));
+    }
+
+    stringArray = new String[] {"zzzz", "asdf", "testing"}; // second batch raises the expected max to "zzzz"; the min stays "a"
+    BinaryStatistics stats2 = new BinaryStatistics();
+
+    for (String s: stringArray) {
+      stats2.updateStats(Binary.fromString(s));
+    }
+    stats.mergeStatistics(stats2);
+    assertEquals(stats.getMax(), Binary.fromString("zzzz"));
+    assertEquals(stats.getMin(), Binary.fromString("a"));
+
+    stringArray = new String[] {"", "good", "testing"}; // the empty string becomes the expected min; the max stays "zzzz"
+    BinaryStatistics stats3 = new BinaryStatistics();
+    for (String s: stringArray) {
+      stats3.updateStats(Binary.fromString(s));
+    }
+    stats.mergeStatistics(stats3);
+
+    assertEquals(stats.getMax(), Binary.fromString("zzzz"));
+    assertEquals(stats.getMin(), Binary.fromString(""));
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/column/values/RandomStr.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/RandomStr.java b/parquet-column/src/test/java/org/apache/parquet/column/values/RandomStr.java
new file mode 100644
index 0000000..8b41c39
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/RandomStr.java
@@ -0,0 +1,56 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values;
+
+import java.util.Random;
+
+/**
+ * Produces random strings made of the characters 0-9, A-Z and a-z.
+ * @author Aniket Mokashi
+ *
+ */
+public class RandomStr {
+  private final char[] alphanumeric=alphanumeric(); // the 62-character alphabet: 10 digits + 26 upper-case + 26 lower-case
+  private final Random rand;
+
+  public RandomStr(){this(null);} // null makes the other constructor create a fresh Random
+
+  public RandomStr(Random rand){ // uses the supplied generator, or a new Random when rand is null
+    this.rand=(rand!=null) ? rand : new Random();
+  }
+
+  public String get(int len){ // returns a string of len random alphabet characters (empty when len <= 0)
+    StringBuffer out=new StringBuffer();
+
+    while(out.length() < len){
+      int idx=Math.abs(( rand.nextInt() % alphanumeric.length )); // remainder lies in (-length, length), so abs yields a valid index
+      out.append(alphanumeric[idx]);
+    }
+    return out.toString();
+  }
+
+  // create alphanumeric char array from the ASCII code ranges of digits and letters
+  private char[] alphanumeric(){
+    StringBuffer buf=new StringBuffer(128);
+    for(int i=48; i<= 57;i++)buf.append((char)i); // 0-9
+    for(int i=65; i<= 90;i++)buf.append((char)i); // A-Z
+    for(int i=97; i<=122;i++)buf.append((char)i); // a-z
+    return buf.toString().toCharArray();
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java b/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java
new file mode 100644
index 0000000..c9a62b4
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java
@@ -0,0 +1,90 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * Test utility class: random string samples plus write/read helpers for ValuesWriter and ValuesReader.
+ * 
+ * @author Aniket Mokashi
+ *
+ */
+public class Utils {
+  private static Random randomLen = new Random(); // also passed to randomStr, so lengths and characters share one generator
+  private static RandomStr randomStr = new RandomStr(randomLen);
+  
+  public static String[] getRandomStringSamples(int numSamples, int maxLength) { // numSamples random strings, each shorter than maxLength
+    String[] samples = new String[numSamples];
+    
+    for (int i=0; i < numSamples; i++) {
+      int len = randomLen.nextInt(maxLength); // uniform in [0, maxLength); Random.nextInt throws if maxLength <= 0
+      samples[i] = randomStr.get(len);
+    }
+    
+    return samples;
+  }
+  
+  public static void writeInts(ValuesWriter writer, int[] ints) // writes every element of ints, in order, via writeInteger
+      throws IOException {
+    for(int i=0; i < ints.length; i++) {
+      writer.writeInteger(ints[i]);
+    }
+  }
+
+  public static void writeData(ValuesWriter writer, String[] strings) // writes every string, in order, as a Binary via writeBytes
+      throws IOException {
+    for(int i=0; i < strings.length; i++) {
+      writer.writeBytes(Binary.fromString(strings[i]));
+    }
+  }
+
+  public static Binary[] readData(ValuesReader reader, byte[] data, int offset, int length) // initialises reader on data at offset, then reads length values
+      throws IOException {
+    Binary[] bins = new Binary[length];
+    reader.initFromPage(length, data, offset); // honour the caller's offset (was hard-coded to 0), matching readInts
+    for(int i=0; i < length; i++) {
+      bins[i] = reader.readBytes();
+    }
+    return bins;
+  }
+  
+  public static Binary[] readData(ValuesReader reader, byte[] data, int length) // convenience overload: reads from offset 0
+      throws IOException {
+    return readData(reader, data, 0, length);
+  }
+  
+  public static int[] readInts(ValuesReader reader, byte[] data, int offset, int length) // initialises reader on data at offset, then reads length ints
+      throws IOException {
+    int[] ints = new int[length];
+    reader.initFromPage(length, data, offset);
+    for(int i=0; i < length; i++) {
+      ints[i] = reader.readInteger();
+    }
+    return ints;
+  }
+  
+  public static int[] readInts(ValuesReader reader, byte[] data, int length) // convenience overload: reads from offset 0
+      throws IOException {
+    return readInts(reader, data, 0, length);
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/BitPackingPerfTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/BitPackingPerfTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/BitPackingPerfTest.java
new file mode 100644
index 0000000..e74e787
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/BitPackingPerfTest.java
@@ -0,0 +1,103 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.bitpacking;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.bitpacking.BitPacking.BitPackingWriter;
+
+/**
+ * Improvable micro benchmark for bitpacking: times two 1-bit readers over the same packed data.
+ * Run with: -verbose:gc -Xmx2g -Xms2g
+ * @author Julien Le Dem
+ *
+ */
+public class BitPackingPerfTest {
+
+  public static void main(String[] args) throws IOException { // packs COUNT alternating 0/1 values, then times both readers 5 times
+    int COUNT = 800000;
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    BitPackingWriter w = BitPacking.getBitPackingWriter(1, baos); // 1 is presumably the bit width - TODO confirm against BitPacking
+    long t0 = System.currentTimeMillis();
+    for (int i = 0 ; i < COUNT; ++i) {
+      w.write(i % 2);
+    }
+    w.finish();
+    long t1 = System.currentTimeMillis();
+    System.out.println("written in " + (t1 - t0) + "ms");
+    System.out.println();
+    byte[] bytes = baos.toByteArray();
+    System.out.println(bytes.length);
+    int[] result = new int[COUNT];
+    for (int l = 0; l < 5; l++) {
+      long s = manual(bytes, result);
+      long b = generated(bytes, result);
+      float ratio = (float)b/s; // time of generated() divided by time of manual(); below 1 is reported as GOOD
+      System.out.println("                                             " + ratio + (ratio < 1 ? " < 1 => GOOD" : " >= 1 => BAD"));
+    }
+  }
+
+  private static void verify(int[] result) { // throws RuntimeException unless result[i] == i % 2 for every index
+    int error = 0;
+    for (int i = 0 ; i < result.length; ++i) {
+      if (result[i] != i % 2) {
+        error ++;
+      }
+    }
+    if (error != 0) {
+      throw new RuntimeException("errors: " + error + " / " + result.length);
+    }
+  }
+
+  private static long manual(byte[] bytes, int[] result) // times reads through BitPackingValuesReader; returns total nanoseconds
+      throws IOException {
+    return readNTimes(bytes, result, new BitPackingValuesReader(1));
+  }
+
+  private static long generated(byte[] bytes, int[] result) // times reads through ByteBitPackingValuesReader; returns total nanoseconds
+      throws IOException {
+    return readNTimes(bytes, result, new ByteBitPackingValuesReader(1, Packer.BIG_ENDIAN));
+  }
+
+  private static long readNTimes(byte[] bytes, int[] result, ValuesReader r) // decodes bytes into result N times with r; returns summed nanoseconds
+      throws IOException {
+    System.out.println();
+    long t = 0;
+    int N = 10;
+    System.gc(); // request a collection before timing starts
+    System.out.print("                                             " + r.getClass().getSimpleName());
+    System.out.print(" no gc <");
+    for (int k = 0; k < N; k++) {
+      long t2 = System.nanoTime();
+      r.initFromPage(result.length, bytes, 0);
+      for (int i = 0; i < result.length; i++) {
+        result[i] = r.readInteger();
+      }
+      long t3 = System.nanoTime();
+      t += t3 - t2; // only the init + read loop is timed
+    }
+    System.out.println("> read in " + t/1000 + "µs " + (N * result.length / (t / 1000)) + " values per µs"); // t is in ns, so t/1000 is µs
+    verify(result); // checked after timing, on the values from the last pass
+    return t;
+  }
+
+}
+