Posted to commits@parquet.apache.org by bl...@apache.org on 2015/04/28 01:12:35 UTC

[38/51] [partial] parquet-mr git commit: PARQUET-23: Rename to org.apache.parquet.

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/ValidTypeMap.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/ValidTypeMap.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/ValidTypeMap.java
new file mode 100644
index 0000000..4f8b10d
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/ValidTypeMap.java
@@ -0,0 +1,178 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.predicate;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.filter2.predicate.Operators.Column;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+/**
+ * Contains all valid mappings from class -> parquet type (and vice versa) for use in
+ * {@link FilterPredicate}s
+ *
+ * This is a bit ugly, but it allows us to provide good error messages at runtime
+ * when there are type mismatches.
+ *
+ * TODO: this has some overlap with {@link PrimitiveTypeName#javaType}
+ * TODO: (https://issues.apache.org/jira/browse/PARQUET-30)
+ */
+public class ValidTypeMap {
+  private ValidTypeMap() { }
+
+  // classToParquetType and parquetTypeToClass are used as a bi-directional map
+  private static final Map<Class<?>, Set<FullTypeDescriptor>> classToParquetType = new HashMap<Class<?>, Set<FullTypeDescriptor>>();
+  private static final Map<FullTypeDescriptor, Set<Class<?>>> parquetTypeToClass = new HashMap<FullTypeDescriptor, Set<Class<?>>>();
+
+  // set up the mapping in both directions
+  private static void add(Class<?> c, FullTypeDescriptor f) {
+    Set<FullTypeDescriptor> descriptors = classToParquetType.get(c);
+    if (descriptors == null) {
+      descriptors = new HashSet<FullTypeDescriptor>();
+      classToParquetType.put(c, descriptors);
+    }
+    descriptors.add(f);
+
+    Set<Class<?>> classes = parquetTypeToClass.get(f);
+    if (classes == null) {
+      classes = new HashSet<Class<?>>();
+      parquetTypeToClass.put(f, classes);
+    }
+    classes.add(c);
+  }
+
+  static {
+    // basic primitive columns
+    add(Integer.class, new FullTypeDescriptor(PrimitiveTypeName.INT32, null));
+    add(Long.class, new FullTypeDescriptor(PrimitiveTypeName.INT64, null));
+    add(Float.class, new FullTypeDescriptor(PrimitiveTypeName.FLOAT, null));
+    add(Double.class, new FullTypeDescriptor(PrimitiveTypeName.DOUBLE, null));
+    add(Boolean.class, new FullTypeDescriptor(PrimitiveTypeName.BOOLEAN, null));
+
+    // Both of these binary types are valid
+    add(Binary.class, new FullTypeDescriptor(PrimitiveTypeName.BINARY, null));
+    add(Binary.class, new FullTypeDescriptor(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, null));
+
+    add(Binary.class, new FullTypeDescriptor(PrimitiveTypeName.BINARY, OriginalType.UTF8));
+    add(Binary.class, new FullTypeDescriptor(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, OriginalType.UTF8));
+  }
+
+  /**
+   * Asserts that foundColumn was declared as a type that is compatible with the type for this column found
+   * in the schema of the parquet file.
+   *
+   * @throws java.lang.IllegalArgumentException if the types do not align
+   *
+   * @param foundColumn the column as declared by the user
+   * @param primitiveType the primitive type according to the schema
+   * @param originalType the original type according to the schema
+   */
+  public static <T extends Comparable<T>> void assertTypeValid(Column<T> foundColumn, PrimitiveTypeName primitiveType, OriginalType originalType) {
+    Class<T> foundColumnType = foundColumn.getColumnType();
+    ColumnPath columnPath = foundColumn.getColumnPath();
+
+    Set<FullTypeDescriptor> validTypeDescriptors = classToParquetType.get(foundColumnType);
+    FullTypeDescriptor typeInFileMetaData = new FullTypeDescriptor(primitiveType, originalType);
+
+    if (validTypeDescriptors == null) {
+      StringBuilder message = new StringBuilder();
+      message
+          .append("Column ")
+          .append(columnPath.toDotString())
+          .append(" was declared as type: ")
+          .append(foundColumnType.getName())
+          .append(" which is not supported in FilterPredicates.");
+
+      Set<Class<?>> supportedTypes = parquetTypeToClass.get(typeInFileMetaData);
+      if (supportedTypes != null) {
+        message
+          .append(" Supported types for this column are: ")
+          .append(supportedTypes);
+      } else {
+        message.append(" There are no supported types for columns of " + typeInFileMetaData);
+      }
+      throw new IllegalArgumentException(message.toString());
+    }
+
+    if (!validTypeDescriptors.contains(typeInFileMetaData)) {
+      StringBuilder message = new StringBuilder();
+      message
+          .append("FilterPredicate column: ")
+          .append(columnPath.toDotString())
+          .append("'s declared type (")
+          .append(foundColumnType.getName())
+          .append(") does not match the schema found in file metadata. Column ")
+          .append(columnPath.toDotString())
+          .append(" is of type: ")
+          .append(typeInFileMetaData)
+          .append("\nValid types for this column are: ")
+          .append(parquetTypeToClass.get(typeInFileMetaData));
+      throw new IllegalArgumentException(message.toString());
+    }
+  }
+
+  private static final class FullTypeDescriptor {
+    private final PrimitiveTypeName primitiveType;
+    private final OriginalType originalType;
+
+    private FullTypeDescriptor(PrimitiveTypeName primitiveType, OriginalType originalType) {
+      this.primitiveType = primitiveType;
+      this.originalType = originalType;
+    }
+
+    public PrimitiveTypeName getPrimitiveType() {
+      return primitiveType;
+    }
+
+    public OriginalType getOriginalType() {
+      return originalType;
+    }
+
+    @Override
+    public String toString() {
+      return "FullTypeDescriptor(" + "PrimitiveType: " + primitiveType + ", OriginalType: " + originalType + ')';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      FullTypeDescriptor that = (FullTypeDescriptor) o;
+
+      if (originalType != that.originalType) return false;
+      if (primitiveType != that.primitiveType) return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = primitiveType != null ? primitiveType.hashCode() : 0;
+      result = 31 * result + (originalType != null ? originalType.hashCode() : 0);
+      return result;
+    }
+  }
+}
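
To make the bidirectional map above concrete, here is a minimal sketch of the kind of mismatch assertTypeValid() rejects. It assumes the FilterApi factory methods from the same org.apache.parquet.filter2.predicate package; the column name is invented for illustration:

    import org.apache.parquet.filter2.predicate.FilterApi;
    import org.apache.parquet.filter2.predicate.Operators.LongColumn;
    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;

    // the user declares the column as a long, which maps to INT64 above...
    LongColumn age = FilterApi.longColumn("user.age");

    // ...but the file footer says INT32, so this throws IllegalArgumentException
    // with the "does not match the schema" message assembled above
    ValidTypeMap.assertTypeValid(age, PrimitiveTypeName.INT32, null);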

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringGroupConverter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringGroupConverter.java b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringGroupConverter.java
new file mode 100644
index 0000000..a76b5ee
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringGroupConverter.java
@@ -0,0 +1,115 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.recordlevel;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;
+import org.apache.parquet.io.PrimitiveColumnIO;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+
+import static org.apache.parquet.Preconditions.checkArgument;
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * See {@link FilteringRecordMaterializer}
+ */
+public class FilteringGroupConverter extends GroupConverter {
+  // the real converter
+  private final GroupConverter delegate;
+
+  // the path, from the root of the schema, to this converter
+  // used ultimately by the primitive converter proxy to figure
+  // out which column it represents.
+  private final List<Integer> indexFieldPath;
+
+  // for a given column, which nodes in the filter expression need
+  // to be notified of this column's value
+  private final Map<ColumnPath, List<ValueInspector>> valueInspectorsByColumn;
+
+  // used to go from our indexFieldPath to the PrimitiveColumnIO for that column
+  private final Map<List<Integer>, PrimitiveColumnIO> columnIOsByIndexFieldPath;
+
+  public FilteringGroupConverter(
+      GroupConverter delegate,
+      List<Integer> indexFieldPath,
+      Map<ColumnPath, List<ValueInspector>> valueInspectorsByColumn, Map<List<Integer>,
+      PrimitiveColumnIO> columnIOsByIndexFieldPath) {
+
+    this.delegate = checkNotNull(delegate, "delegate");
+    this.indexFieldPath = checkNotNull(indexFieldPath, "indexFieldPath");
+    this.columnIOsByIndexFieldPath = checkNotNull(columnIOsByIndexFieldPath, "columnIOsByIndexFieldPath");
+    this.valueInspectorsByColumn = checkNotNull(valueInspectorsByColumn, "valueInspectorsByColumn");
+  }
+
+  // When a converter is asked for, we get the real one from the delegate, then wrap it
+  // in a filtering pass-through proxy.
+  // TODO: making the assumption that getConverter(i) is only called once, is that valid?
+  @Override
+  public Converter getConverter(int fieldIndex) {
+
+    // get the real converter from the delegate
+    Converter delegateConverter = checkNotNull(delegate.getConverter(fieldIndex), "delegate converter");
+
+    // determine the indexFieldPath for the converter proxy we're about to make, which is
+    // this converter's path + the requested fieldIndex
+    List<Integer> newIndexFieldPath = new ArrayList<Integer>(indexFieldPath.size() + 1);
+    newIndexFieldPath.addAll(indexFieldPath);
+    newIndexFieldPath.add(fieldIndex);
+
+    if (delegateConverter.isPrimitive()) {
+      PrimitiveColumnIO columnIO = getColumnIO(newIndexFieldPath);
+      ColumnPath columnPath = ColumnPath.get(columnIO.getColumnDescriptor().getPath());
+      ValueInspector[] valueInspectors = getValueInspectors(columnPath);
+      return new FilteringPrimitiveConverter(delegateConverter.asPrimitiveConverter(), valueInspectors);
+    } else {
+      return new FilteringGroupConverter(delegateConverter.asGroupConverter(), newIndexFieldPath, valueInspectorsByColumn, columnIOsByIndexFieldPath);
+    }
+
+  }
+
+  private PrimitiveColumnIO getColumnIO(List<Integer> indexFieldPath) {
+    PrimitiveColumnIO found = columnIOsByIndexFieldPath.get(indexFieldPath);
+    checkArgument(found != null, "Did not find PrimitiveColumnIO for index field path " + indexFieldPath);
+    return found;
+  }
+
+  private ValueInspector[] getValueInspectors(ColumnPath columnPath) {
+    List<ValueInspector> inspectorsList = valueInspectorsByColumn.get(columnPath);
+    if (inspectorsList == null) {
+      return new ValueInspector[] {};
+    } else {
+      return inspectorsList.toArray(new ValueInspector[inspectorsList.size()]);
+    }
+  }
+
+  @Override
+  public void start() {
+    delegate.start();
+  }
+
+  @Override
+  public void end() {
+    delegate.end();
+  }
+}
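
For orientation, a sketch (with a made-up schema) of how the index field paths built above resolve to columns:

    // for the schema:
    //   message m { required group a { required int32 b; } }
    // the root FilteringGroupConverter starts with indexFieldPath []
    // getConverter(0) wraps the converter for group "a" with indexFieldPath [0]
    // that proxy's getConverter(0) reaches the primitive field "b" with
    // indexFieldPath [0, 0], which columnIOsByIndexFieldPath resolves to the
    // PrimitiveColumnIO whose ColumnPath is "a.b"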

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringPrimitiveConverter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringPrimitiveConverter.java b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringPrimitiveConverter.java
new file mode 100644
index 0000000..18edb64
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringPrimitiveConverter.java
@@ -0,0 +1,109 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.recordlevel;
+
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.PrimitiveConverter;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * See {@link FilteringRecordMaterializer}
+ *
+ * This pass-through proxy for a delegate {@link PrimitiveConverter} also
+ * updates the {@link ValueInspector}s of an {@link IncrementallyUpdatedFilterPredicate}.
+ */
+public class FilteringPrimitiveConverter extends PrimitiveConverter {
+  private final PrimitiveConverter delegate;
+  private final ValueInspector[] valueInspectors;
+
+  public FilteringPrimitiveConverter(PrimitiveConverter delegate, ValueInspector[] valueInspectors) {
+    this.delegate = checkNotNull(delegate, "delegate");
+    this.valueInspectors = checkNotNull(valueInspectors, "valueInspectors");
+  }
+
+  // TODO: this works, but
+  // TODO: essentially turns off the benefits of dictionary support
+  // TODO: even if the underlying delegate supports it.
+  // TODO: we should support it here. (https://issues.apache.org/jira/browse/PARQUET-36)
+  @Override
+  public boolean hasDictionarySupport() {
+    return false;
+  }
+
+  @Override
+  public void setDictionary(Dictionary dictionary) {
+    throw new UnsupportedOperationException("FilteringPrimitiveConverter doesn't have dictionary support");
+  }
+
+  @Override
+  public void addValueFromDictionary(int dictionaryId) {
+    throw new UnsupportedOperationException("FilteringPrimitiveConverter doesn't have dictionary support");
+  }
+
+  @Override
+  public void addBinary(Binary value) {
+    for (ValueInspector valueInspector : valueInspectors) {
+      valueInspector.update(value);
+    }
+    delegate.addBinary(value);
+  }
+
+  @Override
+  public void addBoolean(boolean value) {
+    for (ValueInspector valueInspector : valueInspectors) {
+      valueInspector.update(value);
+    }
+    delegate.addBoolean(value);
+  }
+
+  @Override
+  public void addDouble(double value) {
+    for (ValueInspector valueInspector : valueInspectors) {
+      valueInspector.update(value);
+    }
+    delegate.addDouble(value);
+  }
+
+  @Override
+  public void addFloat(float value) {
+    for (ValueInspector valueInspector : valueInspectors) {
+      valueInspector.update(value);
+    }
+    delegate.addFloat(value);
+  }
+
+  @Override
+  public void addInt(int value) {
+    for (ValueInspector valueInspector : valueInspectors) {
+      valueInspector.update(value);
+    }
+    delegate.addInt(value);
+  }
+
+  @Override
+  public void addLong(long value) {
+    for (ValueInspector valueInspector : valueInspectors) {
+      valueInspector.update(value);
+    }
+    delegate.addLong(value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringRecordMaterializer.java b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringRecordMaterializer.java
new file mode 100644
index 0000000..d8fa677
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringRecordMaterializer.java
@@ -0,0 +1,115 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.recordlevel;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;
+import org.apache.parquet.io.PrimitiveColumnIO;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.RecordMaterializer;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * A pass-through proxy for a {@link RecordMaterializer} that updates an {@link IncrementallyUpdatedFilterPredicate}
+ * as it receives concrete values for the current record. If, once record assembly signals that
+ * there are no more values, the predicate indicates that this record should be dropped, {@link #getCurrentRecord()}
+ * returns null to signal that the record is being skipped.
+ * Otherwise, the record is retrieved from the delegate.
+ */
+public class FilteringRecordMaterializer<T> extends RecordMaterializer<T> {
+  // the real record materializer
+  private final RecordMaterializer<T> delegate;
+
+  // the proxied root converter
+  private final FilteringGroupConverter rootConverter;
+
+  // the predicate
+  private final IncrementallyUpdatedFilterPredicate filterPredicate;
+
+  public FilteringRecordMaterializer(
+      RecordMaterializer<T> delegate,
+      List<PrimitiveColumnIO> columnIOs,
+      Map<ColumnPath, List<ValueInspector>> valueInspectorsByColumn,
+      IncrementallyUpdatedFilterPredicate filterPredicate) {
+
+    checkNotNull(columnIOs, "columnIOs");
+    checkNotNull(valueInspectorsByColumn, "valueInspectorsByColumn");
+    this.filterPredicate = checkNotNull(filterPredicate, "filterPredicate");
+    this.delegate = checkNotNull(delegate, "delegate");
+
+    // keep track of which path of indices leads to which primitive column
+    Map<List<Integer>, PrimitiveColumnIO> columnIOsByIndexFieldPath = new HashMap<List<Integer>, PrimitiveColumnIO>();
+
+    for (PrimitiveColumnIO c : columnIOs) {
+      columnIOsByIndexFieldPath.put(getIndexFieldPathList(c), c);
+    }
+
+    // create a proxy for the delegate's root converter
+    this.rootConverter = new FilteringGroupConverter(
+        delegate.getRootConverter(), Collections.<Integer>emptyList(), valueInspectorsByColumn, columnIOsByIndexFieldPath);
+  }
+
+  public static List<Integer> getIndexFieldPathList(PrimitiveColumnIO c) {
+    return intArrayToList(c.getIndexFieldPath());
+  }
+
+  public static List<Integer> intArrayToList(int[] arr) {
+    List<Integer> list = new ArrayList<Integer>(arr.length);
+    for (int i : arr) {
+      list.add(i);
+    }
+    return list;
+  }
+
+
+
+  @Override
+  public T getCurrentRecord() {
+
+    // find out if the predicate thinks we should keep this record
+    boolean keep = IncrementallyUpdatedFilterPredicateEvaluator.evaluate(filterPredicate);
+
+    // reset the stateful predicate no matter what
+    IncrementallyUpdatedFilterPredicateResetter.reset(filterPredicate);
+
+    if (keep) {
+      return delegate.getCurrentRecord();
+    } else {
+      // signals a skip
+      return null;
+    }
+  }
+
+  @Override
+  public void skipCurrentRecord() {
+    delegate.skipCurrentRecord();
+  }
+
+  @Override
+  public GroupConverter getRootConverter() {
+    return rootConverter;
+  }
+}
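
A hedged sketch of the caller-side contract; the reader loop and record type are hypothetical, only the null-means-skip convention comes from this class:

    // record assembly has driven the root converter for one record
    MyRecord record = materializer.getCurrentRecord();
    if (record == null) {
      // the predicate rejected this record: skip it rather than emit it
    }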

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java
new file mode 100644
index 0000000..606c78f
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java
@@ -0,0 +1,157 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.recordlevel;
+
+import org.apache.parquet.io.api.Binary;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * A rewritten version of a {@link org.apache.parquet.filter2.predicate.FilterPredicate} which receives
+ * the values for a record's columns one by one and internally tracks whether the predicate is
+ * satisfied, unsatisfied, or unknown.
+ *
+ * This is used to apply a predicate during record assembly, without assembling a second copy of
+ * a record, and without building a stack of update events.
+ *
+ * IncrementallyUpdatedFilterPredicate is implemented via the visitor pattern, as is
+ * {@link org.apache.parquet.filter2.predicate.FilterPredicate}.
+ */
+public interface IncrementallyUpdatedFilterPredicate {
+
+  /**
+   * A Visitor for an {@link IncrementallyUpdatedFilterPredicate}, per the visitor pattern.
+   */
+  public static interface Visitor {
+    boolean visit(ValueInspector p);
+    boolean visit(And and);
+    boolean visit(Or or);
+  }
+
+  /**
+   * An {@link IncrementallyUpdatedFilterPredicate} must accept a {@link Visitor}, per the visitor pattern.
+   */
+  boolean accept(Visitor visitor);
+
+  /**
+   * This is the leaf node of a filter predicate. It receives the value for the primitive column it represents,
+   * and decides whether or not the predicate represented by this node is satisfied.
+   *
+   * It is stateful, and needs to be reset after use.
+   */
+  public static abstract class ValueInspector implements IncrementallyUpdatedFilterPredicate {
+    // package private constructor
+    ValueInspector() { }
+
+    private boolean result = false;
+    private boolean isKnown = false;
+
+    // these methods signal what the value is
+    public void updateNull() { throw new UnsupportedOperationException(); }
+    public void update(int value) { throw new UnsupportedOperationException(); }
+    public void update(long value) { throw new UnsupportedOperationException(); }
+    public void update(double value) { throw new UnsupportedOperationException(); }
+    public void update(float value) { throw new UnsupportedOperationException(); }
+    public void update(boolean value) { throw new UnsupportedOperationException(); }
+    public void update(Binary value) { throw new UnsupportedOperationException(); }
+
+    /**
+     * Reset to clear state and begin evaluating the next record.
+     */
+    public final void reset() {
+      isKnown = false;
+      result = false;
+    }
+
+    /**
+     * Subclasses should call this method to signal that the result of this predicate is known.
+     */
+    protected final void setResult(boolean result) {
+      if (isKnown) {
+        throw new IllegalStateException("setResult() called on a ValueInspector whose result is already known!"
+          + " Did you forget to call reset()?");
+      }
+      this.result = result;
+      this.isKnown = true;
+    }
+
+    /**
+     * Should only be called if {@link #isKnown()} returns true.
+     */
+    public final boolean getResult() {
+      if (!isKnown) {
+        throw new IllegalStateException("getResult() called on a ValueInspector whose result is not yet known!");
+      }
+      return result;
+    }
+
+    /**
+     * Returns true if this inspector has received a value yet, false otherwise.
+     */
+    public final boolean isKnown() {
+      return isKnown;
+    }
+
+    @Override
+    public boolean accept(Visitor visitor) {
+      return visitor.visit(this);
+    }
+  }
+
+  // base class for and / or
+  static abstract class BinaryLogical implements IncrementallyUpdatedFilterPredicate {
+    private final IncrementallyUpdatedFilterPredicate left;
+    private final IncrementallyUpdatedFilterPredicate right;
+
+    BinaryLogical(IncrementallyUpdatedFilterPredicate left, IncrementallyUpdatedFilterPredicate right) {
+      this.left = checkNotNull(left, "left");
+      this.right = checkNotNull(right, "right");
+    }
+
+    public final IncrementallyUpdatedFilterPredicate getLeft() {
+      return left;
+    }
+
+    public final IncrementallyUpdatedFilterPredicate getRight() {
+      return right;
+    }
+  }
+
+  public static final class Or extends BinaryLogical {
+    Or(IncrementallyUpdatedFilterPredicate left, IncrementallyUpdatedFilterPredicate right) {
+      super(left, right);
+    }
+
+    @Override
+    public boolean accept(Visitor visitor) {
+      return visitor.visit(this);
+    }
+  }
+
+  public static final class And extends BinaryLogical {
+    And(IncrementallyUpdatedFilterPredicate left, IncrementallyUpdatedFilterPredicate right) {
+      super(left, right);
+    }
+
+    @Override
+    public boolean accept(Visitor visitor) {
+      return visitor.visit(this);
+    }
+  }
+}
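
A minimal sketch of a leaf inspector, comparable in shape to what the generated builder produces. Since the ValueInspector constructor is package-private, this only compiles from within org.apache.parquet.filter2.recordlevel, and the predicate itself (x == 7) is invented:

    // inspector for "x == 7" on an int32 column; a null cell never equals 7
    ValueInspector xEquals7 = new ValueInspector() {
      @Override
      public void update(int value) { setResult(value == 7); }

      @Override
      public void updateNull() { setResult(false); }
    };
    // the package-private And/Or nodes combine such leaves into a full tree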

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java
new file mode 100644
index 0000000..8def88e
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java
@@ -0,0 +1,97 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.recordlevel;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.FilterPredicate.Visitor;
+import org.apache.parquet.filter2.predicate.Operators.And;
+import org.apache.parquet.filter2.predicate.Operators.Not;
+import org.apache.parquet.filter2.predicate.Operators.Or;
+import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;
+
+import static org.apache.parquet.Preconditions.checkArgument;
+
+/**
+ * The implementation of this abstract class is auto-generated by
+ * {@link org.apache.parquet.filter2.IncrementallyUpdatedFilterPredicateGenerator}
+ *
+ * Constructs an {@link IncrementallyUpdatedFilterPredicate} from a {@link org.apache.parquet.filter2.predicate.FilterPredicate}.
+ * This is how records are filtered during record assembly. The implementation is generated in order to avoid autoboxing.
+ *
+ * Note: the supplied predicate must not contain any instances of the not() operator, as this is not
+ * supported by this filter.
+ *
+ * The supplied predicate should first be run through {@link org.apache.parquet.filter2.predicate.LogicalInverseRewriter} to rewrite it
+ * in a form that doesn't make use of the not() operator.
+ *
+ * The supplied predicate should also have already been run through
+ * {@link org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator}
+ * to make sure it is compatible with the schema of this file.
+ *
+ * TODO: UserDefinedPredicates still autobox however
+ */
+public abstract class IncrementallyUpdatedFilterPredicateBuilderBase implements Visitor<IncrementallyUpdatedFilterPredicate> {
+  private boolean built = false;
+  private final Map<ColumnPath, List<ValueInspector>> valueInspectorsByColumn = new HashMap<ColumnPath, List<ValueInspector>>();
+
+  public IncrementallyUpdatedFilterPredicateBuilderBase() { }
+
+  public final IncrementallyUpdatedFilterPredicate build(FilterPredicate pred) {
+    checkArgument(!built, "This builder has already been used");
+    IncrementallyUpdatedFilterPredicate incremental = pred.accept(this);
+    built = true;
+    return incremental;
+  }
+
+  protected final void addValueInspector(ColumnPath columnPath, ValueInspector valueInspector) {
+    List<ValueInspector> valueInspectors = valueInspectorsByColumn.get(columnPath);
+    if (valueInspectors == null) {
+      valueInspectors = new ArrayList<ValueInspector>();
+      valueInspectorsByColumn.put(columnPath, valueInspectors);
+    }
+    valueInspectors.add(valueInspector);
+  }
+
+  public Map<ColumnPath, List<ValueInspector>> getValueInspectorsByColumn() {
+    return valueInspectorsByColumn;
+  }
+
+  @Override
+  public final IncrementallyUpdatedFilterPredicate visit(And and) {
+    return new IncrementallyUpdatedFilterPredicate.And(and.getLeft().accept(this), and.getRight().accept(this));
+  }
+
+  @Override
+  public final IncrementallyUpdatedFilterPredicate visit(Or or) {
+    return new IncrementallyUpdatedFilterPredicate.Or(or.getLeft().accept(this), or.getRight().accept(this));
+  }
+
+  @Override
+  public final IncrementallyUpdatedFilterPredicate visit(Not not) {
+    throw new IllegalArgumentException(
+        "This predicate contains a not! Did you forget to run this predicate through LogicalInverseRewriter? " + not);
+  }
+
+}
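
As the javadoc insists, not() must be rewritten away before build() sees the predicate. A hedged sketch using FilterApi and LogicalInverseRewriter from org.apache.parquet.filter2.predicate (the column name is invented):

    import static org.apache.parquet.filter2.predicate.FilterApi.*;

    import org.apache.parquet.filter2.predicate.FilterPredicate;
    import org.apache.parquet.filter2.predicate.LogicalInverseRewriter;

    FilterPredicate raw = not(eq(intColumn("x"), 7));

    // rewrite not(x == 7) into x != 7 so the builder never sees a Not node
    FilterPredicate rewritten = LogicalInverseRewriter.rewrite(raw);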

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateEvaluator.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateEvaluator.java b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateEvaluator.java
new file mode 100644
index 0000000..d1aa66c
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateEvaluator.java
@@ -0,0 +1,63 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.recordlevel;
+
+import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.And;
+import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.Or;
+import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;
+import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.Visitor;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Determines whether an {@link IncrementallyUpdatedFilterPredicate} is satisfied or not.
+ * This implementation makes the assumption that all {@link ValueInspector}s in an unknown state
+ * represent columns with a null value, and updates them accordingly.
+ *
+ * TODO: We could also build an evaluator that detects if enough values are known to determine the outcome
+ * TODO: of the predicate and quit the record assembly early. (https://issues.apache.org/jira/browse/PARQUET-37)
+ */
+public class IncrementallyUpdatedFilterPredicateEvaluator implements Visitor {
+  private static final IncrementallyUpdatedFilterPredicateEvaluator INSTANCE = new IncrementallyUpdatedFilterPredicateEvaluator();
+
+  public static boolean evaluate(IncrementallyUpdatedFilterPredicate pred) {
+    checkNotNull(pred, "pred");
+    return pred.accept(INSTANCE);
+  }
+
+  private IncrementallyUpdatedFilterPredicateEvaluator() {}
+
+  @Override
+  public boolean visit(ValueInspector p) {
+    if (!p.isKnown()) {
+      p.updateNull();
+    }
+    return p.getResult();
+  }
+
+  @Override
+  public boolean visit(And and) {
+    return and.getLeft().accept(this) && and.getRight().accept(this);
+  }
+
+  @Override
+  public boolean visit(Or or) {
+    return or.getLeft().accept(this) || or.getRight().accept(this);
+  }
+}
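
A small same-package sketch of the null-for-unknown behavior described above (the inspector is hypothetical):

    // an "is not null" inspector that never received a value for this record
    ValueInspector isNotNull = new ValueInspector() {
      @Override
      public void update(int value) { setResult(true); }

      @Override
      public void updateNull() { setResult(false); }
    };

    // evaluate() sees an unknown inspector, calls updateNull() first,
    // and the predicate comes out false
    boolean keep = IncrementallyUpdatedFilterPredicateEvaluator.evaluate(isNotNull);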

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateResetter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateResetter.java b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateResetter.java
new file mode 100644
index 0000000..a75731a
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateResetter.java
@@ -0,0 +1,60 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.recordlevel;
+
+import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.And;
+import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.Or;
+import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;
+import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.Visitor;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Resets all the {@link ValueInspector}s in an {@link IncrementallyUpdatedFilterPredicate}.
+ */
+public final class IncrementallyUpdatedFilterPredicateResetter implements Visitor {
+  private static final IncrementallyUpdatedFilterPredicateResetter INSTANCE = new IncrementallyUpdatedFilterPredicateResetter();
+
+  public static void reset(IncrementallyUpdatedFilterPredicate pred) {
+    checkNotNull(pred, "pred");
+    pred.accept(INSTANCE);
+  }
+
+  private IncrementallyUpdatedFilterPredicateResetter() { }
+
+  @Override
+  public boolean visit(ValueInspector p) {
+    p.reset();
+    return false;
+  }
+
+  @Override
+  public boolean visit(And and) {
+    and.getLeft().accept(this);
+    and.getRight().accept(this);
+    return false;
+  }
+
+  @Override
+  public boolean visit(Or or) {
+    or.getLeft().accept(this);
+    or.getRight().accept(this);
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/BaseRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/BaseRecordReader.java b/parquet-column/src/main/java/org/apache/parquet/io/BaseRecordReader.java
new file mode 100644
index 0000000..f2d88fc
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/BaseRecordReader.java
@@ -0,0 +1,144 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+import static org.apache.parquet.Log.DEBUG;
+import org.apache.parquet.Log;
+import org.apache.parquet.column.ColumnReadStore;
+import org.apache.parquet.io.RecordReaderImplementation.State;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.io.api.RecordMaterializer;
+
+// TODO(julien): this class appears to be unused -- can it be nuked? - todd
+public abstract class BaseRecordReader<T> extends RecordReader<T> {
+  private static final Log LOG = Log.getLog(BaseRecordReader.class);
+
+  public RecordConsumer recordConsumer;
+  public RecordMaterializer<T> recordMaterializer;
+  public ColumnReadStore columnStore;
+  @Override
+  public T read() {
+    readOneRecord();
+    return recordMaterializer.getCurrentRecord();
+  }
+
+  protected abstract void readOneRecord();
+
+  State[] caseLookup;
+
+  private String endField;
+
+  private int endIndex;
+
+  protected void currentLevel(int currentLevel) {
+    if (DEBUG) LOG.debug("currentLevel: "+currentLevel);
+  }
+
+  protected void log(String message) {
+    if (DEBUG) LOG.debug("bc: "+message);
+  }
+
+  final protected int getCaseId(int state, int currentLevel, int d, int nextR) {
+    return caseLookup[state].getCase(currentLevel, d, nextR).getID();
+  }
+
+  final protected void startMessage() {
+    // reset state
+    endField = null;
+    if (DEBUG) LOG.debug("startMessage()");
+    recordConsumer.startMessage();
+  }
+
+  final protected void startGroup(String field, int index) {
+    startField(field, index);
+    if (DEBUG) LOG.debug("startGroup()");
+    recordConsumer.startGroup();
+  }
+
+  private void startField(String field, int index) {
+    if (DEBUG) LOG.debug("startField("+field+","+index+")");
+    if (endField != null && index == endIndex) {
+      // skip the close/open tag
+      endField = null;
+    } else {
+      if (endField != null) {
+        // close the previous field
+        recordConsumer.endField(endField, endIndex);
+        endField = null;
+      }
+      recordConsumer.startField(field, index);
+    }
+  }
+
+  final protected void addPrimitiveINT64(String field, int index, long value) {
+    startField(field, index);
+    if (DEBUG) LOG.debug("addLong("+value+")");
+    recordConsumer.addLong(value);
+    endField(field, index);
+  }
+
+  private void endField(String field, int index) {
+    if (DEBUG) LOG.debug("endField("+field+","+index+")");
+    if (endField != null) {
+      recordConsumer.endField(endField, endIndex);
+    }
+    endField = field;
+    endIndex = index;
+  }
+
+  final protected void addPrimitiveBINARY(String field, int index, Binary value) {
+    startField(field, index);
+    if (DEBUG) LOG.debug("addBinary("+value+")");
+    recordConsumer.addBinary(value);
+    endField(field, index);
+  }
+
+  final protected void addPrimitiveINT32(String field, int index, int value) {
+    startField(field, index);
+    if (DEBUG) LOG.debug("addInteger("+value+")");
+    recordConsumer.addInteger(value);
+    endField(field, index);
+  }
+
+  final protected void endGroup(String field, int index) {
+    if (endField != null) {
+      // close the previous field
+      recordConsumer.endField(endField, endIndex);
+      endField = null;
+    }
+    if (DEBUG) LOG.debug("endGroup()");
+    recordConsumer.endGroup();
+    endField(field, index);
+  }
+
+  final protected void endMessage() {
+    if (endField != null) {
+      // close the previous field
+      recordConsumer.endField(endField, endIndex);
+      endField = null;
+    }
+    if (DEBUG) LOG.debug("endMessage()");
+    recordConsumer.endMessage();
+  }
+
+  protected void error(String message) {
+    throw new ParquetDecodingException(message);
+  }
+}
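
The endField() buffering above exists to collapse an endField/startField pair when consecutive values land in the same field. A sketch of the effect on the RecordConsumer event stream (the field name is invented):

    // two consecutive calls to addPrimitiveINT32("x", 0, ...) emit:
    //   without buffering: startField(x,0) addInteger(1) endField(x,0)
    //                      startField(x,0) addInteger(2) endField(x,0)
    //   with buffering:    startField(x,0) addInteger(1) addInteger(2) endField(x,0)
    // the pending endField is flushed by the next endGroup() or endMessage()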

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/ColumnIO.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/ColumnIO.java b/parquet-column/src/main/java/org/apache/parquet/io/ColumnIO.java
new file mode 100644
index 0000000..95a969e
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/ColumnIO.java
@@ -0,0 +1,138 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Type.Repetition;
+
+/**
+ * a structure used to serialize and deserialize records
+ *
+ * @author Julien Le Dem
+ *
+ */
+abstract public class ColumnIO {
+
+  static final boolean DEBUG = Log.DEBUG;
+
+  private final GroupColumnIO parent;
+  private final Type type;
+  private final String name;
+  private final int index;
+  private int repetitionLevel;
+  private int definitionLevel;
+  private String[] fieldPath;
+  private int[] indexFieldPath;
+
+
+  ColumnIO(Type type, GroupColumnIO parent, int index) {
+    this.type = type;
+    this.parent = parent;
+    this.index = index;
+    this.name = type.getName();
+  }
+
+  String[] getFieldPath() {
+    return fieldPath;
+  }
+
+  public String getFieldPath(int level) {
+    return fieldPath[level];
+  }
+
+  public int[] getIndexFieldPath() {
+    return indexFieldPath;
+  }
+
+  public int getIndexFieldPath(int level) {
+    return indexFieldPath[level];
+  }
+
+  public int getIndex() {
+    return this.index;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  int getRepetitionLevel() {
+    return repetitionLevel;
+  }
+
+  int getDefinitionLevel() {
+    return definitionLevel;
+  }
+
+  void setRepetitionLevel(int repetitionLevel) {
+    this.repetitionLevel = repetitionLevel;
+  }
+
+  void setDefinitionLevel(int definitionLevel) {
+    this.definitionLevel = definitionLevel;
+  }
+
+  void setFieldPath(String[] fieldPath, int[] indexFieldPath) {
+    this.fieldPath = fieldPath;
+    this.indexFieldPath = indexFieldPath;
+  }
+
+  public Type getType() {
+    return type;
+  }
+
+  void setLevels(int r, int d, String[] fieldPath, int[] indexFieldPath, List<ColumnIO> repetition, List<ColumnIO> path) {
+    setRepetitionLevel(r);
+    setDefinitionLevel(d);
+    setFieldPath(fieldPath, indexFieldPath);
+  }
+
+  abstract List<String[]> getColumnNames();
+
+  public GroupColumnIO getParent() {
+    return parent;
+  }
+
+  abstract PrimitiveColumnIO getLast();
+  abstract PrimitiveColumnIO getFirst();
+
+  ColumnIO getParent(int r) {
+    if (getRepetitionLevel() == r && getType().isRepetition(Repetition.REPEATED)) {
+      return this;
+    } else  if (getParent()!=null && getParent().getDefinitionLevel()>=r) {
+      return getParent().getParent(r);
+    } else {
+      throw new InvalidRecordException("no parent("+r+") for "+Arrays.toString(this.getFieldPath()));
+    }
+  }
+
+  @Override
+  public String toString() {
+    return this.getClass().getSimpleName()+" "+type.getName()
+        +" r:"+repetitionLevel
+        +" d:"+definitionLevel
+        +" "+Arrays.toString(fieldPath);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/ColumnIOFactory.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/ColumnIOFactory.java b/parquet-column/src/main/java/org/apache/parquet/io/ColumnIOFactory.java
new file mode 100644
index 0000000..71af780
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/ColumnIOFactory.java
@@ -0,0 +1,161 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.TypeVisitor;
+
+/**
+ * Factory constructing the ColumnIO structure from the schema
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class ColumnIOFactory {
+
+  public class ColumnIOCreatorVisitor implements TypeVisitor {
+
+    private MessageColumnIO columnIO;
+    private GroupColumnIO current;
+    private List<PrimitiveColumnIO> leaves = new ArrayList<PrimitiveColumnIO>();
+    private final boolean validating;
+    private final MessageType requestedSchema;
+    private int currentRequestedIndex;
+    private Type currentRequestedType;
+    private boolean strictTypeChecking;
+
+    public ColumnIOCreatorVisitor(boolean validating, MessageType requestedSchema) {
+      this(validating, requestedSchema, true);
+    }
+    
+    public ColumnIOCreatorVisitor(boolean validating, MessageType requestedSchema, boolean strictTypeChecking) {
+      this.validating = validating;
+      this.requestedSchema = requestedSchema;
+      this.strictTypeChecking = strictTypeChecking;
+    }
+
+    @Override
+    public void visit(MessageType messageType) {
+      columnIO = new MessageColumnIO(requestedSchema, validating);
+      visitChildren(columnIO, messageType, requestedSchema);
+      columnIO.setLevels();
+      columnIO.setLeaves(leaves);
+    }
+
+    @Override
+    public void visit(GroupType groupType) {
+      if (currentRequestedType.isPrimitive()) {
+        incompatibleSchema(groupType, currentRequestedType);
+      }
+      GroupColumnIO newIO = new GroupColumnIO(groupType, current, currentRequestedIndex);
+      current.add(newIO);
+      visitChildren(newIO, groupType, currentRequestedType.asGroupType());
+    }
+
+    private void visitChildren(GroupColumnIO newIO, GroupType groupType, GroupType requestedGroupType) {
+      GroupColumnIO oldIO = current;
+      current = newIO;
+      for (Type type : groupType.getFields()) {
+        // if the file schema does not contain the field it will just stay null
+        if (requestedGroupType.containsField(type.getName())) {
+          currentRequestedIndex = requestedGroupType.getFieldIndex(type.getName());
+          currentRequestedType = requestedGroupType.getType(currentRequestedIndex);
+          if (currentRequestedType.getRepetition().isMoreRestrictiveThan(type.getRepetition())) {
+            incompatibleSchema(type, currentRequestedType);
+          }
+          type.accept(this);
+        }
+      }
+      current = oldIO;
+    }
+
+    @Override
+    public void visit(PrimitiveType primitiveType) {
+      if (!currentRequestedType.isPrimitive() || 
+              (this.strictTypeChecking && currentRequestedType.asPrimitiveType().getPrimitiveTypeName() != primitiveType.getPrimitiveTypeName())) {
+        incompatibleSchema(primitiveType, currentRequestedType);
+      }
+      PrimitiveColumnIO newIO = new PrimitiveColumnIO(primitiveType, current, currentRequestedIndex, leaves.size());
+      current.add(newIO);
+      leaves.add(newIO);
+    }
+
+    private void incompatibleSchema(Type fileType, Type requestedType) {
+      throw new ParquetDecodingException("The requested schema is not compatible with the file schema. incompatible types: " + requestedType + " != " + fileType);
+    }
+
+    public MessageColumnIO getColumnIO() {
+      return columnIO;
+    }
+
+  }
+
+  private final boolean validating;
+
+  /**
+   * validation is off by default
+   */
+  public ColumnIOFactory() {
+    this(false);
+  }
+
+  /**
+   * @param validating whether to turn validation on
+   */
+  public ColumnIOFactory(boolean validating) {
+    super();
+    this.validating = validating;
+  }
+
+  /**
+   * @param requestedSchema the schema we want to read/write
+   * @param fileSchema the file schema (when reading, it can be different from the requested schema)
+   * @return the corresponding serializing/deserializing structure
+   */
+  public MessageColumnIO getColumnIO(MessageType requestedSchema, MessageType fileSchema) {
+    return getColumnIO(requestedSchema, fileSchema, true);
+  }
+  
+  /**
+   * @param requestedSchema the schema we want to read/write
+   * @param fileSchema the file schema (when reading, it can be different from the requested schema)
+   * @param strict whether the file's primitive types must strictly match the requested primitive types
+   * @return the corresponding serializing/deserializing structure
+   */
+  public MessageColumnIO getColumnIO(MessageType requestedSchema, MessageType fileSchema, boolean strict) {
+    ColumnIOCreatorVisitor visitor = new ColumnIOCreatorVisitor(validating, requestedSchema, strict);
+    fileSchema.accept(visitor);
+    return visitor.getColumnIO();
+  }
+
+  /**
+   * @param schema the schema we want to read/write
+   * @return the corresponding serializing/deserializing structure
+   */
+  public MessageColumnIO getColumnIO(MessageType schema) {
+    return this.getColumnIO(schema, schema);
+  }
+
+}
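
A hedged usage sketch; the schemas are invented, and MessageTypeParser comes from org.apache.parquet.schema:

    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;

    MessageType fileSchema = MessageTypeParser.parseMessageType(
        "message m { required int32 a; required binary b (UTF8); }");

    // projection: only read column "a"
    MessageType requested = MessageTypeParser.parseMessageType(
        "message m { required int32 a; }");

    MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(requested, fileSchema);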

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/CompilationException.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/CompilationException.java b/parquet-column/src/main/java/org/apache/parquet/io/CompilationException.java
new file mode 100644
index 0000000..e15ab2e
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/CompilationException.java
@@ -0,0 +1,47 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+import org.apache.parquet.ParquetRuntimeException;
+
+/**
+ * thrown when a problem occurred while compiling the column reader
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class CompilationException extends ParquetRuntimeException {
+  private static final long serialVersionUID = 1L;
+
+  public CompilationException() {
+  }
+
+  public CompilationException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public CompilationException(String message) {
+    super(message);
+  }
+
+  public CompilationException(Throwable cause) {
+    super(cause);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/EmptyRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/EmptyRecordReader.java b/parquet-column/src/main/java/org/apache/parquet/io/EmptyRecordReader.java
new file mode 100644
index 0000000..671c651
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/EmptyRecordReader.java
@@ -0,0 +1,50 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.RecordMaterializer;
+
+/**
+ * used to read an empty schema
+ *
+ * @author Mickael Lacour <m....@criteo.com>
+ *
+ * @param <T> the type of the materialized record
+ */
+class EmptyRecordReader<T> extends RecordReader<T> {
+
+  private final GroupConverter recordConsumer;
+  private final RecordMaterializer<T> recordMaterializer;
+
+  public EmptyRecordReader(RecordMaterializer<T> recordMaterializer) {
+    this.recordMaterializer = recordMaterializer;
+    this.recordConsumer = recordMaterializer.getRootConverter(); // TODO: validator(wrap(recordMaterializer), validating, root.getType());
+  }
+
+  /**
+   * @see org.apache.parquet.io.RecordReader#read()
+   */
+  @Override
+  public T read() {
+    recordConsumer.start();
+    recordConsumer.end();
+    return recordMaterializer.getCurrentRecord();
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/FilteredRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/FilteredRecordReader.java b/parquet-column/src/main/java/org/apache/parquet/io/FilteredRecordReader.java
new file mode 100644
index 0000000..3444b1f
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/FilteredRecordReader.java
@@ -0,0 +1,99 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.column.impl.ColumnReadStoreImpl;
+import org.apache.parquet.filter.RecordFilter;
+import org.apache.parquet.filter.UnboundRecordFilter;
+import org.apache.parquet.io.api.RecordMaterializer;
+
+/**
+ * Extends RecordReaderImplementation to skip forward over records that do not match the given filter.
+ * @author Jacob Metcalf
+ *
+ */
+class FilteredRecordReader<T> extends RecordReaderImplementation<T> {
+
+  private final RecordFilter recordFilter;
+  private final long recordCount;
+  private long recordsRead = 0;
+
+  /**
+   * @param root               the root of the schema
+   * @param recordMaterializer materializes the read records
+   * @param validating         whether to validate records against the schema
+   * @param columnStore        the store to read column values from
+   * @param unboundFilter      filters records; pass in null to leave unfiltered
+   * @param recordCount        the total number of records available to read
+   */
+  public FilteredRecordReader(MessageColumnIO root, RecordMaterializer<T> recordMaterializer, boolean validating,
+                              ColumnReadStoreImpl columnStore, UnboundRecordFilter unboundFilter, long recordCount) {
+    super(root, recordMaterializer, validating, columnStore);
+    this.recordCount = recordCount;
+    if (unboundFilter != null) {
+      recordFilter = unboundFilter.bind(getColumnReaders());
+    } else {
+      recordFilter = null;
+    }
+  }
+
+  /**
+   * Override read() method to provide skip.
+   */
+  @Override
+  public T read() {
+    skipToMatch();
+    if (recordsRead == recordCount) {
+      return null;
+    }
+    ++ recordsRead;
+    return super.read();
+  }
+
+  // FilteredRecordReader skips forwards itself, it never asks the layer above to do the skipping for it.
+  // This is different from how filtering is handled in the filter2 API
+  @Override
+  public boolean shouldSkipCurrentRecord() {
+    return false;
+  }
+
+  /**
+   * Skips forward until the filter finds the first match, or until all
+   * records have been consumed.
+   */
+  private void skipToMatch() {
+    while (recordsRead < recordCount && !recordFilter.isMatch()) {
+      State currentState = getState(0);
+      do {
+        ColumnReader columnReader = currentState.column;
+
+        // currentLevel = depth + 1 at this point
+        // set the current value
+        if (columnReader.getCurrentDefinitionLevel() >= currentState.maxDefinitionLevel) {
+          columnReader.skip();
+        }
+        columnReader.consume();
+
+        // Based on repetition level work out next state to go to
+        int nextR = currentState.maxRepetitionLevel == 0 ? 0 : columnReader.getCurrentRepetitionLevel();
+        currentState = currentState.getNextState(nextR);
+      } while (currentState != null);
+      ++ recordsRead;
+    }
+  }
+}
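
A sketch of how the UnboundRecordFilter consumed above is typically built,
using the org.apache.parquet.filter API as renamed by this commit; the column
path, value, and example class name are illustrative:

    import org.apache.parquet.filter.ColumnPredicates;
    import org.apache.parquet.filter.ColumnRecordFilter;
    import org.apache.parquet.filter.UnboundRecordFilter;

    public class FilterExample {
      public static UnboundRecordFilter idEquals42() {
        // FilteredRecordReader binds this to the actual ColumnReaders via
        // unboundFilter.bind(getColumnReaders()) in its constructor.
        return ColumnRecordFilter.column("id", ColumnPredicates.equalTo(42L));
      }
    }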

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/GroupColumnIO.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/GroupColumnIO.java b/parquet-column/src/main/java/org/apache/parquet/io/GroupColumnIO.java
new file mode 100644
index 0000000..1efe0d1
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/GroupColumnIO.java
@@ -0,0 +1,122 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+import static org.apache.parquet.schema.Type.Repetition.REPEATED;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.schema.GroupType;
+
+/**
+ * Group level of the IO structure
+ *
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class GroupColumnIO extends ColumnIO {
+  private static final Log LOG = Log.getLog(GroupColumnIO.class);
+
+  private final Map<String, ColumnIO> childrenByName = new HashMap<String, ColumnIO>();
+  private final List<ColumnIO> children = new ArrayList<ColumnIO>();
+  private int childrenSize = 0;
+
+  GroupColumnIO(GroupType groupType, GroupColumnIO parent, int index) {
+    super(groupType, parent, index);
+  }
+
+  void add(ColumnIO child) {
+    children.add(child);
+    childrenByName.put(child.getType().getName(), child);
+    ++ childrenSize;
+  }
+
+  @Override
+  void setLevels(int r, int d, String[] fieldPath, int[] indexFieldPath, List<ColumnIO> repetition, List<ColumnIO> path) {
+    super.setLevels(r, d, fieldPath, indexFieldPath, repetition, path);
+    for (ColumnIO child : this.children) {
+      String[] newFieldPath = Arrays.copyOf(fieldPath, fieldPath.length + 1);
+      int[] newIndexFieldPath = Arrays.copyOf(indexFieldPath, indexFieldPath.length + 1);
+      newFieldPath[fieldPath.length] = child.getType().getName();
+      newIndexFieldPath[indexFieldPath.length] = child.getIndex();
+      List<ColumnIO> newRepetition;
+      if (child.getType().isRepetition(REPEATED)) {
+        newRepetition = new ArrayList<ColumnIO>(repetition);
+        newRepetition.add(child);
+      } else {
+        newRepetition = repetition;
+      }
+      List<ColumnIO> newPath = new ArrayList<ColumnIO>(path);
+      newPath.add(child);
+      child.setLevels(
+          // the type repetition level increases whenever there's a possible repetition
+          child.getType().isRepetition(REPEATED) ? r + 1 : r,
+          // the type definition level increases whenever a field can be missing (not required)
+          !child.getType().isRepetition(REQUIRED) ? d + 1 : d,
+          newFieldPath,
+          newIndexFieldPath,
+          newRepetition,
+          newPath
+          );
+
+    }
+  }
+
+  @Override
+  List<String[]> getColumnNames() {
+    ArrayList<String[]> result = new ArrayList<String[]>();
+    for (ColumnIO c : children) {
+      result.addAll(c.getColumnNames());
+    }
+    return result;
+  }
+
+  PrimitiveColumnIO getLast() {
+    return children.get(children.size()-1).getLast();
+  }
+
+  PrimitiveColumnIO getFirst() {
+    return children.get(0).getFirst();
+  }
+
+  public ColumnIO getChild(String name) {
+    return childrenByName.get(name);
+  }
+
+  public ColumnIO getChild(int fieldIndex) {
+    try {
+      return children.get(fieldIndex);
+    } catch (IndexOutOfBoundsException e) {
+      throw new InvalidRecordException("could not get child " + fieldIndex + " from " + children, e);
+    }
+  }
+
+  public int getChildrenCount() {
+    return childrenSize;
+  }
+
+}
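
To make the level arithmetic in setLevels concrete, a small worked sketch
(the schema literal and class name are illustrative):

    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;

    public class LevelsExample {
      public static MessageType document() {
        // Per the rules above, setLevels assigns:
        //   links           r=0, d=1  (optional adds one definition level)
        //   links.backward  r=1, d=2  (repeated adds one repetition and one definition level)
        return MessageTypeParser.parseMessageType(
            "message Document { optional group links { repeated int64 backward; } }");
      }
    }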

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/InvalidRecordException.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/InvalidRecordException.java b/parquet-column/src/main/java/org/apache/parquet/io/InvalidRecordException.java
new file mode 100644
index 0000000..d3d0111
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/InvalidRecordException.java
@@ -0,0 +1,48 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+import org.apache.parquet.ParquetRuntimeException;
+
+/**
+ * thrown when an invalid record is encountered
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class InvalidRecordException extends ParquetRuntimeException {
+  private static final long serialVersionUID = 1L;
+
+  public InvalidRecordException() {
+    super();
+  }
+
+  public InvalidRecordException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public InvalidRecordException(String message) {
+    super(message);
+  }
+
+  public InvalidRecordException(Throwable cause) {
+    super(cause);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java b/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
new file mode 100644
index 0000000..e24aedb
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java
@@ -0,0 +1,396 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.column.ColumnWriteStore;
+import org.apache.parquet.column.ColumnWriter;
+import org.apache.parquet.column.impl.ColumnReadStoreImpl;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter.UnboundRecordFilter;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.filter2.compat.FilterCompat.FilterPredicateCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.NoOpFilter;
+import org.apache.parquet.filter2.compat.FilterCompat.UnboundRecordFilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Visitor;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.recordlevel.FilteringRecordMaterializer;
+import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate;
+import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicateBuilder;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.MessageType;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Message level of the IO structure
+ *
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class MessageColumnIO extends GroupColumnIO {
+  private static final Log LOG = Log.getLog(MessageColumnIO.class);
+
+  private static final boolean DEBUG = Log.DEBUG;
+
+  private List<PrimitiveColumnIO> leaves;
+
+  private final boolean validating;
+
+  MessageColumnIO(MessageType messageType, boolean validating) {
+    super(messageType, null, 0);
+    this.validating = validating;
+  }
+
+  public List<String[]> getColumnNames() {
+    return super.getColumnNames();
+  }
+
+  public <T> RecordReader<T> getRecordReader(PageReadStore columns,
+                                             RecordMaterializer<T> recordMaterializer) {
+    return getRecordReader(columns, recordMaterializer, FilterCompat.NOOP);
+  }
+
+  /**
+   * @deprecated use {@link #getRecordReader(PageReadStore, RecordMaterializer, Filter)}
+   */
+  @Deprecated
+  public <T> RecordReader<T> getRecordReader(PageReadStore columns,
+                                             RecordMaterializer<T> recordMaterializer,
+                                             UnboundRecordFilter filter) {
+    return getRecordReader(columns, recordMaterializer, FilterCompat.get(filter));
+  }
+
+  public <T> RecordReader<T> getRecordReader(final PageReadStore columns,
+                                             final RecordMaterializer<T> recordMaterializer,
+                                             final Filter filter) {
+    checkNotNull(columns, "columns");
+    checkNotNull(recordMaterializer, "recordMaterializer");
+    checkNotNull(filter, "filter");
+
+    if (leaves.isEmpty()) {
+      return new EmptyRecordReader<T>(recordMaterializer);
+    }
+
+    return filter.accept(new Visitor<RecordReader<T>>() {
+      @Override
+      public RecordReader<T> visit(FilterPredicateCompat filterPredicateCompat) {
+
+        FilterPredicate predicate = filterPredicateCompat.getFilterPredicate();
+        IncrementallyUpdatedFilterPredicateBuilder builder = new IncrementallyUpdatedFilterPredicateBuilder();
+        IncrementallyUpdatedFilterPredicate streamingPredicate = builder.build(predicate);
+        RecordMaterializer<T> filteringRecordMaterializer = new FilteringRecordMaterializer<T>(
+            recordMaterializer,
+            leaves,
+            builder.getValueInspectorsByColumn(),
+            streamingPredicate);
+
+        return new RecordReaderImplementation<T>(
+            MessageColumnIO.this,
+            filteringRecordMaterializer,
+            validating,
+            new ColumnReadStoreImpl(columns, filteringRecordMaterializer.getRootConverter(), getType()));
+      }
+
+      @Override
+      public RecordReader<T> visit(UnboundRecordFilterCompat unboundRecordFilterCompat) {
+        return new FilteredRecordReader<T>(
+            MessageColumnIO.this,
+            recordMaterializer,
+            validating,
+            new ColumnReadStoreImpl(columns, recordMaterializer.getRootConverter(), getType()),
+            unboundRecordFilterCompat.getUnboundRecordFilter(),
+            columns.getRowCount()
+        );
+
+      }
+
+      @Override
+      public RecordReader<T> visit(NoOpFilter noOpFilter) {
+        return new RecordReaderImplementation<T>(
+            MessageColumnIO.this,
+            recordMaterializer,
+            validating,
+            new ColumnReadStoreImpl(columns, recordMaterializer.getRootConverter(), getType()));
+      }
+    });
+  }
+
+  private class MessageColumnIORecordConsumer extends RecordConsumer {
+    private ColumnIO currentColumnIO;
+    private int currentLevel = 0;
+
+    private class FieldsMarker {
+      private BitSet visitedIndexes = new BitSet();
+
+      @Override
+      public String toString() {
+        return "VisitedIndexes{" +
+                "visitedIndexes=" + visitedIndexes +
+                '}';
+      }
+
+      public void reset(int fieldsCount) {
+        this.visitedIndexes.clear(0, fieldsCount);
+      }
+
+      public void markWritten(int i) {
+        visitedIndexes.set(i);
+      }
+
+      public boolean isWritten(int i) {
+        return visitedIndexes.get(i);
+      }
+      }
+    }
+
+    // tracks, at each level of depth, which fields have been written, so nulls can be inserted for the unwritten fields
+    private final FieldsMarker[] fieldsWritten;
+    private final int[] r;
+    private final ColumnWriter[] columnWriter;
+    private final ColumnWriteStore columns;
+    private boolean emptyField = true;
+
+    public MessageColumnIORecordConsumer(ColumnWriteStore columns) {
+      this.columns = columns;
+      int maxDepth = 0;
+      this.columnWriter = new ColumnWriter[MessageColumnIO.this.getLeaves().size()];
+      for (PrimitiveColumnIO primitiveColumnIO : MessageColumnIO.this.getLeaves()) {
+        maxDepth = Math.max(maxDepth, primitiveColumnIO.getFieldPath().length);
+        columnWriter[primitiveColumnIO.getId()] = columns.getColumnWriter(primitiveColumnIO.getColumnDescriptor());
+      }
+
+      fieldsWritten = new FieldsMarker[maxDepth];
+      for (int i = 0; i < maxDepth; i++) {
+        fieldsWritten[i] = new FieldsMarker();
+      }
+      r = new int[maxDepth];
+    }
+
+    public void printState() {
+      log(currentLevel + ", " + fieldsWritten[currentLevel] + ": " + Arrays.toString(currentColumnIO.getFieldPath()) + " r:" + r[currentLevel]);
+      if (r[currentLevel] > currentColumnIO.getRepetitionLevel()) {
+        // sanity check
+        throw new InvalidRecordException(r[currentLevel] + " (r) > " + currentColumnIO.getRepetitionLevel() + " (schema r)");
+      }
+    }
+
+    private void log(Object m) {
+      String indent = "";
+      for (int i = 0; i<currentLevel; ++i) {
+        indent += "  ";
+      }
+      LOG.debug(indent + m);
+    }
+
+    @Override
+    public void startMessage() {
+      if (DEBUG) log("< MESSAGE START >");
+      currentColumnIO = MessageColumnIO.this;
+      r[0] = 0;
+      int numberOfFieldsToVisit = ((GroupColumnIO)currentColumnIO).getChildrenCount();
+      fieldsWritten[0].reset(numberOfFieldsToVisit);
+      if (DEBUG) printState();
+    }
+
+    @Override
+    public void endMessage() {
+      writeNullForMissingFieldsAtCurrentLevel();
+      columns.endRecord();
+      if (DEBUG) log("< MESSAGE END >");
+      if (DEBUG) printState();
+    }
+
+    @Override
+    public void startField(String field, int index) {
+      try {
+        if (DEBUG) log("startField(" + field + ", " + index + ")");
+        currentColumnIO = ((GroupColumnIO)currentColumnIO).getChild(index);
+        emptyField = true;
+        if (DEBUG) printState();
+      } catch (RuntimeException e) {
+        throw new ParquetEncodingException("error starting field " + field + " at " + index, e);
+      }
+    }
+
+    @Override
+    public void endField(String field, int index) {
+      if (DEBUG) log("endField(" + field + ", " + index + ")");
+      currentColumnIO = currentColumnIO.getParent();
+      if (emptyField) {
+        throw new ParquetEncodingException("empty fields are illegal, the field should be ommited completely instead");
+      }
+      fieldsWritten[currentLevel].markWritten(index);
+      r[currentLevel] = currentLevel == 0 ? 0 : r[currentLevel - 1];
+      if (DEBUG) printState();
+    }
+
+    private void writeNullForMissingFieldsAtCurrentLevel() {
+      int currentFieldsCount = ((GroupColumnIO)currentColumnIO).getChildrenCount();
+      for (int i = 0; i < currentFieldsCount; i++) {
+        if (!fieldsWritten[currentLevel].isWritten(i)) {
+          try {
+            ColumnIO undefinedField = ((GroupColumnIO)currentColumnIO).getChild(i);
+            int d = currentColumnIO.getDefinitionLevel();
+            if (DEBUG)
+              log(Arrays.toString(undefinedField.getFieldPath()) + ".writeNull(" + r[currentLevel] + "," + d + ")");
+            writeNull(undefinedField, r[currentLevel], d);
+          } catch (RuntimeException e) {
+            throw new ParquetEncodingException("error while writing nulls for fields of indexes " + i + " . current index: " + fieldsWritten[currentLevel], e);
+          }
+        }
+      }
+    }
+
+    private void writeNull(ColumnIO undefinedField, int r, int d) {
+      if (undefinedField.getType().isPrimitive()) {
+        columnWriter[((PrimitiveColumnIO)undefinedField).getId()].writeNull(r, d);
+      } else {
+        GroupColumnIO groupColumnIO = (GroupColumnIO)undefinedField;
+        int childrenCount = groupColumnIO.getChildrenCount();
+        for (int i = 0; i < childrenCount; i++) {
+          writeNull(groupColumnIO.getChild(i), r, d);
+        }
+      }
+    }
+
+    private void setRepetitionLevel() {
+      r[currentLevel] = currentColumnIO.getRepetitionLevel();
+      if (DEBUG) log("r: " + r[currentLevel]);
+    }
+
+    @Override
+    public void startGroup() {
+      if (DEBUG) log("startGroup()");
+
+      ++ currentLevel;
+      r[currentLevel] = r[currentLevel - 1];
+
+      int fieldsCount = ((GroupColumnIO)currentColumnIO).getChildrenCount();
+      fieldsWritten[currentLevel].reset(fieldsCount);
+      if (DEBUG) printState();
+    }
+
+    @Override
+    public void endGroup() {
+      if (DEBUG) log("endGroup()");
+      emptyField = false;
+      writeNullForMissingFieldsAtCurrentLevel();
+      -- currentLevel;
+
+      setRepetitionLevel();
+      if (DEBUG) printState();
+    }
+
+    private ColumnWriter getColumnWriter() {
+      return columnWriter[((PrimitiveColumnIO)currentColumnIO).getId()];
+    }
+
+    @Override
+    public void addInteger(int value) {
+      if (DEBUG) log("addInt(" + value + ")");
+      emptyField = false;
+      getColumnWriter().write(value, r[currentLevel], currentColumnIO.getDefinitionLevel());
+
+      setRepetitionLevel();
+      if (DEBUG) printState();
+    }
+
+    @Override
+    public void addLong(long value) {
+      if (DEBUG) log("addLong(" + value + ")");
+      emptyField = false;
+      getColumnWriter().write(value, r[currentLevel], currentColumnIO.getDefinitionLevel());
+
+      setRepetitionLevel();
+      if (DEBUG) printState();
+    }
+
+    @Override
+    public void addBoolean(boolean value) {
+      if (DEBUG) log("addBoolean(" + value + ")");
+      emptyField = false;
+      getColumnWriter().write(value, r[currentLevel], currentColumnIO.getDefinitionLevel());
+
+      setRepetitionLevel();
+      if (DEBUG) printState();
+    }
+
+    @Override
+    public void addBinary(Binary value) {
+      if (DEBUG) log("addBinary(" + value.length() + " bytes)");
+      emptyField = false;
+      getColumnWriter().write(value, r[currentLevel], currentColumnIO.getDefinitionLevel());
+
+      setRepetitionLevel();
+      if (DEBUG) printState();
+    }
+
+    @Override
+    public void addFloat(float value) {
+      if (DEBUG) log("addFloat(" + value + ")");
+      emptyField = false;
+      getColumnWriter().write(value, r[currentLevel], currentColumnIO.getDefinitionLevel());
+
+      setRepetitionLevel();
+      if (DEBUG) printState();
+    }
+
+    @Override
+    public void addDouble(double value) {
+      if (DEBUG) log("addDouble(" + value + ")");
+      emptyField = false;
+      getColumnWriter().write(value, r[currentLevel], currentColumnIO.getDefinitionLevel());
+
+      setRepetitionLevel();
+      if (DEBUG) printState();
+    }
+
+  }
+
+  public RecordConsumer getRecordWriter(ColumnWriteStore columns) {
+    RecordConsumer recordWriter = new MessageColumnIORecordConsumer(columns);
+    if (DEBUG) recordWriter = new RecordConsumerLoggingWrapper(recordWriter);
+    return validating ? new ValidatingRecordConsumer(recordWriter, getType()) : recordWriter;
+  }
+
+  void setLevels() {
+    setLevels(0, 0, new String[0], new int[0], Arrays.<ColumnIO>asList(this), Arrays.<ColumnIO>asList(this));
+  }
+
+  void setLeaves(List<PrimitiveColumnIO> leaves) {
+    this.leaves = leaves;
+  }
+
+  public List<PrimitiveColumnIO> getLeaves() {
+    return this.leaves;
+  }
+
+  @Override
+  public MessageType getType() {
+    return (MessageType)super.getType();
+  }
+}
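
A sketch of driving the RecordConsumer returned by getRecordWriter above; the
MessageColumnIO and ColumnWriteStore are assumed to be initialized elsewhere,
and the field names match the illustrative two-column Document schema used
earlier:

    import org.apache.parquet.column.ColumnWriteStore;
    import org.apache.parquet.io.MessageColumnIO;
    import org.apache.parquet.io.api.RecordConsumer;

    public class WriteExample {
      public static void writeOneRecord(MessageColumnIO columnIO, ColumnWriteStore columns) {
        RecordConsumer consumer = columnIO.getRecordWriter(columns);
        consumer.startMessage();
        consumer.startField("id", 0);
        consumer.addLong(1L);
        consumer.endField("id", 0);
        // "name" is optional and omitted entirely: empty fields are illegal,
        // and endMessage() writes the nulls for any unwritten fields.
        consumer.endMessage();
      }
    }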

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/ParquetDecodingException.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/ParquetDecodingException.java b/parquet-column/src/main/java/org/apache/parquet/io/ParquetDecodingException.java
new file mode 100644
index 0000000..1007e32
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/ParquetDecodingException.java
@@ -0,0 +1,47 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+import org.apache.parquet.ParquetRuntimeException;
+
+/**
+ * thrown when a decoding problem occurred
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class ParquetDecodingException extends ParquetRuntimeException {
+  private static final long serialVersionUID = 1L;
+
+  public ParquetDecodingException() {
+  }
+
+  public ParquetDecodingException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public ParquetDecodingException(String message) {
+    super(message);
+  }
+
+  public ParquetDecodingException(Throwable cause) {
+    super(cause);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/ParquetEncodingException.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/ParquetEncodingException.java b/parquet-column/src/main/java/org/apache/parquet/io/ParquetEncodingException.java
new file mode 100644
index 0000000..05f9c56
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/ParquetEncodingException.java
@@ -0,0 +1,47 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+import org.apache.parquet.ParquetRuntimeException;
+
+/**
+ * thrown when an encoding problem occurred
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class ParquetEncodingException extends ParquetRuntimeException {
+  private static final long serialVersionUID = 1L;
+
+  public ParquetEncodingException() {
+  }
+
+  public ParquetEncodingException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public ParquetEncodingException(String message) {
+    super(message);
+  }
+
+  public ParquetEncodingException(Throwable cause) {
+    super(cause);
+  }
+
+}