You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by bl...@apache.org on 2015/04/28 01:12:34 UTC

[37/51] [partial] parquet-mr git commit: PARQUET-23: Rename to org.apache.parquet.

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/PrimitiveColumnIO.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/PrimitiveColumnIO.java b/parquet-column/src/main/java/org/apache/parquet/io/PrimitiveColumnIO.java
new file mode 100644
index 0000000..15c28c8
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/PrimitiveColumnIO.java
@@ -0,0 +1,113 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+
+/**
+ * Primitive (leaf) level of the IO structure.
+ * Holds the {@link ColumnDescriptor} and the {@link ColumnIO} path that are
+ * computed for this leaf when {@code setLevels} is called.
+ * @author Julien Le Dem
+ *
+ */
+public class PrimitiveColumnIO extends ColumnIO {
+//  private static final Logger logger = Logger.getLogger(PrimitiveColumnIO.class.getName());
+
+  private ColumnIO[] path; // array copy of the path list received by setLevels
+  private ColumnDescriptor columnDescriptor; // built by setLevels
+  private final int id; // value received by the constructor, exposed by getId()
+
+  PrimitiveColumnIO(Type type, GroupColumnIO parent, int index, int id) {
+    super(type, parent, index);
+    this.id = id;
+  }
+
+  @Override
+  void setLevels(int r, int d, String[] fieldPath, int[] fieldIndexPath, List<ColumnIO> repetition, List<ColumnIO> path) {
+    super.setLevels(r, d, fieldPath, fieldIndexPath, repetition, path);
+    PrimitiveType type = getType().asPrimitiveType();
+    this.columnDescriptor = new ColumnDescriptor(
+        fieldPath, 
+        type.getPrimitiveTypeName(),
+        type.getTypeLength(),
+        getRepetitionLevel(), 
+        getDefinitionLevel());
+    this.path = path.toArray(new ColumnIO[path.size()]);
+  }
+
+  @Override
+  List<String[]> getColumnNames() {
+    return Arrays.asList(new String[][] { getFieldPath() }); // a leaf contributes exactly one column: its own field path
+  }
+
+  public ColumnDescriptor getColumnDescriptor() {
+    return columnDescriptor;
+  }
+
+  public ColumnIO[] getPath() {
+    return path; // the internal array itself, not a copy
+  }
+
+  public boolean isLast(int r) {
+    return getLast(r) == this; // identity check: is this leaf what getParent(r).getLast() returns
+  }
+
+  private PrimitiveColumnIO getLast(int r) {
+    ColumnIO parent = getParent(r);
+
+    PrimitiveColumnIO last = parent.getLast();
+    return last;
+  }
+
+  @Override
+  PrimitiveColumnIO getLast() {
+    return this;
+  }
+
+  @Override
+  PrimitiveColumnIO getFirst() {
+    return this;
+  }
+  public boolean isFirst(int r) {
+    return getFirst(r) == this; // identity check: is this leaf what getParent(r).getFirst() returns
+  }
+
+  private PrimitiveColumnIO getFirst(int r) {
+    ColumnIO parent = getParent(r);
+    return parent.getFirst();
+  }
+
+  public PrimitiveTypeName getPrimitive() {
+    return getType().asPrimitiveType().getPrimitiveTypeName();
+  }
+
+  public int getId() {
+    return id;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/RecordConsumerLoggingWrapper.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/RecordConsumerLoggingWrapper.java b/parquet-column/src/main/java/org/apache/parquet/io/RecordConsumerLoggingWrapper.java
new file mode 100644
index 0000000..f753b2a
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/RecordConsumerLoggingWrapper.java
@@ -0,0 +1,178 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+import java.util.Arrays;
+import org.apache.parquet.Log;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.RecordConsumer;
+
+/**
+ * This class can be used to wrap an actual RecordConsumer and log all calls
+ * (events are logged at debug level, and only when {@code Log.DEBUG} is true).
+ * @author Julien Le Dem
+ *
+ */
+public class RecordConsumerLoggingWrapper extends RecordConsumer {
+    private static final Log logger = Log.getLog(RecordConsumerLoggingWrapper.class);
+    private static final boolean DEBUG = Log.DEBUG;
+
+    private final RecordConsumer delegate;
+
+    int indent = 0; // incremented by startGroup, decremented by endGroup; two spaces are logged per unit
+
+    /**
+     * all calls are delegated to the wrapped delegate
+     * @param delegate the consumer that every call is forwarded to
+     */
+    public RecordConsumerLoggingWrapper(RecordConsumer delegate) {
+      this.delegate = delegate;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void startField(String field, int index) {
+      if (DEBUG) logOpen(field);
+      delegate.startField(field, index);
+    }
+
+    private void logOpen(String field) {
+      log("<"+field+">");
+    }
+
+    private String indent() {
+      StringBuilder result = new StringBuilder();
+      for (int i = 0; i < indent; i++) {
+        result.append("  ");
+      }
+      return result.toString();
+    }
+
+    private void log(Object value) {
+      logger.debug(indent() + value);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void startGroup() {
+      if (DEBUG) ++indent;
+      if (DEBUG) log("<!-- start group -->");
+      delegate.startGroup();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void addInteger(int value) {
+      if (DEBUG) log(value);
+      delegate.addInteger(value);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void addLong(long value) {
+      if (DEBUG) log(value);
+      delegate.addLong(value);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void addBoolean(boolean value) {
+      if (DEBUG) log(value);
+      delegate.addBoolean(value);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void addBinary(Binary value) {
+      if (DEBUG) log(Arrays.toString(value.getBytes())); // binary values are logged as a list of byte values
+      delegate.addBinary(value);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void addFloat(float value) {
+      if (DEBUG) log(value);
+      delegate.addFloat(value);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void addDouble(double value) {
+      if (DEBUG) log(value);
+      delegate.addDouble(value);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void endGroup() {
+      if (DEBUG) log("<!-- end group -->");
+      if (DEBUG) --indent;
+      delegate.endGroup();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void endField(String field, int index) {
+      if (DEBUG) logClose(field);
+      delegate.endField(field, index);
+    }
+
+    private void logClose(String field) {
+      log("</"+field+">");
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void startMessage() {
+      if (DEBUG) log("<!-- start message -->");
+      delegate.startMessage();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void endMessage() {
+      delegate.endMessage(); // unlike the other methods, the delegate is called before logging
+      if (DEBUG) log("<!-- end message -->");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/RecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/RecordReader.java b/parquet-column/src/main/java/org/apache/parquet/io/RecordReader.java
new file mode 100644
index 0000000..a9eb2f5
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/RecordReader.java
@@ -0,0 +1,43 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+
+/**
+ * Used to read reassembled records, one record per call to {@link #read()}.
+ * @author Julien Le Dem
+ *
+ * @param <T> the type of the materialized record
+ */
+public abstract class RecordReader<T> {
+
+  /**
+   * Reads one record and returns it.
+   * @return the materialized record
+   */
+  public abstract T read();
+
+  /**
+   * Returns whether the current record should be skipped (dropped).
+   * Will be called *after* read(). This default implementation always returns false.
+   */
+  public boolean shouldSkipCurrentRecord() {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/RecordReaderImplementation.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/RecordReaderImplementation.java b/parquet-column/src/main/java/org/apache/parquet/io/RecordReaderImplementation.java
new file mode 100644
index 0000000..7a87cbb
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/RecordReaderImplementation.java
@@ -0,0 +1,473 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.column.impl.ColumnReadStoreImpl;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+
+/**
+ * used to read reassembled records
+ * @author Julien Le Dem
+ *
+ * @param <T> the type of the materialized record
+ */
+class RecordReaderImplementation<T> extends RecordReader<T> {
+  private static final Log LOG = Log.getLog(RecordReaderImplementation.class);
+
+  public static class Case {
+    // One automaton transition: start at startLevel, open groups up to depth, close down to nextLevel, go to nextState.
+    private int id;
+    private final int startLevel;
+    private final int depth;
+    private final int nextLevel;
+    private final boolean goingUp;
+    private final boolean goingDown;
+    private final int nextState;
+    private final boolean defined;
+
+    public Case(int startLevel, int depth, int nextLevel, int nextState, boolean defined) {
+      this.startLevel = startLevel;
+      this.depth = depth;
+      this.nextLevel = nextLevel;
+      this.nextState = nextState;
+      this.defined = defined;
+      // means going up the tree (towards the leaves) of the record
+      // true if we need to open up groups in this case
+      goingUp = startLevel <= depth;
+      // means going down the tree (towards the root) of the record
+      // true if we need to close groups in this case
+      goingDown = depth + 1 > nextLevel;
+    }
+
+    public void setID(int id) {
+      this.id = id;
+    }
+
+    @Override
+    // the commented-out implementation is buggy but the simpler one below has duplicates.
+    // it still works but generates more code than necessary
+    // a middle ground is necessary
+//    public int hashCode() {
+//      int hashCode = 0;
+//      if (goingUp) {
+//        hashCode += 1 * (1 + startLevel) + 2 * (1 + depth);
+//      }
+//      if (goingDown) {
+//        hashCode += 3 * (1 + depth) + 5 * (1 + nextLevel);
+//      }
+//      return hashCode;
+//    }
+
+    public int hashCode() {
+      int hashCode = 17; // order-sensitive combination of exactly the fields compared by equals(Case)
+      hashCode = 31 * hashCode + startLevel;
+      hashCode = 31 * hashCode + depth;
+      hashCode = 31 * hashCode + nextLevel;
+      hashCode = 31 * hashCode + nextState;
+      hashCode = 31 * hashCode + (defined ? 0 : 1);
+      return hashCode;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj instanceof Case) {
+        return equals((Case)obj);
+      }
+      return false;
+    }
+
+    // see comment for hashCode above
+//    public boolean equals(Case other) {
+//      if (goingUp && !other.goingUp || !goingUp && other.goingUp) {
+//        return false;
+//      }
+//      if (goingUp && other.goingUp && (startLevel != other.startLevel || depth != other.depth)) {
+//        return false;
+//      }
+//      if (goingDown && !other.goingDown || !goingDown && other.goingDown) {
+//        return false;
+//      }
+//      if (goingDown && other.goingDown && (depth != other.depth || nextLevel != other.nextLevel)) {
+//        return false;
+//      }
+//      return true;
+//    }
+
+    public boolean equals(Case other) {
+      return other != null && startLevel == other.startLevel // null check: c.equals(null) resolves to this overload
+          && depth == other.depth
+          && nextLevel == other.nextLevel
+          && nextState == other.nextState
+          && defined == other.defined;
+    }
+
+    public int getID() {
+      return id;
+    }
+
+    public int getStartLevel() {
+      return startLevel;
+    }
+
+    public int getDepth() {
+      return depth;
+    }
+    public int getNextLevel() {
+      return nextLevel;
+    }
+
+    public int getNextState() {
+      return nextState;
+    }
+
+    public boolean isGoingUp() {
+      return goingUp;
+    }
+
+    public boolean isGoingDown() {
+      return goingDown;
+    }
+
+    public boolean isDefined() {
+      return defined;
+    }
+
+    @Override
+    public String toString() {
+      return "Case " + startLevel + " -> " + depth + " -> " + nextLevel + "; goto state_"+getNextState();
+    }
+
+  }
+
+  public static class State {
+    // One state per leaf column: the per-column data that read() uses to assemble records.
+    public final int id;
+    public final PrimitiveColumnIO primitiveColumnIO;
+    public final int maxDefinitionLevel; // primitiveColumnIO.getDefinitionLevel()
+    public final int maxRepetitionLevel; // primitiveColumnIO.getRepetitionLevel()
+    public final PrimitiveTypeName primitive;
+    public final ColumnReader column;
+    public final String[] fieldPath; // indexed by currentLevel
+    public final int[] indexFieldPath; // indexed by currentLevel
+    public final GroupConverter[] groupConverterPath; // group converters on the way to the leaf, root converter excluded
+    public final PrimitiveConverter primitiveConverter;
+    public final String primitiveField; // last element of fieldPath
+    public final int primitiveFieldIndex; // last element of indexFieldPath
+    public final int[] nextLevel; //indexed by next r: level down to which read() closes groups
+
+    private int[] definitionLevelToDepth; // indexed by current d
+    private State[] nextState; // indexed by next r
+    private Case[][][] caseLookup; // indexed by [currentLevel][d][next r]
+    private List<Case> definedCases; // cases with d == maxDefinitionLevel, sorted by id
+    private List<Case> undefinedCases; // cases with d < maxDefinitionLevel, sorted by id
+
+    private State(int id, PrimitiveColumnIO primitiveColumnIO, ColumnReader column, int[] nextLevel, GroupConverter[] groupConverterPath, PrimitiveConverter primitiveConverter) {
+      this.id = id;
+      this.primitiveColumnIO = primitiveColumnIO;
+      this.maxDefinitionLevel = primitiveColumnIO.getDefinitionLevel();
+      this.maxRepetitionLevel = primitiveColumnIO.getRepetitionLevel();
+      this.column = column;
+      this.nextLevel = nextLevel;
+      this.groupConverterPath = groupConverterPath;
+      this.primitiveConverter = primitiveConverter;
+      this.primitive = primitiveColumnIO.getType().asPrimitiveType().getPrimitiveTypeName();
+      this.fieldPath = primitiveColumnIO.getFieldPath();
+      this.primitiveField = fieldPath[fieldPath.length - 1];
+      this.indexFieldPath = primitiveColumnIO.getIndexFieldPath();
+      this.primitiveFieldIndex = indexFieldPath[indexFieldPath.length - 1];
+    }
+
+    public int getDepth(int definitionLevel) {
+      return definitionLevelToDepth[definitionLevel];
+    }
+
+    public List<Case> getDefinedCases() {
+      return definedCases;
+    }
+
+    public List<Case> getUndefinedCases() {
+      return undefinedCases;
+    }
+
+    public Case getCase(int currentLevel, int d, int nextR) {
+      return caseLookup[currentLevel][d][nextR];
+    }
+
+    public State getNextState(int nextR) {
+      return nextState[nextR]; // null when the record ends after this column
+    }
+  }
+
+  private final GroupConverter recordRootConverter;
+  private final RecordMaterializer<T> recordMaterializer;
+
+  private State[] states;
+  private ColumnReader[] columnReaders;
+
+  private boolean shouldSkipCurrentRecord = false;
+
+  /**
+   * @param root the root of the schema
+   * @param recordMaterializer responsible of materializing the records
+   * @param validating whether we should validate against the schema (not used yet: see the TODO in the body)
+   * @param columnStore where to read the column data from
+   */
+  public RecordReaderImplementation(MessageColumnIO root, RecordMaterializer<T> recordMaterializer, boolean validating, ColumnReadStoreImpl columnStore) {
+    this.recordMaterializer = recordMaterializer;
+    this.recordRootConverter = recordMaterializer.getRootConverter(); // TODO: validator(wrap(recordMaterializer), validating, root.getType());
+    PrimitiveColumnIO[] leaves = root.getLeaves().toArray(new PrimitiveColumnIO[root.getLeaves().size()]);
+    columnReaders = new ColumnReader[leaves.length];
+    int[][] nextColumnIdxForRepLevel = new int[leaves.length][];
+    int[][] levelToClose = new int[leaves.length][];
+    GroupConverter[][] groupConverterPaths = new GroupConverter[leaves.length][];
+    PrimitiveConverter[] leafConverters = new PrimitiveConverter[leaves.length];
+    int[] firstIndexForLevel  = new int[256]; // "256 levels of nesting ought to be enough for anybody"
+    // build the automaton: one state per leaf, one transition per possible next repetition level
+    for (int i = 0; i < leaves.length; i++) {
+      PrimitiveColumnIO leafColumnIO = leaves[i];
+      //generate converters along the path from root to leaf
+      final int[] indexFieldPath = leafColumnIO.getIndexFieldPath();
+      groupConverterPaths[i] = new GroupConverter[indexFieldPath.length - 1];
+      GroupConverter current = this.recordRootConverter;
+      for (int j = 0; j < indexFieldPath.length - 1; j++) {
+        current = current.getConverter(indexFieldPath[j]).asGroupConverter();
+        groupConverterPaths[i][j] = current;
+      }
+      leafConverters[i] = current.getConverter(indexFieldPath[indexFieldPath.length - 1]).asPrimitiveConverter();
+      columnReaders[i] = columnStore.getColumnReader(leafColumnIO.getColumnDescriptor());
+      int maxRepetitionLevel = leafColumnIO.getRepetitionLevel();
+      nextColumnIdxForRepLevel[i] = new int[maxRepetitionLevel+1];
+
+      levelToClose[i] = new int[maxRepetitionLevel+1]; //next level
+      for (int nextRepLevel = 0; nextRepLevel <= maxRepetitionLevel; ++nextRepLevel) {
+        // remember which is the first for this level
+        if (leafColumnIO.isFirst(nextRepLevel)) {
+          firstIndexForLevel[nextRepLevel] = i;
+        }
+        int nextColIdx;
+        //TODO: when we use nextColumnIdxForRepLevel, should we provide current rep level or the rep level for next item
+        // figure out automaton transition
+        if (nextRepLevel == 0) { // 0 always means jump to the next (the last one being a special case)
+          nextColIdx = i + 1;
+        } else if (leafColumnIO.isLast(nextRepLevel)) { // when we are at the last of the next repetition level we jump back to the first
+          nextColIdx = firstIndexForLevel[nextRepLevel];
+        } else { // otherwise we just move on to the next column.
+          nextColIdx = i + 1;
+        }
+        // figure out which level down the tree we need to go back
+        if (nextColIdx == leaves.length) { // reached the end of the record => close all levels
+          levelToClose[i][nextRepLevel] = 0;
+        } else if (leafColumnIO.isLast(nextRepLevel)) { // reached the end of this level => close the repetition level
+          ColumnIO parent = leafColumnIO.getParent(nextRepLevel);
+          levelToClose[i][nextRepLevel] = parent.getFieldPath().length - 1;
+        } else { // otherwise close until the next common parent
+          levelToClose[i][nextRepLevel] = getCommonParentLevel(
+              leafColumnIO.getFieldPath(),
+              leaves[nextColIdx].getFieldPath());
+        }
+        // sanity check: that would be a bug
+        if (levelToClose[i][nextRepLevel] > leaves[i].getFieldPath().length-1) {
+          throw new ParquetEncodingException(Arrays.toString(leaves[i].getFieldPath())+" -("+nextRepLevel+")-> "+levelToClose[i][nextRepLevel]);
+        }
+        nextColumnIdxForRepLevel[i][nextRepLevel] = nextColIdx;
+      }
+    }
+    states = new State[leaves.length];
+    for (int i = 0; i < leaves.length; i++) {
+      states[i] = new State(i, leaves[i], columnReaders[i], levelToClose[i], groupConverterPaths[i], leafConverters[i]);
+
+      int[] definitionLevelToDepth = new int[states[i].primitiveColumnIO.getDefinitionLevel() + 1];
+      // for each possible definition level, determine the depth at which to create groups
+      final ColumnIO[] path = states[i].primitiveColumnIO.getPath();
+      int depth = 0;
+      for (int d = 0; d < definitionLevelToDepth.length; ++d) {
+        while (depth < (states[i].fieldPath.length - 1)
+          && d >= path[depth + 1].getDefinitionLevel()
+          ) {
+          ++ depth;
+        }
+        definitionLevelToDepth[d] = depth - 1;
+      }
+      states[i].definitionLevelToDepth = definitionLevelToDepth;
+    }
+    for (int i = 0; i < leaves.length; i++) {
+      State state = states[i];
+      int[] nextStateIds = nextColumnIdxForRepLevel[i];
+      state.nextState = new State[nextStateIds.length];
+      for (int j = 0; j < nextStateIds.length; j++) {
+        state.nextState[j] = nextStateIds[j] == states.length ? null : states[nextStateIds[j]]; // null marks the end of the record
+      }
+    }
+    for (int i = 0; i < states.length; i++) {
+      State state = states[i];
+      final Map<Case, Case> definedCases = new HashMap<Case, Case>();
+      final Map<Case, Case> undefinedCases = new HashMap<Case, Case>();
+      Case[][][] caseLookup = new Case[state.fieldPath.length][][];
+      for (int currentLevel = 0; currentLevel < state.fieldPath.length; ++ currentLevel) {
+        caseLookup[currentLevel] = new Case[state.maxDefinitionLevel+1][];
+        for (int d = 0; d <= state.maxDefinitionLevel; ++ d) {
+          caseLookup[currentLevel][d] = new Case[state.maxRepetitionLevel+1];
+          for (int nextR = 0; nextR <= state.maxRepetitionLevel; ++ nextR) {
+            int caseStartLevel = currentLevel;
+            int caseDepth = Math.max(state.getDepth(d), caseStartLevel - 1);
+            int caseNextLevel = Math.min(state.nextLevel[nextR], caseDepth + 1);
+            Case currentCase = new Case(caseStartLevel, caseDepth, caseNextLevel, getNextReader(state.id, nextR), d == state.maxDefinitionLevel);
+            Map<Case, Case> cases = currentCase.isDefined() ? definedCases : undefinedCases;
+            if (!cases.containsKey(currentCase)) {
+              currentCase.setID(cases.size()); // ids follow first-seen order within each map
+              cases.put(currentCase, currentCase);
+            } else {
+              currentCase = cases.get(currentCase); // reuse the equal case that already has an id
+            }
+            caseLookup[currentLevel][d][nextR] = currentCase;
+          }
+        }
+      }
+      state.caseLookup = caseLookup;
+      state.definedCases = new ArrayList<Case>(definedCases.values());
+      state.undefinedCases = new ArrayList<Case>(undefinedCases.values());
+      Comparator<Case> caseComparator = new Comparator<Case>() {
+        @Override
+        public int compare(Case o1, Case o2) {
+          return o1.id - o2.id; // ids come from cases.size() above, so the subtraction cannot overflow
+        }
+      };
+      Collections.sort(state.definedCases, caseComparator);
+      Collections.sort(state.undefinedCases, caseComparator);
+    }
+  }
+
+  //TODO: have those wrappers for a converter (validator and wrap are only referenced from a TODO comment in this class)
+  private RecordConsumer validator(RecordConsumer recordConsumer, boolean validating, MessageType schema) {
+    return validating ? new ValidatingRecordConsumer(recordConsumer, schema) : recordConsumer;
+  }
+
+  private RecordConsumer wrap(RecordConsumer recordConsumer) {
+    if (Log.DEBUG) { // the logging wrapper is only added when Log.DEBUG is true
+      return new RecordConsumerLoggingWrapper(recordConsumer);
+    }
+    return recordConsumer;
+  }
+
+  /**
+   * @see org.apache.parquet.io.RecordReader#read()
+   */
+  @Override
+  public T read() {
+    int currentLevel = 0;
+    recordRootConverter.start();
+    State currentState = states[0]; // every record starts at the first leaf column
+    do {
+      ColumnReader columnReader = currentState.column;
+      int d = columnReader.getCurrentDefinitionLevel();
+      // creating needed nested groups until the current field (opening tags)
+      int depth = currentState.definitionLevelToDepth[d];
+      for (; currentLevel <= depth; ++currentLevel) {
+        currentState.groupConverterPath[currentLevel].start();
+      }
+      // currentLevel = depth + 1 at this point
+      // set the current value
+      if (d >= currentState.maxDefinitionLevel) {
+        // not null
+        columnReader.writeCurrentValueToConverter();
+      }
+      columnReader.consume();
+
+      int nextR = currentState.maxRepetitionLevel == 0 ? 0 : columnReader.getCurrentRepetitionLevel();
+      // level to go to close current groups
+      int next = currentState.nextLevel[nextR];
+      for (; currentLevel > next; currentLevel--) {
+        currentState.groupConverterPath[currentLevel - 1].end();
+      }
+
+      currentState = currentState.nextState[nextR];
+    } while (currentState != null); // a null next state marks the end of the record
+    recordRootConverter.end();
+    T record = recordMaterializer.getCurrentRecord();
+    shouldSkipCurrentRecord = record == null; // a null record from the materializer means: skip it
+    if (shouldSkipCurrentRecord) {
+      recordMaterializer.skipCurrentRecord();
+    }
+    return record;
+  }
+
+  @Override
+  public boolean shouldSkipCurrentRecord() {
+    return shouldSkipCurrentRecord; // set by read(): true when the materializer returned a null record
+  }
+
+  private static void log(String string) {
+    LOG.debug(string); // NOTE(review): this private helper has no caller in this class
+  }
+
+  int getNextReader(int current, int nextRepetitionLevel) {
+    State nextState = states[current].nextState[nextRepetitionLevel];
+    return nextState == null ? states.length : nextState.id; // states.length stands for "end of record"
+  }
+
+  int getNextLevel(int current, int nextRepetitionLevel) {
+    return states[current].nextLevel[nextRepetitionLevel]; // level down to which groups are closed for that next repetition level
+  }
+
+  private int getCommonParentLevel(String[] previous, String[] next) {
+    int shared = 0, limit = Math.min(previous.length, next.length);
+    while (shared < limit && previous[shared].equals(next[shared])) {
+      shared++;
+    }
+    return shared; // number of leading path elements that both field paths have in common
+  }
+
+  protected int getStateCount() {
+    return states.length; // one state per leaf column
+  }
+
+  protected State getState(int i) {
+    return states[i]; // states are stored in leaf order, so i is also the state id
+  }
+
+  protected RecordMaterializer<T> getMaterializer() {
+    return recordMaterializer; // the materializer received by the constructor
+  }
+
+  protected Converter getRecordConsumer() {
+    return recordRootConverter; // despite the method name, this is the materializer's root converter
+  }
+
+  protected Iterable<ColumnReader> getColumnReaders() {
+    // Arrays.asList alone is a write-through view; the unmodifiable wrapper ensures that the array cannot be altered
+    return Collections.unmodifiableList(Arrays.asList(columnReaders));
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java b/parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java
new file mode 100644
index 0000000..e1d3ba7
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/ValidatingRecordConsumer.java
@@ -0,0 +1,230 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Deque;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Type.Repetition;
+
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*;
+
+/**
+ * Wraps a record consumer.
+ * Validates the records written against the schema and passes each event down to the wrapped consumer.
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class ValidatingRecordConsumer extends RecordConsumer {
+  private static final Log LOG = Log.getLog(ValidatingRecordConsumer.class);
+  private static final boolean DEBUG = Log.DEBUG;
+
+  private final RecordConsumer delegate;
+
+  // types of the groups currently open; the message schema is pushed first
+  private Deque<Type> types = new ArrayDeque<Type>();
+  // indexes of the fields currently open
+  private Deque<Integer> fields = new ArrayDeque<Integer>();
+  // index of the last field closed in each open group (-1 when none yet)
+  private Deque<Integer> previousField = new ArrayDeque<Integer>();
+  // number of primitive values added so far to each open field
+  private Deque<Integer> fieldValueCount = new ArrayDeque<Integer>();
+
+  /**
+   * @param delegate the consumer to pass down the event to
+   * @param schema the schema to validate against
+   */
+  public ValidatingRecordConsumer(RecordConsumer delegate, MessageType schema) {
+    this.delegate = delegate;
+    this.types.push(schema);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void startMessage() {
+    // no field has been seen yet at the top level
+    previousField.push(-1);
+    delegate.startMessage();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void endMessage() {
+    delegate.endMessage();
+    // every field after the last one written must be non-required
+    int fieldCount = types.peek().asGroupType().getFieldCount();
+    validateMissingFields(fieldCount);
+    previousField.pop();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void startField(String field, int index) {
+    int lastIndex = previousField.peek();
+    if (index <= lastIndex) {
+      throw new InvalidRecordException("fields must be added in order " + field + " index " + index + " is before previous field " + lastIndex);
+    }
+    validateMissingFields(index);
+    fields.push(index);
+    fieldValueCount.push(0);
+    delegate.startField(field, index);
+  }
+
+  /**
+   * Checks the fields skipped between the previous field of the current group and {@code index}.
+   *
+   * @param index exclusive upper bound of the field indexes to check
+   * @throws InvalidRecordException if one of the skipped fields is required
+   */
+  private void validateMissingFields(int index) {
+    int firstSkipped = previousField.peek() + 1;
+    for (int i = firstSkipped; i < index; i++) {
+      Type skipped = types.peek().asGroupType().getType(i);
+      if (skipped.isRepetition(Repetition.REQUIRED)) {
+        throw new InvalidRecordException("required field is missing " + skipped);
+      }
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void endField(String field, int index) {
+    delegate.endField(field, index);
+    fieldValueCount.pop();
+    // the field just closed becomes the previous field of its group
+    Integer closedField = fields.pop();
+    previousField.push(closedField);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void startGroup() {
+    previousField.push(-1);
+    types.push(currentFieldType());
+    delegate.startGroup();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void endGroup() {
+    delegate.endGroup();
+    int fieldCount = types.peek().asGroupType().getFieldCount();
+    validateMissingFields(fieldCount);
+    types.pop();
+    previousField.pop();
+  }
+
+  /**
+   * @return the schema type of the field currently open in the current group
+   */
+  private Type currentFieldType() {
+    return types.peek().asGroupType().getType(fields.peek());
+  }
+
+  /**
+   * Records one more value for the field currently open.
+   *
+   * @return the number of values added to the field so far, including this one
+   */
+  private int countValue() {
+    int count = fieldValueCount.pop() + 1;
+    fieldValueCount.push(count);
+    return count;
+  }
+
+  /**
+   * Checks that the number of values is compatible with the repetition of the field.
+   *
+   * @param currentType the type of the field receiving the value
+   * @param count the number of values added to the field so far
+   * @throws InvalidRecordException if a non repeated field gets more than one value
+   */
+  private void checkRepetition(Type currentType, int count) {
+    switch (currentType.getRepetition()) {
+      case REPEATED:
+        return;
+      case OPTIONAL:
+      case REQUIRED:
+        if (count > 1) {
+          throw new InvalidRecordException("repeated value when the type is not repeated in " + currentType);
+        }
+        return;
+      default:
+        throw new InvalidRecordException("unknown repetition " + currentType.getRepetition() + " in " + currentType);
+    }
+  }
+
+  /**
+   * Validates a value that must be of exactly the given primitive type.
+   *
+   * @param p the primitive type of the value being added
+   * @throws InvalidRecordException if the current field does not accept such a value
+   */
+  private void validate(PrimitiveTypeName p) {
+    Type currentType = currentFieldType();
+    int count = countValue();
+    if (DEBUG) LOG.debug("validate " + p + " for " + currentType.getName());
+    checkRepetition(currentType, count);
+    boolean matches = currentType.isPrimitive() && currentType.asPrimitiveType().getPrimitiveTypeName() == p;
+    if (!matches) {
+      throw new InvalidRecordException("expected type " + p + " but got "+ currentType);
+    }
+  }
+
+  /**
+   * Validates a value that may be of any of the given primitive types.
+   *
+   * @param ptypes the primitive types accepted for the value being added
+   * @throws InvalidRecordException if the current field does not accept such a value
+   */
+  private void validate(PrimitiveTypeName... ptypes) {
+    Type currentType = currentFieldType();
+    int count = countValue();
+    if (DEBUG) LOG.debug("validate " + Arrays.toString(ptypes) + " for " + currentType.getName());
+    checkRepetition(currentType, count);
+    boolean matches = false;
+    if (currentType.isPrimitive()) {
+      PrimitiveTypeName actual = currentType.asPrimitiveType().getPrimitiveTypeName();
+      for (PrimitiveTypeName candidate : ptypes) {
+        if (actual == candidate) {
+          matches = true; // type is valid
+          break;
+        }
+      }
+    }
+    if (!matches) {
+      throw new InvalidRecordException(
+          "expected type in " + Arrays.toString(ptypes) + " but got " + currentType);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void addInteger(int value) {
+    validate(INT32);
+    delegate.addInteger(value);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void addLong(long value) {
+    validate(INT64);
+    delegate.addLong(value);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void addBoolean(boolean value) {
+    validate(BOOLEAN);
+    delegate.addBoolean(value);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void addBinary(Binary value) {
+    // binary values are accepted by all three byte-array based types
+    validate(BINARY, INT96, FIXED_LEN_BYTE_ARRAY);
+    delegate.addBinary(value);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void addFloat(float value) {
+    validate(FLOAT);
+    delegate.addFloat(value);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void addDouble(double value) {
+    validate(DOUBLE);
+    delegate.addDouble(value);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java b/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
new file mode 100644
index 0000000..d3abec5
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
@@ -0,0 +1,413 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io.api;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.ObjectStreamException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.parquet.io.ParquetEncodingException;
+
+import static org.apache.parquet.bytes.BytesUtils.UTF8;
+
+/**
+ * A sequence of bytes backed by a byte array, a slice of a byte array or a {@link ByteBuffer}.
+ * Instances are created through the static factory methods
+ * ({@link #fromByteArray(byte[])}, {@link #fromByteArray(byte[], int, int)},
+ * {@link #fromByteBuffer(ByteBuffer)} and {@link #fromString(String)}).
+ */
+abstract public class Binary implements Comparable<Binary>, Serializable {
+
+  // this isn't really something others should extend
+  private Binary() { }
+
+  public static final Binary EMPTY = fromByteArray(new byte[0]);
+
+  /** @return the bytes decoded as a UTF-8 string */
+  abstract public String toStringUsingUTF8();
+
+  /** @return the number of bytes in this value */
+  abstract public int length();
+
+  /** Writes the bytes of this value to the given stream. */
+  abstract public void writeTo(OutputStream out) throws IOException;
+
+  /** Writes the bytes of this value to the given output. */
+  abstract public void writeTo(DataOutput out) throws IOException;
+
+  /** @return the bytes of this value; may be the backing array itself, so do not modify it */
+  abstract public byte[] getBytes();
+
+  /** Second half of the double dispatch used by {@link #equals(Binary)}. */
+  abstract boolean equals(byte[] bytes, int offset, int length);
+
+  /** @return true if the other value holds the same bytes */
+  abstract boolean equals(Binary other);
+
+  abstract public int compareTo(Binary other);
+
+  /**
+   * Second half of the double dispatch used by {@link #compareTo(Binary)}.
+   * The result is the ordering of the given bytes relative to this value
+   * (positive when the given bytes are greater), which is what the caller of
+   * {@link #compareTo(Binary)} expects since it passes its own bytes.
+   */
+  abstract int compareTo(byte[] bytes, int offset, int length);
+
+  /** @return a buffer positioned on the bytes of this value */
+  abstract public ByteBuffer toByteBuffer();
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null) {
+      return false;
+    }
+    if (obj instanceof Binary) {
+      return equals((Binary)obj);
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return "Binary{" + length() + " bytes, " + Arrays.toString(getBytes()) + "}";
+  }
+
+  /** Value backed by the range {@code [offset, offset + length)} of a byte array. */
+  private static class ByteArraySliceBackedBinary extends Binary {
+    private final byte[] value;
+    private final int offset;
+    private final int length;
+
+    public ByteArraySliceBackedBinary(byte[] value, int offset, int length) {
+      this.value = value;
+      this.offset = offset;
+      this.length = length;
+    }
+
+    @Override
+    public String toStringUsingUTF8() {
+      return UTF8.decode(ByteBuffer.wrap(value, offset, length)).toString();
+      // TODO: figure out why the following line was much slower
+      // rdb: new String(...) is slower because it instantiates a new Decoder,
+      //      while Charset#decode uses a thread-local decoder cache
+      // return new String(value, offset, length, BytesUtils.UTF8);
+    }
+
+    @Override
+    public int length() {
+      return length;
+    }
+
+    @Override
+    public void writeTo(OutputStream out) throws IOException {
+      out.write(value, offset, length);
+    }
+
+    @Override
+    public byte[] getBytes() {
+      return Arrays.copyOfRange(value, offset, offset + length);
+    }
+
+    @Override
+    public int hashCode() {
+      return Binary.hashCode(value, offset, length);
+    }
+
+    @Override
+    boolean equals(Binary other) {
+      return other.equals(value, offset, length);
+    }
+
+    @Override
+    boolean equals(byte[] other, int otherOffset, int otherLength) {
+      return Binary.equals(value, offset, length, other, otherOffset, otherLength);
+    }
+
+    @Override
+    public int compareTo(Binary other) {
+      return other.compareTo(value, offset, length);
+    }
+
+    @Override
+    int compareTo(byte[] other, int otherOffset, int otherLength) {
+      return Binary.compareTwoByteArrays(value, offset, length, other, otherOffset, otherLength);
+    }
+
+    @Override
+    public ByteBuffer toByteBuffer() {
+      return ByteBuffer.wrap(value, offset, length);
+    }
+
+    @Override
+    public void writeTo(DataOutput out) throws IOException {
+      out.write(value, offset, length);
+    }
+
+  }
+
+  /** Array backed value created from a string; prints itself as that string. */
+  private static class FromStringBinary extends ByteArrayBackedBinary {
+    public FromStringBinary(byte[] value) {
+      super(value);
+    }
+
+    @Override
+    public String toString() {
+      return "Binary{\"" + toStringUsingUTF8() + "\"}";
+    }
+  }
+
+  /**
+   * @param value the array holding the bytes (not copied)
+   * @param offset index of the first byte of the value in the array
+   * @param length number of bytes of the value
+   * @return a Binary over that range of the array
+   */
+  public static Binary fromByteArray(final byte[] value, final int offset, final int length) {
+    return new ByteArraySliceBackedBinary(value, offset, length);
+  }
+
+  /** Value backed by a whole byte array. */
+  private static class ByteArrayBackedBinary extends Binary {
+    private final byte[] value;
+
+    public ByteArrayBackedBinary(byte[] value) {
+      this.value = value;
+    }
+
+    @Override
+    public String toStringUsingUTF8() {
+      return UTF8.decode(ByteBuffer.wrap(value)).toString();
+    }
+
+    @Override
+    public int length() {
+      return value.length;
+    }
+
+    @Override
+    public void writeTo(OutputStream out) throws IOException {
+      out.write(value);
+    }
+
+    @Override
+    public byte[] getBytes() {
+      return value;
+    }
+
+    @Override
+    public int hashCode() {
+      return Binary.hashCode(value, 0, value.length);
+    }
+
+    @Override
+    boolean equals(Binary other) {
+      return other.equals(value, 0, value.length);
+    }
+
+    @Override
+    boolean equals(byte[] other, int otherOffset, int otherLength) {
+      return Binary.equals(value, 0, value.length, other, otherOffset, otherLength);
+    }
+
+    @Override
+    public int compareTo(Binary other) {
+      return other.compareTo(value, 0, value.length);
+    }
+
+    @Override
+    int compareTo(byte[] other, int otherOffset, int otherLength) {
+      return Binary.compareTwoByteArrays(value, 0, value.length, other, otherOffset, otherLength);
+    }
+
+    @Override
+    public ByteBuffer toByteBuffer() {
+      return ByteBuffer.wrap(value);
+    }
+
+    @Override
+    public void writeTo(DataOutput out) throws IOException {
+      out.write(value);
+    }
+
+  }
+
+  /**
+   * @param value the array holding the bytes (not copied)
+   * @return a Binary over the whole array
+   */
+  public static Binary fromByteArray(final byte[] value) {
+    return new ByteArrayBackedBinary(value);
+  }
+
+  /**
+   * Value backed by the remaining bytes of a {@link ByteBuffer}.
+   * The position, limit and mark of the buffer are never changed by this class:
+   * every relative read goes through a duplicate.
+   */
+  private static class ByteBufferBackedBinary extends Binary {
+    private transient ByteBuffer value;
+
+    public ByteBufferBackedBinary(ByteBuffer value) {
+      this.value = value;
+    }
+
+    @Override
+    public String toStringUsingUTF8() {
+      // decoding advances the position of its input, so decode a duplicate
+      // to keep the remaining bytes of this value intact
+      return UTF8.decode(value.duplicate()).toString();
+    }
+
+    @Override
+    public int length() {
+      return value.remaining();
+    }
+
+    @Override
+    public void writeTo(OutputStream out) throws IOException {
+      // TODO: should not have to materialize those bytes
+      out.write(getBytes());
+    }
+
+    @Override
+    public byte[] getBytes() {
+      byte[] bytes = new byte[value.remaining()];
+      // read through a duplicate so that neither the position nor the mark of the buffer changes
+      value.duplicate().get(bytes);
+      return bytes;
+    }
+
+    @Override
+    public int hashCode() {
+      if (value.hasArray()) {
+        // the value starts at arrayOffset + position in the backing array and is remaining() bytes long
+        return Binary.hashCode(value.array(), value.arrayOffset() + value.position(),
+            value.remaining());
+      }
+      byte[] bytes = getBytes();
+      return Binary.hashCode(bytes, 0, bytes.length);
+    }
+
+    @Override
+    boolean equals(Binary other) {
+      if (value.hasArray()) {
+        return other.equals(value.array(), value.arrayOffset() + value.position(),
+            value.remaining());
+      }
+      byte[] bytes = getBytes();
+      return other.equals(bytes, 0, bytes.length);
+    }
+
+    @Override
+    boolean equals(byte[] other, int otherOffset, int otherLength) {
+      if (value.hasArray()) {
+        return Binary.equals(value.array(), value.arrayOffset() + value.position(),
+            value.remaining(), other, otherOffset, otherLength);
+      }
+      byte[] bytes = getBytes();
+      return Binary.equals(bytes, 0, bytes.length, other, otherOffset, otherLength);
+    }
+
+    @Override
+    public int compareTo(Binary other) {
+      if (value.hasArray()) {
+        return other.compareTo(value.array(), value.arrayOffset() + value.position(),
+            value.remaining());
+      }
+      byte[] bytes = getBytes();
+      return other.compareTo(bytes, 0, bytes.length);
+    }
+
+    @Override
+    int compareTo(byte[] other, int otherOffset, int otherLength) {
+      if (value.hasArray()) {
+        return Binary.compareTwoByteArrays(value.array(), value.arrayOffset() + value.position(),
+            value.remaining(), other, otherOffset, otherLength);
+      }
+      byte[] bytes = getBytes();
+      return Binary.compareTwoByteArrays(bytes, 0, bytes.length, other, otherOffset, otherLength);
+    }
+
+    @Override
+    public ByteBuffer toByteBuffer() {
+      // hand out a duplicate: it shares the content but has its own position, limit and mark,
+      // so reads done by the caller cannot change the bytes seen by this value
+      return value.duplicate();
+    }
+
+    @Override
+    public void writeTo(DataOutput out) throws IOException {
+      // TODO: should not have to materialize those bytes
+      out.write(getBytes());
+    }
+
+    // the buffer is transient: the value is serialized as its length followed by its bytes
+    private void writeObject(java.io.ObjectOutputStream out) throws IOException {
+      byte[] bytes = getBytes();
+      out.writeInt(bytes.length);
+      out.write(bytes);
+    }
+
+    private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
+      int length = in.readInt();
+      byte[] bytes = new byte[length];
+      in.readFully(bytes, 0, length);
+      this.value = ByteBuffer.wrap(bytes);
+    }
+
+    private void readObjectNoData() throws ObjectStreamException {
+      this.value = ByteBuffer.wrap(new byte[0]);
+    }
+
+  }
+
+  /**
+   * @param value the buffer whose remaining bytes are the value (not copied)
+   * @return a Binary over the remaining bytes of the buffer
+   */
+  public static Binary fromByteBuffer(final ByteBuffer value) {
+    return new ByteBufferBackedBinary(value);
+  }
+
+  /**
+   * @param value the string to encode
+   * @return a Binary holding the UTF-8 encoding of the string
+   * @throws ParquetEncodingException if UTF-8 is not supported
+   */
+  public static Binary fromString(final String value) {
+    try {
+      return new FromStringBinary(value.getBytes("UTF-8"));
+    } catch (UnsupportedEncodingException e) {
+      throw new ParquetEncodingException("UTF-8 not supported.", e);
+    }
+  }
+
+  /**
+   * Same computation as {@link Arrays#hashCode(byte[])}, applied to a range of an array.
+   *
+   * @see Arrays#hashCode(byte[])
+   * @param array the array holding the bytes
+   * @param offset index of the first byte of the range
+   * @param length number of bytes in the range
+   * @return the hash code of the range
+   */
+  private static final int hashCode(byte[] array, int offset, int length) {
+    int result = 1;
+    for (int i = offset; i < offset + length; i++) {
+      byte b = array[i];
+      result = 31 * result + b;
+    }
+    return result;
+  }
+
+  /**
+   * Compares two ranges of byte arrays for equality.
+   *
+   * @see Arrays#equals(byte[], byte[])
+   * @param array1 the array holding the first range
+   * @param offset1 index of the first byte of the first range
+   * @param length1 number of bytes in the first range
+   * @param array2 the array holding the second range
+   * @param offset2 index of the first byte of the second range
+   * @param length2 number of bytes in the second range
+   * @return true if both arrays are null, or both ranges have the same length and content
+   */
+  private static final boolean equals(byte[] array1, int offset1, int length1, byte[] array2, int offset2, int length2) {
+    if (array1 == null && array2 == null) return true;
+    if (array1 == null || array2 == null) return false;
+    if (length1 != length2) return false;
+    // same range of the same array: no need to look at the content
+    if (array1 == array2 && offset1 == offset2) return true;
+    for (int i = 0; i < length1; i++) {
+      if (array1[i + offset1] != array2[i + offset2]) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Compares two ranges of byte arrays byte by byte (bytes are compared as signed values),
+   * a shorter range that is a prefix of the other one being the smaller.
+   *
+   * The sign is the ordering of the SECOND range relative to the first one:
+   * the result is positive when the first range is smaller. This is intended:
+   * {@code a.compareTo(b)} dispatches to {@code b.compareTo(bytes of a)}, which passes
+   * the bytes of {@code b} as the first range and the bytes of {@code a} as the second one.
+   *
+   * @return 1 if the first range is smaller, -1 if it is greater, 0 if both are equal
+   */
+  private static final int compareTwoByteArrays(byte[] array1, int offset1, int length1,
+                                                byte[] array2, int offset2, int length2) {
+    if (array1 == null && array2 == null) return 0;
+    if (array1 == array2 && offset1 == offset2 && length1 == length2) return 0;
+    int min_length = (length1 < length2) ? length1 : length2;
+    for (int i = 0; i < min_length; i++) {
+      if (array1[i + offset1] < array2[i + offset2]) {
+        return 1;
+      }
+      if (array1[i + offset1] > array2[i + offset2]) {
+        return -1;
+      }
+    }
+    // check remainder
+    if (length1 == length2) { return 0; }
+    else if (length1 < length2) { return 1;}
+    else { return -1; }
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/api/Converter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/api/Converter.java b/parquet-column/src/main/java/org/apache/parquet/io/api/Converter.java
new file mode 100644
index 0000000..648e245
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/api/Converter.java
@@ -0,0 +1,40 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io.api;
+
+/**
+ * Represent a tree of converters
+ * that materializes tuples
+ *
+ * @author Julien Le Dem
+ *
+ */
+public abstract class Converter {
+
+  /**
+   * @return true for a converter of primitive values, false for a converter of groups
+   */
+  public abstract boolean isPrimitive();
+
+  /**
+   * Views this converter as a primitive converter.
+   * This base implementation always fails; primitive converters override it.
+   *
+   * @return this converter if it is a primitive converter
+   * @throws ClassCastException if this converter is not a primitive converter
+   */
+  public PrimitiveConverter asPrimitiveConverter() {
+    throw new ClassCastException("Expected instance of primitive converter but got \"" + getClass().getName() + "\"");
+  }
+
+  /**
+   * Views this converter as a group converter.
+   * This base implementation always fails; group converters override it.
+   *
+   * @return this converter if it is a group converter
+   * @throws ClassCastException if this converter is not a group converter
+   */
+  public GroupConverter asGroupConverter() {
+    throw new ClassCastException("Expected instance of group converter but got \"" + getClass().getName() + "\"");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/api/GroupConverter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/api/GroupConverter.java b/parquet-column/src/main/java/org/apache/parquet/io/api/GroupConverter.java
new file mode 100644
index 0000000..823e388
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/api/GroupConverter.java
@@ -0,0 +1,58 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io.api;
+
+
+/**
+ * converter for group nodes
+ *
+ * @author Julien Le Dem
+ *
+ */
+public abstract class GroupConverter extends Converter {
+
+  /**
+   * @return always false: a group converter is not a primitive converter
+   */
+  @Override
+  public boolean isPrimitive() {
+    return false;
+  }
+
+  /**
+   * @return this converter
+   */
+  @Override
+  public GroupConverter asGroupConverter() {
+    return this;
+  }
+
+  /**
+   * Called at initialization, based on the schema.
+   * Must consistently return the same object for a given field.
+   *
+   * @param fieldIndex index of the field in this group
+   * @return the corresponding converter
+   */
+  public abstract Converter getConverter(int fieldIndex);
+
+  /** runtime calls  **/
+
+  /**
+   * Called at the beginning of the group managed by this converter.
+   */
+  public abstract void start();
+
+  /**
+   * Called at the end of the group managed by this converter.
+   */
+  public abstract void end();
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/api/PrimitiveConverter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/api/PrimitiveConverter.java b/parquet-column/src/main/java/org/apache/parquet/io/api/PrimitiveConverter.java
new file mode 100644
index 0000000..763c6fd
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/api/PrimitiveConverter.java
@@ -0,0 +1,111 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io.api;
+
+import org.apache.parquet.column.Dictionary;
+
+/**
+ * converter for leaves of the schema
+ *
+ * @author Julien Le Dem
+ *
+ */
+public abstract class PrimitiveConverter extends Converter {
+
+  /**
+   * @return always true: this is a converter of primitive values
+   */
+  @Override
+  public boolean isPrimitive() {
+    return true;
+  }
+
+  /**
+   * @return this converter
+   */
+  @Override
+  public PrimitiveConverter asPrimitiveConverter() {
+    return this;
+  }
+
+  /**
+   * If this returns true, dictionary based conversion will be attempted instead.
+   * The default is false.
+   *
+   * @return whether dictionaries are supported by this converter
+   */
+  public boolean hasDictionarySupport() {
+    return false;
+  }
+
+  /**
+   * Sets the dictionary to use when the data was encoded using dictionary encoding
+   * and the converter hasDictionarySupport().
+   *
+   * @param dictionary the dictionary to use for conversion
+   * @throws UnsupportedOperationException unless overridden
+   */
+  public void setDictionary(Dictionary dictionary) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /** runtime calls  **/
+
+  /**
+   * Adds a value taken from the dictionary set with setDictionary().
+   * Used when the converter has dictionary support and the data was encoded using a dictionary.
+   *
+   * @param dictionaryId the id in the dictionary of the value to add
+   * @throws UnsupportedOperationException unless overridden
+   */
+  public void addValueFromDictionary(int dictionaryId) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /**
+   * @param value the binary value to add
+   * @throws UnsupportedOperationException unless overridden
+   */
+  public void addBinary(Binary value) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /**
+   * @param value the boolean value to add
+   * @throws UnsupportedOperationException unless overridden
+   */
+  public void addBoolean(boolean value) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /**
+   * @param value the double value to add
+   * @throws UnsupportedOperationException unless overridden
+   */
+  public void addDouble(double value) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /**
+   * @param value the float value to add
+   * @throws UnsupportedOperationException unless overridden
+   */
+  public void addFloat(float value) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /**
+   * @param value the int value to add
+   * @throws UnsupportedOperationException unless overridden
+   */
+  public void addInt(int value) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  /**
+   * @param value the long value to add
+   * @throws UnsupportedOperationException unless overridden
+   */
+  public void addLong(long value) {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/api/RecordConsumer.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/api/RecordConsumer.java b/parquet-column/src/main/java/org/apache/parquet/io/api/RecordConsumer.java
new file mode 100644
index 0000000..953d87a
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/api/RecordConsumer.java
@@ -0,0 +1,128 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io.api;
+
+
+/**
+ *
+ * Abstraction for writing records
+ * It decouples the striping algorithm from the actual record model
+ * example:
+ * <pre>
+ * startMessage()
+ *  startField("A", 0)
+ *   addValue(1)
+ *   addValue(2)
+ *  endField("A", 0)
+ *  startField("B", 1)
+ *   startGroup()
+ *    startField("C", 0)
+ *     addValue(3)
+ *    endField("C", 0)
+ *   endGroup()
+ *  endField("B", 1)
+ * endMessage()
+ * </pre>
+ *
+ * would produce the following message:
+ * <pre>
+ * {
+ *   A: [1, 2]
+ *   B: {
+ *     C: 3
+ *   }
+ * }
+ * </pre>
+ * @author Julien Le Dem
+ *
+ */
+public abstract class RecordConsumer {
+
+  /**
+   * Starts a new record.
+   */
+  public abstract void startMessage();
+
+  /**
+   * Ends the current record.
+   */
+  public abstract void endMessage();
+
+  /**
+   * Starts a field in a group or message.
+   * A repeated field is started only once; all of its values are added between start and end.
+   *
+   * @param field name of the field
+   * @param index index of the field in the group or message
+   */
+  public abstract void startField(String field, int index);
+
+  /**
+   * Ends a field in a group or message.
+   *
+   * @param field name of the field
+   * @param index index of the field in the group or message
+   */
+  public abstract void endField(String field, int index);
+
+  /**
+   * Starts a group in the current field.
+   */
+  public abstract void startGroup();
+
+  /**
+   * Ends a group in the current field.
+   */
+  public abstract void endGroup();
+
+  /**
+   * Adds an int value to the current field.
+   *
+   * @param value the value to add
+   */
+  public abstract void addInteger(int value);
+
+  /**
+   * Adds a long value to the current field.
+   *
+   * @param value the value to add
+   */
+  public abstract void addLong(long value);
+
+  /**
+   * Adds a boolean value to the current field.
+   *
+   * @param value the value to add
+   */
+  public abstract void addBoolean(boolean value);
+
+  /**
+   * Adds a binary value to the current field.
+   *
+   * @param value the value to add
+   */
+  public abstract void addBinary(Binary value);
+
+  /**
+   * Adds a float value to the current field.
+   *
+   * @param value the value to add
+   */
+  public abstract void addFloat(float value);
+
+  /**
+   * Adds a double value to the current field.
+   *
+   * @param value the value to add
+   */
+  public abstract void addDouble(double value);
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/api/RecordMaterializer.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/api/RecordMaterializer.java b/parquet-column/src/main/java/org/apache/parquet/io/api/RecordMaterializer.java
new file mode 100644
index 0000000..98e4d50
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/api/RecordMaterializer.java
@@ -0,0 +1,48 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io.api;
+
+/**
+ * Top-level class which should be implemented in order to materialize objects from
+ * a stream of Parquet data.
+ * 
+ * Each record will be wrapped by {@link GroupConverter#start()} and {@link GroupConverter#end()},
+ * between which the appropriate fields will be materialized.
+ *
+ * @author Julien Le Dem
+ *
+ * @param <T> the materialized object class
+ */
+abstract public class RecordMaterializer<T> {
+
+  /**
+   * @return the result of the conversion
+   */
+  abstract public T getCurrentRecord();
+
+  /**
+   * Called if {@link #getCurrentRecord()} isn't going to be called.
+   */
+  public void skipCurrentRecord() { }
+
+  /**
+   * @return the root converter for this tree
+   */
+  abstract public GroupConverter getRootConverter();
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/schema/ConversionPatterns.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/ConversionPatterns.java b/parquet-column/src/main/java/org/apache/parquet/schema/ConversionPatterns.java
new file mode 100644
index 0000000..0f68fc3
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/ConversionPatterns.java
@@ -0,0 +1,104 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.schema;
+
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Type.Repetition;
+
+import static org.apache.parquet.schema.OriginalType.*;
+
+/**
+ * Utility functions to convert from Java-like map and list types
+ * to equivalent Parquet types.
+ */
+public abstract class ConversionPatterns {
+  /**
+   * to preserve the difference between empty list and null when optional
+   *
+   * @param repetition
+   * @param alias        name of the field
+   * @param originalType
+   * @param nested       the nested repeated field
+   * @return a group type
+   */
+  private static GroupType listWrapper(Repetition repetition, String alias, OriginalType originalType, Type nested) {
+    if (!nested.isRepetition(Repetition.REPEATED)) {
+      throw new IllegalArgumentException("Nested type should be repeated: " + nested);
+    }
+    return new GroupType(repetition, alias, originalType, nested);
+  }
+
+  public static GroupType mapType(Repetition repetition, String alias, Type keyType, Type valueType) {
+    return mapType(repetition, alias, "map", keyType, valueType);
+  }
+
+  public static GroupType stringKeyMapType(Repetition repetition, String alias, String mapAlias, Type valueType) {
+    return mapType(repetition, alias, mapAlias, new PrimitiveType(Repetition.REQUIRED, PrimitiveTypeName.BINARY, "key", OriginalType.UTF8), valueType);
+  }
+
+  public static GroupType stringKeyMapType(Repetition repetition, String alias, Type valueType) {
+    return stringKeyMapType(repetition, alias, "map", valueType);
+  }
+
+  public static GroupType mapType(Repetition repetition, String alias, String mapAlias, Type keyType, Type valueType) {
+    //support projection only on key of a map
+    if (valueType == null) {
+      return listWrapper(
+              repetition,
+              alias,
+              MAP,
+              new GroupType(
+                      Repetition.REPEATED,
+                      mapAlias,
+                      MAP_KEY_VALUE,
+                      keyType)
+      );
+    } else {
+      if (!valueType.getName().equals("value")) {
+        throw new RuntimeException(valueType.getName() + " should be value");
+      }
+      return listWrapper(
+              repetition,
+              alias,
+              MAP,
+              new GroupType(
+                      Repetition.REPEATED,
+                      mapAlias,
+                      MAP_KEY_VALUE,
+                      keyType,
+                      valueType)
+      );
+    }
+  }
+
+  /**
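+   * @param repetition repetition of the resulting list group
+   * @param alias      name of the field
+   * @param nestedType the nested field; must be repeated
+   * @return a group type annotated as LIST that wraps nestedType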
+   */
+  public static GroupType listType(Repetition repetition, String alias, Type nestedType) {
+    return listWrapper(
+            repetition,
+            alias,
+            LIST,
+            nestedType
+    );
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/schema/DecimalMetadata.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/DecimalMetadata.java b/parquet-column/src/main/java/org/apache/parquet/schema/DecimalMetadata.java
new file mode 100644
index 0000000..446b956
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/DecimalMetadata.java
@@ -0,0 +1,57 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.schema;
+
+public class DecimalMetadata {
+  private final int precision;
+  private final int scale;
+
+  public DecimalMetadata(int precision, int scale) {
+    this.precision = precision;
+    this.scale = scale;
+  }
+
+  public int getPrecision() {
+    return precision;
+  }
+
+  public int getScale() {
+    return scale;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    DecimalMetadata that = (DecimalMetadata) o;
+
+    if (precision != that.precision) return false;
+    if (scale != that.scale) return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = precision;
+    result = 31 * result + scale;
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java b/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java
new file mode 100644
index 0000000..027fbc0
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java
@@ -0,0 +1,391 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.schema;
+
+import static java.util.Arrays.asList;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.parquet.io.InvalidRecordException;
+
+/**
+ * Represents a group type: a list of fields
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class GroupType extends Type {
+
+  private final List<Type> fields;
+  private final Map<String, Integer> indexByName;
+
+  /**
+   * @param repetition OPTIONAL, REPEATED, REQUIRED
+   * @param name the name of the field
+   * @param fields the contained fields
+   */
+  public GroupType(Repetition repetition, String name, List<Type> fields) {
+    this(repetition, name, null, fields, null);
+  }
+
+  /**
+   * @param repetition OPTIONAL, REPEATED, REQUIRED
+   * @param name the name of the field
+   * @param fields the contained fields
+   */
+  public GroupType(Repetition repetition, String name, Type... fields) {
+    this(repetition, name, Arrays.asList(fields));
+  }
+
+  /**
+   * @param repetition OPTIONAL, REPEATED, REQUIRED
+   * @param name the name of the field
+   * @param originalType (optional) the original type to help with cross schema conversion (LIST, MAP, ...)
+   * @param fields the contained fields
+   */
+  @Deprecated
+  public GroupType(Repetition repetition, String name, OriginalType originalType, Type... fields) {
+    this(repetition, name, originalType, Arrays.asList(fields));
+  }
+
+  /**
+   * @param repetition OPTIONAL, REPEATED, REQUIRED
+   * @param name the name of the field
+   * @param originalType (optional) the original type to help with cross schema conversion (LIST, MAP, ...)
+   * @param fields the contained fields
+   */
+  @Deprecated
+  public GroupType(Repetition repetition, String name, OriginalType originalType, List<Type> fields) {
+    this(repetition, name, originalType, fields, null);
+  }
+
+  /**
+   * @param repetition OPTIONAL, REPEATED, REQUIRED
+   * @param name the name of the field
+   * @param originalType (optional) the original type to help with cross schema conversion (LIST, MAP, ...)
+   * @param fields the contained fields
+   * @param id the id of the field
+   */
+  GroupType(Repetition repetition, String name, OriginalType originalType, List<Type> fields, ID id) {
+    super(name, repetition, originalType, id);
+    this.fields = fields;
+    this.indexByName = new HashMap<String, Integer>();
+    for (int i = 0; i < fields.size(); i++) {
+      indexByName.put(fields.get(i).getName(), i);
+    }
+  }
+
+  /**
+   * @param id the field id
+   * @return a new GroupType with the same fields and a new id
+   */
+  @Override
+  public GroupType withId(int id) {
+    return new GroupType(getRepetition(), getName(), getOriginalType(), fields, new ID(id));
+  }
+
+  /**
+   * @param newFields
+   * @return a group with the same attributes and new fields.
+   */
+  public GroupType withNewFields(List<Type> newFields) {
+    return new GroupType(getRepetition(), getName(), getOriginalType(), newFields, getId());
+  }
+
+  /**
+   * @param newFields
+   * @return a group with the same attributes and new fields.
+   */
+  public GroupType withNewFields(Type... newFields) {
+    return withNewFields(asList(newFields));
+  }
+
+  /**
+   * returns the name of the corresponding field
+   * @param index the index of the desired field in this type
+   * @return the name of the field at this index
+   */
+  public String getFieldName(int index) {
+    return fields.get(index).getName();
+  }
+
+  /**
+   * @param name the requested name
+   * @return whether this type contains a field with that name
+   */
+  public boolean containsField(String name) {
+    return indexByName.containsKey(name);
+  }
+
+  /**
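+   * looks up the position of a field by name; throws InvalidRecordException if absent
+   * @param name the name of the field
+   * @return the index of the field with that name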
+   */
+  public int getFieldIndex(String name) {
+    if (!indexByName.containsKey(name)) {
+      throw new InvalidRecordException(name + " not found in " + this);
+    }
+    return indexByName.get(name);
+  }
+
+  /**
+   * @return the fields contained in this type
+   */
+  public List<Type> getFields() {
+    return fields;
+  }
+
+  /**
+   * @return the number of fields in this type
+   */
+  public int getFieldCount() {
+    return fields.size();
+  }
+
+  /**
+   * @return false
+   */
+  @Override
+  public boolean isPrimitive() {
+    return false;
+  }
+
+  /**
+   * @param fieldName
+   * @return the type of this field by name
+   */
+  public Type getType(String fieldName) {
+    return getType(getFieldIndex(fieldName));
+  }
+
+  /**
+   * @param index
+   * @return the type of this field by index
+   */
+  public Type getType(int index) {
+    return fields.get(index);
+  }
+
+  /**
+   * appends a display string of the members of this group to sb
+   * @param sb where to append
+   * @param indent the indentation level
+   */
+  void membersDisplayString(StringBuilder sb, String indent) {
+    for (Type field : fields) {
+      field.writeToStringBuilder(sb, indent);
+      if (field.isPrimitive()) {
+        sb.append(";");
+      }
+      sb.append("\n");
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void writeToStringBuilder(StringBuilder sb, String indent) {
+    sb.append(indent)
+        .append(getRepetition().name().toLowerCase())
+        .append(" group ")
+        .append(getName())
+        .append(getOriginalType() == null ? "" : " (" + getOriginalType() +")")
+        .append(getId() == null ? "" : " = " + getId())
+        .append(" {\n");
+    membersDisplayString(sb, indent + "  ");
+    sb.append(indent)
+        .append("}");
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void accept(TypeVisitor visitor) {
+    visitor.visit(this);
+  }
+
+  @Override @Deprecated
+  protected int typeHashCode() {
+    return hashCode();
+  }
+
+  @Override @Deprecated
+  protected boolean typeEquals(Type other) {
+    return equals(other);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public int hashCode() {
+    return super.hashCode() * 31 + getFields().hashCode();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  protected boolean equals(Type otherType) {
+    return
+        !otherType.isPrimitive()
+        && super.equals(otherType)
+        && getFields().equals(otherType.asGroupType().getFields());
+  }
+
+  @Override
+  protected int getMaxRepetitionLevel(String[] path, int depth) {
+    int myVal = isRepetition(Repetition.REPEATED) ? 1 : 0;
+    if (depth == path.length) {
+      return myVal;
+    }
+    return myVal + getType(path[depth]).getMaxRepetitionLevel(path, depth + 1);
+  }
+
+  @Override
+  protected int getMaxDefinitionLevel(String[] path, int depth) {
+    int myVal = !isRepetition(Repetition.REQUIRED) ? 1 : 0;
+    if (depth == path.length) {
+      return myVal;
+    }
+    return myVal + getType(path[depth]).getMaxDefinitionLevel(path, depth + 1);
+  }
+
+  @Override
+  protected Type getType(String[] path, int depth) {
+    if (depth == path.length) {
+      return this;
+    }
+    return getType(path[depth]).getType(path, depth + 1);
+  }
+
+  @Override
+  protected boolean containsPath(String[] path, int depth) {
+    if (depth == path.length) {
+      return false;
+    }
+    return containsField(path[depth]) && getType(path[depth]).containsPath(path, depth + 1);
+  }
+
+  @Override
+  protected List<String[]> getPaths(int depth) {
+    List<String[]> result = new ArrayList<String[]>();
+    for (Type field : fields) {
+      List<String[]> paths = field.getPaths(depth + 1);
+      for (String[] path : paths) {
+        path[depth] = field.getName();
+        result.add(path);
+      }
+    }
+    return result;
+  }
+
+  @Override
+  void checkContains(Type subType) {
+    super.checkContains(subType);
+    checkGroupContains(subType);
+  }
+
+  void checkGroupContains(Type subType) {
+    if (subType.isPrimitive()) {
+      throw new InvalidRecordException(subType + " found: expected " + this);
+    }
+    List<Type> fields = subType.asGroupType().getFields();
+    for (Type otherType : fields) {
+      Type thisType = this.getType(otherType.getName());
+      thisType.checkContains(otherType);
+    }
+  }
+
+  @Override
+  <T> T convert(List<GroupType> path, TypeConverter<T> converter) {
+    List<GroupType> childrenPath = new ArrayList<GroupType>(path);
+    childrenPath.add(this);
+    final List<T> children = convertChildren(childrenPath, converter);
+    return converter.convertGroupType(path, this, children);
+  }
+
+  protected <T> List<T> convertChildren(List<GroupType> path, TypeConverter<T> converter) {
+    List<T> children = new ArrayList<T>(fields.size());
+    for (Type field : fields) {
+      children.add(field.convert(path, converter));
+    }
+    return children;
+  }
+
+  @Override
+  protected Type union(Type toMerge) {
+    return union(toMerge, true);
+  }
+
+  @Override
+  protected Type union(Type toMerge, boolean strict) { // strict also governs nested fields
+    if (toMerge.isPrimitive()) {
+      throw new IncompatibleSchemaModificationException("can not merge primitive type " + toMerge + " into group type " + this);
+    }
+    return new GroupType(toMerge.getRepetition(), getName(), mergeFields(toMerge.asGroupType(), strict));
+  }
+
+  /**
+   * produces the list of fields resulting from merging toMerge into the fields of this
+   * @param toMerge the group containing the fields to merge
+   * @return the merged list
+   */
+  List<Type> mergeFields(GroupType toMerge) {
+    return mergeFields(toMerge, true);
+  }
+
+  /**
+   * produces the list of fields resulting from merging toMerge into the fields of this
+   * @param toMerge the group containing the fields to merge
+   * @param strict should schema primitive types match
+   * @return the merged list
+   */
+  List<Type> mergeFields(GroupType toMerge, boolean strict) {
+    List<Type> newFields = new ArrayList<Type>();
+    // merge existing fields
+    for (Type type : this.getFields()) {
+      Type merged;
+      if (toMerge.containsField(type.getName())) {
+        Type fieldToMerge = toMerge.getType(type.getName());
+        if (fieldToMerge.getRepetition().isMoreRestrictiveThan(type.getRepetition())) {
+          throw new IncompatibleSchemaModificationException("repetition constraint is more restrictive: can not merge type " + fieldToMerge + " into " + type);
+        }
+        merged = type.union(fieldToMerge, strict);
+      } else {
+        merged = type;
+      }
+      newFields.add(merged);
+    }
+    // add new fields
+    for (Type type : toMerge.getFields()) {
+      if (!this.containsField(type.getName())) {
+        newFields.add(type);
+      }
+    }
+    return newFields;
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/schema/IncompatibleSchemaModificationException.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/IncompatibleSchemaModificationException.java b/parquet-column/src/main/java/org/apache/parquet/schema/IncompatibleSchemaModificationException.java
new file mode 100644
index 0000000..cde5e75
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/IncompatibleSchemaModificationException.java
@@ -0,0 +1,49 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.schema;
+
+import org.apache.parquet.ParquetRuntimeException;
+
+/**
+ * thrown when we are trying to read together files with incompatible schemas.
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class IncompatibleSchemaModificationException extends ParquetRuntimeException {
+  private static final long serialVersionUID = 1L;
+
+  public IncompatibleSchemaModificationException() {
+    super();
+  }
+
+  public IncompatibleSchemaModificationException(String message,
+      Throwable cause) {
+    super(message, cause);
+  }
+
+  public IncompatibleSchemaModificationException(String message) {
+    super(message);
+  }
+
+  public IncompatibleSchemaModificationException(Throwable cause) {
+    super(cause);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java b/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java
new file mode 100644
index 0000000..1e26ed2
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java
@@ -0,0 +1,148 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.schema;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.InvalidRecordException;
+
+/**
+ * The root of a schema
+ *
+ * @author Julien Le Dem
+ *
+ */
+public final class MessageType extends GroupType {
+
+  /**
+   *
+   * @param name the name of the type
+   * @param fields the fields contained by this message
+   */
+  public MessageType(String name, Type... fields) {
+    super(Repetition.REPEATED, name, fields);
+  }
+
+ /**
+  *
+  * @param name the name of the type
+  * @param fields the fields contained by this message
+  */
+ public MessageType(String name, List<Type> fields) {
+   super(Repetition.REPEATED, name, fields);
+ }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void accept(TypeVisitor visitor) {
+    visitor.visit(this);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void writeToStringBuilder(StringBuilder sb, String indent) {
+    sb.append("message ")
+        .append(getName())
+        .append(getOriginalType() == null ? "" : " (" + getOriginalType() +")")
+        .append(" {\n");
+    membersDisplayString(sb, "  ");
+    sb.append("}\n");
+  }
+
+  /**
+   * @return the max repetition level that might be needed to encode the
+   * type at 'path'.
+   */
+  public int getMaxRepetitionLevel(String ... path) {
+    return getMaxRepetitionLevel(path, 0) - 1;
+  }
+
+  /**
+   * @return the max definition level that might be needed to encode the
+   * type at 'path'.
+   */
+  public int getMaxDefinitionLevel(String ... path) {
+    return getMaxDefinitionLevel(path, 0) - 1;
+  }
+
+  public Type getType(String ... path) {
+    return getType(path, 0);
+  }
+
+  public ColumnDescriptor getColumnDescription(String[] path) {
+    int maxRep = getMaxRepetitionLevel(path);
+    int maxDef = getMaxDefinitionLevel(path);
+    PrimitiveType type = getType(path).asPrimitiveType();
+    return new ColumnDescriptor(path, type.getPrimitiveTypeName(),
+                                type.getTypeLength(), maxRep, maxDef);
+  }
+
+  public List<String[]> getPaths() {
+    return this.getPaths(0);
+  }
+
+  public List<ColumnDescriptor> getColumns() {
+    List<String[]> paths = this.getPaths(0);
+    List<ColumnDescriptor> columns = new ArrayList<ColumnDescriptor>(paths.size());
+    for (String[] path : paths) {
+      // TODO: optimize this
+      PrimitiveType primitiveType = getType(path).asPrimitiveType();
+      columns.add(new ColumnDescriptor(
+                      path,
+                      primitiveType.getPrimitiveTypeName(),
+                      primitiveType.getTypeLength(),
+                      getMaxRepetitionLevel(path),
+                      getMaxDefinitionLevel(path)));
+    }
+    return columns;
+  }
+
+  @Override
+  public void checkContains(Type subType) {
+    if (!(subType instanceof MessageType)) {
+      throw new InvalidRecordException(subType + " found: expected " + this);
+    }
+    checkGroupContains(subType);
+  }
+
+  public <T> T convertWith(TypeConverter<T> converter) {
+    final ArrayList<GroupType> path = new ArrayList<GroupType>();
+    path.add(this);
+    return converter.convertMessageType(this, convertChildren(path, converter));
+  }
+
+  public boolean containsPath(String[] path) {
+    return containsPath(path, 0);
+  }
+
+  public MessageType union(MessageType toMerge) {
+    return union(toMerge, true);
+  }
+
+  public MessageType union(MessageType toMerge, boolean strict) {
+    return new MessageType(this.getName(), mergeFields(toMerge, strict));
+  }
+
+}