You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by dv...@apache.org on 2014/08/30 02:33:38 UTC

git commit: PARQUET-79: add a streaming Thrift API, to enable processing the metadata as we read it and skipping unnecessary fields.

Repository: incubator-parquet-format
Updated Branches:
  refs/heads/master fcba8d2e1 -> addbbb914


PARQUET-79: add a streaming Thrift API, to enable processing the metadata as we read it and skipping unnecessary fields.

This pull request provides an API to read Thrift in a streaming fashion.
This makes it possible to ignore unneeded fields without loading them into memory.
It also allows processing the data as it arrives, instead of only once it has been fully loaded into memory.

Author: julien <ju...@twitter.com>

Closes #8 from julienledem/streaming_metadata and squashes the following commits:

621769a [julien] cleanup refactoring
a58913d [julien] rename add to consume
e5c78fc [julien] #simplify
cb386ce [julien] RIP TypedConsumerProvider, @tsdeng did not like you
8dd801e [julien] Merge branch 'master' into streaming_metadata
958726f [julien] javadoc; fix apis
9be786a [julien] added simple readMetaData method
bee937a [julien] refactor, cleanup
6368bdc [julien] streaming thrift reader
71c85de [julien] first stab


Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-format/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-format/commit/addbbb91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-format/tree/addbbb91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-format/diff/addbbb91

Branch: refs/heads/master
Commit: addbbb914a74f4ccf64d4e3a84fb92b035e2ce62
Parents: fcba8d2
Author: julien <ju...@twitter.com>
Authored: Fri Aug 29 17:33:19 2014 -0700
Committer: Dmitriy Ryaboy <dm...@twitter.com>
Committed: Fri Aug 29 17:33:19 2014 -0700

----------------------------------------------------------------------
 pom.xml                                         |   2 +-
 src/main/java/parquet/format/Util.java          | 142 ++++++++++++++
 .../java/parquet/format/event/Consumers.java    | 181 ++++++++++++++++++
 .../format/event/EventBasedThriftReader.java    | 111 +++++++++++
 .../parquet/format/event/FieldConsumer.java     |  25 +++
 .../parquet/format/event/TypedConsumer.java     | 186 +++++++++++++++++++
 src/test/java/parquet/format/TestUtil.java      |  65 +++++++
 7 files changed, 711 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-parquet-format/blob/addbbb91/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0e5bafd..fe59faf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -166,7 +166,7 @@
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
-      <version>3.8.1</version>
+      <version>4.10</version>
       <scope>test</scope>
     </dependency>
   </dependencies>

http://git-wip-us.apache.org/repos/asf/incubator-parquet-format/blob/addbbb91/src/main/java/parquet/format/Util.java
----------------------------------------------------------------------
diff --git a/src/main/java/parquet/format/Util.java b/src/main/java/parquet/format/Util.java
index 9210f37..b063e2b 100644
--- a/src/main/java/parquet/format/Util.java
+++ b/src/main/java/parquet/format/Util.java
@@ -1,8 +1,20 @@
 package parquet.format;
 
+import static parquet.format.FileMetaData._Fields.CREATED_BY;
+import static parquet.format.FileMetaData._Fields.KEY_VALUE_METADATA;
+import static parquet.format.FileMetaData._Fields.NUM_ROWS;
+import static parquet.format.FileMetaData._Fields.ROW_GROUPS;
+import static parquet.format.FileMetaData._Fields.SCHEMA;
+import static parquet.format.FileMetaData._Fields.VERSION;
+import static parquet.format.event.Consumers.fieldConsumer;
+import static parquet.format.event.Consumers.listElementsOf;
+import static parquet.format.event.Consumers.listOf;
+import static parquet.format.event.Consumers.struct;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.List;
 
 import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
@@ -10,6 +22,13 @@ import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.transport.TIOStreamTransport;
 
+import parquet.format.event.Consumers.Consumer;
+import parquet.format.event.Consumers.DelegatingFieldConsumer;
+import parquet.format.event.EventBasedThriftReader;
+import parquet.format.event.TypedConsumer.I32Consumer;
+import parquet.format.event.TypedConsumer.I64Consumer;
+import parquet.format.event.TypedConsumer.StringConsumer;
+
 /**
  * Utility to read/write metadata
  * We use the TCompactProtocol to serialize metadata
@@ -34,6 +53,129 @@ public class Util {
   public static FileMetaData readFileMetaData(InputStream from) throws IOException {
     return read(from, new FileMetaData());
   }
+  /**
+   * reads the meta data from the stream
+   * @param from the stream to read the metadata from
+   * @param skipRowGroups whether row groups should be skipped
+   * @return the resulting metadata
+   * @throws IOException
+   */
+  public static FileMetaData readFileMetaData(InputStream from, boolean skipRowGroups) throws IOException {
+    FileMetaData md = new FileMetaData();
+    if (skipRowGroups) {
+      readFileMetaData(from, new DefaultFileMetaDataConsumer(md), skipRowGroups);
+    } else {
+      read(from, md);
+    }
+    return md;
+  }
+
+  /**
+   * To read metadata in a streaming fashion.
+   *
+   * @author Julien Le Dem
+   *
+   */
+  public static abstract class FileMetaDataConsumer {
+    abstract public void setVersion(int version);
+    abstract public void setSchema(List<SchemaElement> schema);
+    abstract public void setNumRows(long numRows);
+    abstract public void addRowGroup(RowGroup rowGroup);
+    abstract public void addKeyValueMetaData(KeyValue kv);
+    abstract public void setCreatedBy(String createdBy);
+  }
+
+  /**
+   * Simple default consumer that sets the fields
+   *
+   * @author Julien Le Dem
+   *
+   */
+  public static final class DefaultFileMetaDataConsumer extends FileMetaDataConsumer {
+    private final FileMetaData md;
+
+    public DefaultFileMetaDataConsumer(FileMetaData md) {
+      this.md = md;
+    }
+
+    @Override
+    public void setVersion(int version) {
+      md.setVersion(version);
+    }
+
+    @Override
+    public void setSchema(List<SchemaElement> schema) {
+      md.setSchema(schema);
+    }
+
+    @Override
+    public void setNumRows(long numRows) {
+      md.setNum_rows(numRows);
+    }
+
+    @Override
+    public void setCreatedBy(String createdBy) {
+      md.setCreated_by(createdBy);
+    }
+
+    @Override
+    public void addRowGroup(RowGroup rowGroup) {
+      md.addToRow_groups(rowGroup);
+    }
+
+    @Override
+    public void addKeyValueMetaData(KeyValue kv) {
+      md.addToKey_value_metadata(kv);
+    }
+  }
+
+  public static void readFileMetaData(InputStream from, FileMetaDataConsumer consumer) throws IOException {
+    readFileMetaData(from, consumer, false);
+  }
+
+  public static void readFileMetaData(InputStream from, final FileMetaDataConsumer consumer, boolean skipRowGroups) throws IOException {
+    try {
+      DelegatingFieldConsumer eventConsumer = fieldConsumer()
+      .onField(VERSION, new I32Consumer() {
+        @Override
+        public void consume(int value) {
+          consumer.setVersion(value);
+        }
+      }).onField(SCHEMA, listOf(SchemaElement.class, new Consumer<List<SchemaElement>>() {
+        @Override
+        public void consume(List<SchemaElement> schema) {
+          consumer.setSchema(schema);
+        }
+      })).onField(NUM_ROWS, new I64Consumer() {
+        @Override
+        public void consume(long value) {
+          consumer.setNumRows(value);
+        }
+      }).onField(KEY_VALUE_METADATA, listElementsOf(struct(KeyValue.class, new Consumer<KeyValue>() {
+        @Override
+        public void consume(KeyValue kv) {
+          consumer.addKeyValueMetaData(kv);
+        }
+      }))).onField(CREATED_BY, new StringConsumer() {
+        @Override
+        public void consume(String value) {
+          consumer.setCreatedBy(value);
+        }
+      });
+      if (!skipRowGroups) {
+        eventConsumer = eventConsumer.onField(ROW_GROUPS, listElementsOf(struct(RowGroup.class, new Consumer<RowGroup>() {
+          @Override
+          public void consume(RowGroup rowGroup) {
+            consumer.addRowGroup(rowGroup);
+          }
+        })));
+      }
+      new EventBasedThriftReader(protocol(from)).readStruct(eventConsumer);
+
+    } catch (TException e) {
+      throw new IOException("can not read FileMetaData: " + e.getMessage(), e);
+    }
+  }
 
   private static TProtocol protocol(OutputStream to) {
     return protocol(new TIOStreamTransport(to));

http://git-wip-us.apache.org/repos/asf/incubator-parquet-format/blob/addbbb91/src/main/java/parquet/format/event/Consumers.java
----------------------------------------------------------------------
diff --git a/src/main/java/parquet/format/event/Consumers.java b/src/main/java/parquet/format/event/Consumers.java
new file mode 100644
index 0000000..ce2788d
--- /dev/null
+++ b/src/main/java/parquet/format/event/Consumers.java
@@ -0,0 +1,181 @@
+package parquet.format.event;
+
+import static java.util.Collections.unmodifiableMap;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.apache.thrift.TFieldIdEnum;
+import org.apache.thrift.protocol.TList;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.protocol.TProtocolUtil;
+
+import parquet.format.event.Consumers.Consumer;
+import parquet.format.event.TypedConsumer.BoolConsumer;
+import parquet.format.event.TypedConsumer.ListConsumer;
+import parquet.format.event.TypedConsumer.StructConsumer;
+
+/**
+ * Entry point for reading thrift in a streaming fashion
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class Consumers {
+
+  /**
+   * To consume objects coming from a DelegatingFieldConsumer
+   * @author Julien Le Dem
+   *
+   * @param <T> the type of consumed objects
+   */
+  public static interface Consumer<T> {
+    void consume(T t);
+  }
+
+  /**
+   * Delegates reading the field to TypedConsumers.
+   * There is one TypedConsumer per thrift type.
+   * use {@link DelegatingFieldConsumer#onField(TFieldIdEnum, BoolConsumer)} et al. to consume specific thrift fields.
+   * @see Consumers#fieldConsumer()
+   * @author Julien Le Dem
+   *
+   */
+  public static class DelegatingFieldConsumer implements FieldConsumer {
+
+    private final Map<Short, TypedConsumer> contexts;
+    private final FieldConsumer defaultFieldEventConsumer;
+
+    private DelegatingFieldConsumer(FieldConsumer defaultFieldEventConsumer, Map<Short, TypedConsumer> contexts) {
+      this.defaultFieldEventConsumer = defaultFieldEventConsumer;
+      this.contexts = unmodifiableMap(contexts);
+    }
+
+    private DelegatingFieldConsumer() {
+      this(new SkippingFieldConsumer());
+    }
+
+    private DelegatingFieldConsumer(FieldConsumer defaultFieldEventConsumer) {
+      this(defaultFieldEventConsumer, Collections.<Short, TypedConsumer>emptyMap());
+    }
+
+    public DelegatingFieldConsumer onField(TFieldIdEnum e, TypedConsumer typedConsumer) {
+      Map<Short, TypedConsumer> newContexts = new HashMap<Short, TypedConsumer>(contexts);
+      newContexts.put(e.getThriftFieldId(), typedConsumer);
+      return new DelegatingFieldConsumer(defaultFieldEventConsumer, newContexts);
+    }
+
+    @Override
+    public void consumeField(
+        TProtocol protocol, EventBasedThriftReader reader,
+        short id, byte type) throws TException {
+      TypedConsumer delegate = contexts.get(id);
+      if (delegate != null) {
+        delegate.read(protocol, reader, type);
+      } else {
+        defaultFieldEventConsumer.consumeField(protocol, reader, id, type);
+      }
+    }
+  }
+
+  /**
+   * call onField on the resulting DelegatingFieldConsumer to handle individual fields
+   * @return a new DelegatingFieldConsumer
+   */
+  public static DelegatingFieldConsumer fieldConsumer() {
+    return new DelegatingFieldConsumer();
+  }
+
+  /**
+   * To consume a list of elements
+   * @param c the type of the list content
+   * @param consumer the consumer that will receive the list
+   * @return a ListConsumer that can be passed to the DelegatingFieldConsumer
+   */
+  public static <T extends TBase<T,? extends TFieldIdEnum>> ListConsumer listOf(Class<T> c, final Consumer<List<T>> consumer) {
+    class ListConsumer implements Consumer<T> {
+      List<T> list;
+      @Override
+      public void consume(T t) {
+        list.add(t);
+      }
+    }
+    final ListConsumer co = new ListConsumer();
+    return new DelegatingListElementsConsumer(struct(c, co)) {
+      @Override
+      public void consumeList(TProtocol protocol,
+          EventBasedThriftReader reader, TList tList) throws TException {
+        co.list = new ArrayList<T>();
+        super.consumeList(protocol, reader, tList);
+        consumer.consume(co.list);
+      }
+    };
+  }
+
+  /**
+   * To consume list elements one by one
+   * @param consumer the consumer that will read the elements
+   * @return a ListConsumer that can be passed to the DelegatingFieldConsumer
+   */
+  public static ListConsumer listElementsOf(TypedConsumer consumer) {
+    return new DelegatingListElementsConsumer(consumer);
+  }
+
+  public static <T extends TBase<T,? extends TFieldIdEnum>> StructConsumer struct(final Class<T> c, final Consumer<T> consumer) {
+    return new TBaseStructConsumer<T>(c, consumer);
+  }
+}
+
+class SkippingFieldConsumer implements FieldConsumer {
+  @Override
+  public void consumeField(TProtocol protocol, EventBasedThriftReader reader, short id, byte type) throws TException {
+    TProtocolUtil.skip(protocol, type);
+  }
+}
+
+class DelegatingListElementsConsumer extends ListConsumer {
+
+  private TypedConsumer elementConsumer;
+
+  protected DelegatingListElementsConsumer(TypedConsumer consumer) {
+    this.elementConsumer = consumer;
+  }
+
+  @Override
+  public void consumeElement(TProtocol protocol, EventBasedThriftReader reader, byte elemType) throws TException {
+    elementConsumer.read(protocol, reader, elemType);
+  }
+}
+class TBaseStructConsumer<T extends TBase<T, ? extends TFieldIdEnum>> extends StructConsumer {
+
+  private final Class<T> c;
+  private Consumer<T> consumer;
+
+  public TBaseStructConsumer(Class<T> c, Consumer<T> consumer) {
+    this.c = c;
+    this.consumer = consumer;
+  }
+
+  @Override
+  public void consumeStruct(TProtocol protocol, EventBasedThriftReader reader) throws TException {
+    T o = newObject();
+    o.read(protocol);
+    consumer.consume(o);
+  }
+
+  protected T newObject() {
+    try {
+      return c.newInstance();
+    } catch (InstantiationException e) {
+      throw new RuntimeException(c.getName(), e);
+    } catch (IllegalAccessException e) {
+      throw new RuntimeException(c.getName(), e);
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-parquet-format/blob/addbbb91/src/main/java/parquet/format/event/EventBasedThriftReader.java
----------------------------------------------------------------------
diff --git a/src/main/java/parquet/format/event/EventBasedThriftReader.java b/src/main/java/parquet/format/event/EventBasedThriftReader.java
new file mode 100644
index 0000000..2e66d41
--- /dev/null
+++ b/src/main/java/parquet/format/event/EventBasedThriftReader.java
@@ -0,0 +1,111 @@
+package parquet.format.event;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TField;
+import org.apache.thrift.protocol.TList;
+import org.apache.thrift.protocol.TMap;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.protocol.TSet;
+import org.apache.thrift.protocol.TType;
+
+import parquet.format.event.TypedConsumer.ListConsumer;
+import parquet.format.event.TypedConsumer.MapConsumer;
+import parquet.format.event.TypedConsumer.SetConsumer;
+
+/**
+ * Event based reader for Thrift
+ *
+ * @author Julien Le Dem
+ *
+ */
+public final class EventBasedThriftReader {
+
+  private final TProtocol protocol;
+
+  /**
+   * @param protocol the protocol to read from
+   */
+  public EventBasedThriftReader(TProtocol protocol) {
+    this.protocol = protocol;
+  }
+
+  /**
+   * reads a Struct from the underlying protocol and passes the field events to the FieldConsumer
+   * @param c the field consumer
+   * @throws TException
+   */
+  public void readStruct(FieldConsumer c) throws TException {
+    protocol.readStructBegin();
+    readStructContent(c);
+    protocol.readStructEnd();
+  }
+
+  /**
+   * reads the content of a struct (fields) from the underlying protocol and passes the events to c
+   * @param c the field consumer
+   * @throws TException
+   */
+  public void readStructContent(FieldConsumer c) throws TException {
+    TField field;
+    while (true) {
+      field = protocol.readFieldBegin();
+      if (field.type == TType.STOP) {
+        break;
+      }
+      c.consumeField(protocol, this, field.id, field.type);
+    }
+  }
+
+  /**
+   * reads the set content (elements) from the underlying protocol and passes the events to the set event consumer
+   * @param eventConsumer the consumer
+   * @param tSet the set descriptor
+   * @throws TException
+   */
+  public void readSetContent(SetConsumer eventConsumer, TSet tSet)
+      throws TException {
+    for (int i = 0; i < tSet.size; i++) {
+      eventConsumer.consumeElement(protocol, this, tSet.elemType);
+    }
+  }
+
+  /**
+   * reads the map content (key values) from the underlying protocol and passes the events to the map event consumer
+   * @param eventConsumer the consumer
+   * @param tMap the map descriptor
+   * @throws TException
+   */
+  public void readMapContent(MapConsumer eventConsumer, TMap tMap)
+      throws TException {
+    for (int i = 0; i < tMap.size; i++) {
+      eventConsumer.consumeEntry(protocol, this, tMap.keyType, tMap.valueType);
+    }
+  }
+
+  /**
+   * reads a key-value pair
+   * @param keyType the type of the key
+   * @param keyConsumer the consumer for the key
+   * @param valueType the type of the value
+   * @param valueConsumer the consumer for the value
+   * @throws TException
+   */
+  public void readMapEntry(byte keyType, TypedConsumer keyConsumer, byte valueType, TypedConsumer valueConsumer)
+      throws TException {
+    keyConsumer.read(protocol, this, keyType);
+    valueConsumer.read(protocol, this, valueType);
+  }
+
+  /**
+   * reads the list content (elements) from the underlying protocol and passes the events to the list event consumer
+   * @param eventConsumer the consumer
+   * @param tList the list descriptor
+   * @throws TException
+   */
+  public void readListContent(ListConsumer eventConsumer, TList tList)
+      throws TException {
+    for (int i = 0; i < tList.size; i++) {
+      eventConsumer.consumeElement(protocol, this, tList.elemType);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-parquet-format/blob/addbbb91/src/main/java/parquet/format/event/FieldConsumer.java
----------------------------------------------------------------------
diff --git a/src/main/java/parquet/format/event/FieldConsumer.java b/src/main/java/parquet/format/event/FieldConsumer.java
new file mode 100644
index 0000000..d8dd038
--- /dev/null
+++ b/src/main/java/parquet/format/event/FieldConsumer.java
@@ -0,0 +1,25 @@
+package parquet.format.event;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TProtocol;
+
+/**
+ * To receive Thrift field events
+ *
+ * @author Julien Le Dem
+ *
+ */
+public interface FieldConsumer {
+
+  /**
+   * called by the EventBasedThriftReader when reading a field from a Struct
+   * @param protocol the underlying protocol
+   * @param eventBasedThriftReader the reader to delegate to further calls.
+   * @param id the id of the field
+   * @param type the type of the field
+   * @return the typed consumer to pass the value to
+   * @throws TException
+   */
+  public void consumeField(TProtocol protocol, EventBasedThriftReader eventBasedThriftReader, short id, byte type) throws TException;
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-parquet-format/blob/addbbb91/src/main/java/parquet/format/event/TypedConsumer.java
----------------------------------------------------------------------
diff --git a/src/main/java/parquet/format/event/TypedConsumer.java b/src/main/java/parquet/format/event/TypedConsumer.java
new file mode 100644
index 0000000..f4e6fe1
--- /dev/null
+++ b/src/main/java/parquet/format/event/TypedConsumer.java
@@ -0,0 +1,186 @@
+package parquet.format.event;
+
+import static org.apache.thrift.protocol.TType.BOOL;
+import static org.apache.thrift.protocol.TType.BYTE;
+import static org.apache.thrift.protocol.TType.DOUBLE;
+import static org.apache.thrift.protocol.TType.I16;
+import static org.apache.thrift.protocol.TType.I32;
+import static org.apache.thrift.protocol.TType.I64;
+import static org.apache.thrift.protocol.TType.LIST;
+import static org.apache.thrift.protocol.TType.MAP;
+import static org.apache.thrift.protocol.TType.SET;
+import static org.apache.thrift.protocol.TType.STRING;
+import static org.apache.thrift.protocol.TType.STRUCT;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TList;
+import org.apache.thrift.protocol.TMap;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.protocol.TSet;
+
+/**
+ * receive thrift events of a given type
+ *
+ * @author Julien Le Dem
+ *
+ */
+abstract public class TypedConsumer {
+
+  abstract public static class DoubleConsumer extends TypedConsumer {
+    protected DoubleConsumer() { super(DOUBLE); }
+    @Override
+    final void read(TProtocol protocol, EventBasedThriftReader reader) throws TException {
+      this.consume(protocol.readDouble());
+    }
+    abstract public void consume(double value);
+  }
+
+  abstract public static class ByteConsumer extends TypedConsumer {
+    protected ByteConsumer() { super(BYTE); }
+    @Override
+    final void read(TProtocol protocol, EventBasedThriftReader reader) throws TException {
+      this.consume(protocol.readByte());
+    }
+    abstract public void consume(byte value);
+  }
+
+  abstract public static class BoolConsumer extends TypedConsumer {
+    protected BoolConsumer() { super(BOOL); }
+    @Override
+    final void read(TProtocol protocol, EventBasedThriftReader reader) throws TException {
+      this.consume(protocol.readBool());
+    }
+    abstract public void consume(boolean value);
+  }
+
+  abstract public static class I32Consumer extends TypedConsumer {
+    protected I32Consumer() { super(I32); }
+    @Override
+    final void read(TProtocol protocol, EventBasedThriftReader reader) throws TException {
+      this.consume(protocol.readI32());
+    }
+    abstract public void consume(int value);
+  }
+
+  abstract public static class I64Consumer extends TypedConsumer {
+    protected I64Consumer() { super(I64); }
+    final void read(TProtocol protocol, EventBasedThriftReader reader) throws TException {
+      this.consume(protocol.readI64());
+    }
+    abstract public void consume(long value);
+  }
+
+  abstract public static class I16Consumer extends TypedConsumer {
+    protected I16Consumer() { super(I16); }
+    @Override
+    final void read(TProtocol protocol, EventBasedThriftReader reader) throws TException {
+      this.consume(protocol.readI16());
+    }
+    abstract public void consume(short value);
+  }
+
+  abstract public static class StringConsumer extends TypedConsumer {
+    protected StringConsumer() { super(STRING); }
+    @Override
+    final void read(TProtocol protocol, EventBasedThriftReader reader) throws TException {
+      this.consume(protocol.readString());
+    }
+    abstract public void consume(String value);
+  }
+
+  abstract public static class StructConsumer extends TypedConsumer {
+    protected StructConsumer() { super(STRUCT); }
+    @Override
+    final void read(TProtocol protocol, EventBasedThriftReader reader) throws TException {
+      this.consumeStruct(protocol, reader);
+    }
+    /**
+     * can either delegate to the reader or read the struct from the protocol
+     * reader.readStruct(fieldConsumer);
+     * @param protocol the underlying protocol
+     * @param reader the reader to delegate to
+     * @throws TException
+     */
+    abstract public void consumeStruct(TProtocol protocol, EventBasedThriftReader reader) throws TException;
+  }
+
+  abstract public static class ListConsumer extends TypedConsumer {
+    protected ListConsumer() { super(LIST); }
+    @Override
+    final void read(TProtocol protocol, EventBasedThriftReader reader) throws TException {
+      this.consumeList(protocol, reader, protocol.readListBegin());
+      protocol.readListEnd();
+    }
+    public void consumeList(TProtocol protocol, EventBasedThriftReader reader, TList tList) throws TException {
+      reader.readListContent(this, tList);
+    }
+    /**
+     * can either delegate to the reader or read the element from the protocol
+     * @param protocol the underlying protocol
+     * @param reader the reader to delegate to
+     * @throws TException
+     */
+    abstract public void consumeElement(TProtocol protocol, EventBasedThriftReader reader, byte elemType) throws TException;
+  }
+
+  abstract public static class SetConsumer extends TypedConsumer {
+    protected SetConsumer() { super(SET); }
+    @Override
+    final void read(TProtocol protocol, EventBasedThriftReader reader) throws TException {
+      this.consumeSet(protocol, reader, protocol.readSetBegin());
+      protocol.readSetEnd();
+    }
+    public void consumeSet(TProtocol protocol, EventBasedThriftReader reader, TSet tSet) throws TException {
+      reader.readSetContent(this, tSet);
+    }
+    /**
+     * can either delegate to the reader or read the set from the protocol
+     * @param protocol the underlying protocol
+     * @param reader the reader to delegate to
+     * @throws TException
+     */
+    abstract public void consumeElement(
+        TProtocol protocol, EventBasedThriftReader reader,
+        byte elemType) throws TException;
+  }
+
+  abstract public static class MapConsumer extends TypedConsumer {
+    protected MapConsumer() { super(MAP); }
+    @Override
+    final void read(TProtocol protocol, EventBasedThriftReader reader)
+        throws TException {
+      this.consumeMap(protocol, reader , protocol.readMapBegin());
+      protocol.readMapEnd();
+    }
+    public void consumeMap(TProtocol protocol, EventBasedThriftReader reader, TMap tMap) throws TException {
+      reader.readMapContent(this, tMap);
+    }
+    /**
+     * can either delegate to the reader or read the map entry from the protocol
+     * @param protocol the underlying protocol
+     * @param reader the reader to delegate to
+     * @throws TException
+     */
+    abstract public void consumeEntry(
+        TProtocol protocol, EventBasedThriftReader reader,
+        byte keyType, byte valueType) throws TException;
+  }
+
+  public final byte type;
+
+  private TypedConsumer(byte type) {
+    this.type = type;
+  }
+
+  final public void read(TProtocol protocol, EventBasedThriftReader reader, byte type) throws TException {
+    if (this.type != type) {
+      throw new TException(
+          "Incorrect type in stream. "
+              + "Expected " + this.type
+              + " but got " + type);
+    }
+    this.read(protocol, reader);
+  }
+
+  abstract void read(TProtocol protocol, EventBasedThriftReader reader) throws TException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-parquet-format/blob/addbbb91/src/test/java/parquet/format/TestUtil.java
----------------------------------------------------------------------
diff --git a/src/test/java/parquet/format/TestUtil.java b/src/test/java/parquet/format/TestUtil.java
new file mode 100644
index 0000000..14f814d
--- /dev/null
+++ b/src/test/java/parquet/format/TestUtil.java
@@ -0,0 +1,65 @@
+package parquet.format;
+
+import static java.util.Arrays.asList;
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNull;
+import static parquet.format.Util.readFileMetaData;
+import static parquet.format.Util.writeFileMetaData;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+
+import org.junit.Test;
+
+import parquet.format.Util.DefaultFileMetaDataConsumer;
+public class TestUtil {
+
+  @Test
+  public void testReadFileMetadata() throws Exception {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    FileMetaData md = new FileMetaData(
+        1,
+        asList(new SchemaElement("foo")),
+        10,
+        asList(
+            new RowGroup(
+                asList(
+                    new ColumnChunk(0),
+                    new ColumnChunk(1)
+                    ),
+                10,
+                5),
+            new RowGroup(
+                asList(
+                    new ColumnChunk(2),
+                    new ColumnChunk(3)
+                    ),
+                11,
+                5)
+        )
+    );
+    writeFileMetaData(md , baos);
+    FileMetaData md2 = readFileMetaData(in(baos));
+    FileMetaData md3 = new FileMetaData();
+    readFileMetaData(in(baos), new DefaultFileMetaDataConsumer(md3));
+    FileMetaData md4 = new FileMetaData();
+    readFileMetaData(in(baos), new DefaultFileMetaDataConsumer(md4), true);
+    FileMetaData md5 = readFileMetaData(in(baos), true);
+    FileMetaData md6 = readFileMetaData(in(baos), false);
+    assertEquals(md, md2);
+    assertEquals(md, md3);
+    assertNull(md4.getRow_groups());
+    assertNull(md5.getRow_groups());
+    assertEquals(md4, md5);
+    md4.setRow_groups(md.getRow_groups());
+    md5.setRow_groups(md.getRow_groups());
+    assertEquals(md, md4);
+    assertEquals(md, md5);
+    assertEquals(md4, md5);
+    assertEquals(md, md6);
+  }
+
+  private ByteArrayInputStream in(ByteArrayOutputStream baos) {
+    return new ByteArrayInputStream(baos.toByteArray());
+  }
+}