Posted to commits@parquet.apache.org by ga...@apache.org on 2020/08/25 09:33:23 UTC

[parquet-mr] branch master updated: PARQUET-1455: [parquet-protobuf] Handle protobuf enum schema evolution and unknown enum value (#561)

This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d45fd5  PARQUET-1455: [parquet-protobuf] Handle protobuf enum schema evolution and unknown enum value (#561)
3d45fd5 is described below

commit 3d45fd5480b1b58074c5b53d0738774cf355d11e
Author: wineandcheeze <qi...@users.noreply.github.com>
AuthorDate: Tue Aug 25 11:33:11 2020 +0200

    PARQUET-1455: [parquet-protobuf] Handle protobuf enum schema evolution and unknown enum value (#561)
    
    Protobuf allows setting an enum field by number. Even if the number
    does not match any enum value defined in the schema, it is still
    accepted, and a label "UNKNOWN_ENUM_VALUE_<enumName>_<number>" is
    generated when we access the field through the protobuf reflection
    API (proto descriptors). parquet-protobuf relies on this reflection
    API to convert back and forth between the two worlds.
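
    As an illustration, a minimal sketch of that reflection behavior
    ("MyMessage" and its enum field "status" are hypothetical generated
    proto3 classes; com.google.protobuf.Descriptors is assumed to be
    imported):

      MyMessage.Builder builder = MyMessage.newBuilder();
      builder.setStatusValue(42);  // 42 matches no value defined in the enum
      Descriptors.FieldDescriptor field =
          MyMessage.getDescriptor().findFieldByName("status");
      // Reflection hands back a synthesized EnumValueDescriptor for the
      // unknown number instead of failing.
      Descriptors.EnumValueDescriptor value =
          (Descriptors.EnumValueDescriptor) builder.getField(field);
      value.getName();    // "UNKNOWN_ENUM_VALUE_<enumName>_42"
      value.getNumber();  // 42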
    
    There are two cases of unknown enums when using parquet-protobuf:
      1. The protobuf message already contains an unknown enum value when
      we write it to parquet (e.g. someone set the enum field by number,
      or the writer deserialized data from the wire and the sender had a
      newer version of the proto schema with new enum values). Before this
      patch, the parquet-protobuf writer wrote the label
      "UNKNOWN_ENUM_VALUE_<enumName>_<number>" as a string into the enum
      column of parquet, and when we read it back as protobuf, this
      unknown label does not match any enum definition (even with the same
      schema as the sender in the second example).
      2. The protobuf message contains a valid value when written to
      parquet, but the reader uses an outdated proto schema which is
      missing some enum values, so the not-in-old-schema enum values are
      "unknown" to the reader.

    Before this patch, the parquet-protobuf reader rejected both cases
    with a runtime exception.
    
    To handle these problems:
    We keep the enum (name -> number) mapping in the parquet metadata, so
    that at read time the reader can discover the number and use the
    protobuf reflection API to set the enum by number.
    Keep in mind, though, that when reading an enum with an outdated
    schema (case 2), the enum read back will have the right number, but
    its label is set to "UNKNOWN_ENUM_VALUE_<enumName>_<number>". So this
    feature is helpful only if the user manipulates enum data by number.
    For old data containing a "true" unknown value (case 1) created
    before this patch (so the name -> number mapping is not available),
    we now try to parse the label against the
    "UNKNOWN_ENUM_VALUE_<enumName>_<number>" pattern.
    If we read old data created before this patch (so the name -> number
    mapping is not available) with an outdated schema, and we find an
    enum value that is neither defined in the schema nor following the
    "UNKNOWN_ENUM_*" pattern, we can either fail the job by raising an
    exception or treat the value as an unknown enum with number -1,
    depending on a configuration flag (see the sketch below).
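
    A minimal sketch of enabling that flag when building a reader ("file"
    is a Path to the parquet file; the builder's "set" method is the same
    one used in TestUtils):

      ParquetReader.Builder readerBuilder = ProtoParquetReader.builder(file);
      // Accept labels matching neither the reader schema nor the
      // "UNKNOWN_ENUM_VALUE_*" pattern; they become number -1.
      readerBuilder.set(ProtoConstants.CONFIG_ACCEPT_UNKNOWN_ENUM, "true");
      try (ParquetReader reader = readerBuilder.build()) {
        Object record = reader.read();
      }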
    
    The name -> number mapping is stored as new metadata under the
    "parquet.proto.enum" namespace. The metadata for the protobuf enum
    (label:number) mapping must follow a specific pattern; a
    BadConfigurationException is thrown at read time if it does not.
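
    For example, for a hypothetical enum "T.Schema.MyEnum" whose values
    FIRST = 1 and SECOND = 2 were both written to the file, the footer
    would carry an entry roughly like:

      parquet.proto.enum.T.Schema.MyEnum -> "FIRST:1,SECOND:2"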
    
    Tests for enum schema evolution (reading and writing with different
    protobuf schemas) are added.
---
 ...RecordMaterializer.java => ProtoConstants.java} |  42 ++++----
 .../parquet/proto/ProtoMessageConverter.java       | 114 ++++++++++++++++-----
 .../org/apache/parquet/proto/ProtoReadSupport.java |   8 +-
 .../apache/parquet/proto/ProtoRecordConverter.java |  29 ++++--
 .../parquet/proto/ProtoRecordMaterializer.java     |  13 ++-
 .../apache/parquet/proto/ProtoWriteSupport.java    |  54 +++++++++-
 .../parquet/proto/ProtoRecordConverterTest.java    |  23 ++++-
 .../parquet/proto/ProtoSchemaEvolutionTest.java    |  68 ++++++++++++
 .../java/org/apache/parquet/proto/TestUtils.java   |  46 +++++----
 .../src/test/resources/TestProto3SchemaV1.proto    |  38 +++++++
 .../src/test/resources/TestProto3SchemaV2.proto    |  40 ++++++++
 pom.xml                                            |   6 +-
 12 files changed, 393 insertions(+), 88 deletions(-)

diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoConstants.java
similarity index 50%
copy from parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java
copy to parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoConstants.java
index 039a571..7458f89 100644
--- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java
+++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoConstants.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,27 +18,23 @@
  */
 package org.apache.parquet.proto;
 
-import com.google.protobuf.Message;
-import com.google.protobuf.MessageOrBuilder;
-import org.apache.parquet.io.api.GroupConverter;
-import org.apache.parquet.io.api.RecordMaterializer;
-import org.apache.parquet.schema.MessageType;
-
-class ProtoRecordMaterializer<T extends MessageOrBuilder> extends RecordMaterializer<T> {
-
-  private final ProtoRecordConverter<T> root;
-
-  public ProtoRecordMaterializer(MessageType requestedSchema, Class<? extends Message> protobufClass) {
-    this.root = new ProtoRecordConverter<T>(protobufClass, requestedSchema);
-  }
+/**
+ * Constants.
+ */
+public final class ProtoConstants {
 
-  @Override
-  public T getCurrentRecord() {
-    return root.getCurrentRecord();
-  }
+  public static final String METADATA_ENUM_PREFIX = "parquet.proto.enum.";
+  public static final String METADATA_ENUM_KEY_VALUE_SEPARATOR = ":";
+  public static final String METADATA_ENUM_ITEM_SEPARATOR = ",";
+  /**
+   * Configuration flag to enable reader to accept enum label that's neither defined in its own proto schema nor conform
+   * to the "UNKNOWN_ENUM_*" pattern with which we can get the enum number. The enum value will be treated as an unknown
+   * enum with number -1. <br>
+   * Enabling it will avoid a job failure, but you should perhaps use an up-to-date schema instead.
+   */
+  public static final String CONFIG_ACCEPT_UNKNOWN_ENUM = "parquet.proto.accept.unknown.enum";
 
-  @Override
-  public GroupConverter getRootConverter() {
-    return root;
+  private ProtoConstants() {
+    // Do not instantiate.
   }
 }
diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java
index 173fa77..77f5d52 100644
--- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java
+++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java
@@ -22,7 +22,9 @@ import com.google.protobuf.ByteString;
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.Message;
 import com.twitter.elephantbird.util.Protobufs;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.hadoop.BadConfigurationException;
 import org.apache.parquet.io.InvalidRecordException;
 import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.io.api.Binary;
@@ -33,6 +35,8 @@ import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.IncompatibleSchemaModificationException;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.List;
@@ -42,30 +46,46 @@ import java.util.Set;
 
 import static com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
 import static java.util.Optional.of;
+import static org.apache.parquet.proto.ProtoConstants.CONFIG_ACCEPT_UNKNOWN_ENUM;
+import static org.apache.parquet.proto.ProtoConstants.METADATA_ENUM_ITEM_SEPARATOR;
+import static org.apache.parquet.proto.ProtoConstants.METADATA_ENUM_KEY_VALUE_SEPARATOR;
+import static org.apache.parquet.proto.ProtoConstants.METADATA_ENUM_PREFIX;
 
 /**
  * Converts Protocol Buffer message (both top level and inner) to parquet.
  * This is internal class, use {@link ProtoRecordConverter}.
  */
 class ProtoMessageConverter extends GroupConverter {
+  private static final Logger LOG = LoggerFactory.getLogger(ProtoMessageConverter.class);
 
-  private final Converter[] converters;
-  private final ParentValueContainer parent;
-  private final Message.Builder myBuilder;
+  protected final Configuration conf;
+  protected final Converter[] converters;
+  protected final ParentValueContainer parent;
+  protected final Message.Builder myBuilder;
+  protected final Map<String, String> extraMetadata;
 
-  // used in record converter
-  ProtoMessageConverter(ParentValueContainer pvc, Class<? extends Message> protoClass, GroupType parquetSchema) {
-    this(pvc, Protobufs.getMessageBuilder(protoClass), parquetSchema);
+  /**
+   * Used in record converter.
+   *
+   * @param conf Configuration for some customizable behavior,
+   *            eg. "parquet.proto.accept.unknown.enum" - whether to accept an unparsable (after trying with proto enum label and number) enum as `UNKNOWN` with a number -1 (the one generated automatically for each proto enum)
+   * @param pvc The parent value containing the converted proto
+   * @param protoClass The class of the converted proto
+   * @param parquetSchema The (part of) parquet schema that should match to the expected proto
+   * @param extraMetadata Metadata from parquet footer, containing useful information about parquet-proto conversion behavior
+   */
+  ProtoMessageConverter(Configuration conf, ParentValueContainer pvc, Class<? extends Message> protoClass, GroupType parquetSchema, Map<String, String> extraMetadata) {
+    this(conf, pvc, Protobufs.getMessageBuilder(protoClass), parquetSchema, extraMetadata);
   }
 
-
   // For usage in message arrays
-  ProtoMessageConverter(ParentValueContainer pvc, Message.Builder builder, GroupType parquetSchema) {
+  ProtoMessageConverter(Configuration conf, ParentValueContainer pvc, Message.Builder builder, GroupType parquetSchema, Map<String, String> extraMetadata) {
 
     int schemaSize = parquetSchema.getFieldCount();
     converters = new Converter[schemaSize];
-
+    this.conf = conf;
     this.parent = pvc;
+    this.extraMetadata = extraMetadata;
     int parquetFieldIndex = 1;
 
     if (pvc == null) {
@@ -108,7 +128,7 @@ class ProtoMessageConverter extends GroupConverter {
     myBuilder.clear();
   }
 
-  private Converter newMessageConverter(final Message.Builder parentBuilder, final Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) {
+  protected Converter newMessageConverter(final Message.Builder parentBuilder, final Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) {
 
     boolean isRepeated = fieldDescriptor.isRepeated();
 
@@ -148,7 +168,7 @@ class ProtoMessageConverter extends GroupConverter {
     }).orElseGet(() -> newScalarConverter(parent, parentBuilder, fieldDescriptor, parquetType));
   }
 
-  private Converter newScalarConverter(ParentValueContainer pvc, Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) {
+  protected Converter newScalarConverter(ParentValueContainer pvc, Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) {
 
     JavaType javaType = fieldDescriptor.getJavaType();
 
@@ -163,7 +183,7 @@ class ProtoMessageConverter extends GroupConverter {
       case LONG: return new ProtoLongConverter(pvc);
       case MESSAGE: {
         Message.Builder subBuilder = parentBuilder.newBuilderForField(fieldDescriptor);
-        return new ProtoMessageConverter(pvc, subBuilder, parquetType.asGroupType());
+        return new ProtoMessageConverter(conf, pvc, subBuilder, parquetType.asGroupType(), extraMetadata);
       }
     }
 
@@ -190,25 +210,45 @@ class ProtoMessageConverter extends GroupConverter {
     private final Map<Binary, Descriptors.EnumValueDescriptor> enumLookup;
     private Descriptors.EnumValueDescriptor[] dict;
     private final ParentValueContainer parent;
+    private final Descriptors.EnumDescriptor enumType;
+    private final String unknownEnumPrefix;
+    private final boolean acceptUnknownEnum;
 
     public ProtoEnumConverter(ParentValueContainer parent, Descriptors.FieldDescriptor fieldType) {
       this.parent = parent;
       this.fieldType = fieldType;
-      this.enumLookup = makeLookupStructure(fieldType);
+      this.enumType = fieldType.getEnumType();
+      this.enumLookup = makeLookupStructure(enumType);
+      unknownEnumPrefix = "UNKNOWN_ENUM_VALUE_" + enumType.getName() + "_";
+      acceptUnknownEnum = conf.getBoolean(CONFIG_ACCEPT_UNKNOWN_ENUM, false);
     }
 
     /**
      * Fills lookup structure for translating between parquet enum values and Protocol buffer enum values.
      * */
-    private Map<Binary, Descriptors.EnumValueDescriptor> makeLookupStructure(Descriptors.FieldDescriptor enumFieldType) {
-      Descriptors.EnumDescriptor enumType = enumFieldType.getEnumType();
+    private Map<Binary, Descriptors.EnumValueDescriptor> makeLookupStructure(Descriptors.EnumDescriptor enumType) {
       Map<Binary, Descriptors.EnumValueDescriptor> lookupStructure = new HashMap<Binary, Descriptors.EnumValueDescriptor>();
 
-      List<Descriptors.EnumValueDescriptor> enumValues = enumType.getValues();
+      if (extraMetadata.containsKey(METADATA_ENUM_PREFIX + enumType.getFullName())) {
+        String enumNameNumberPairs = extraMetadata.get(METADATA_ENUM_PREFIX + enumType.getFullName());
+        if (enumNameNumberPairs == null || enumNameNumberPairs.trim().isEmpty()) {
+          LOG.debug("No enum is written for " + enumType.getFullName());
+          return lookupStructure;
+        }
+        for (String enumItem : enumNameNumberPairs.split(METADATA_ENUM_ITEM_SEPARATOR)) {
+          String[] nameAndNumber = enumItem.split(METADATA_ENUM_KEY_VALUE_SEPARATOR);
+          if (nameAndNumber.length != 2) {
+            throw new BadConfigurationException("Invalid enum bookkeeper from the metadata: " + enumNameNumberPairs);
+          }
+          lookupStructure.put(Binary.fromString(nameAndNumber[0]), enumType.findValueByNumberCreatingIfUnknown(Integer.parseInt(nameAndNumber[1])));
+        }
+      } else {
+        List<Descriptors.EnumValueDescriptor> enumValues = enumType.getValues();
 
-      for (Descriptors.EnumValueDescriptor value : enumValues) {
-        String name = value.getName();
-        lookupStructure.put(Binary.fromString(name), enumType.findValueByName(name));
+        for (Descriptors.EnumValueDescriptor value : enumValues) {
+          String name = value.getName();
+          lookupStructure.put(Binary.fromString(name), enumType.findValueByName(name));
+        }
       }
 
       return lookupStructure;
@@ -222,11 +262,37 @@ class ProtoMessageConverter extends GroupConverter {
       Descriptors.EnumValueDescriptor protoValue = enumLookup.get(binaryValue);
 
       if (protoValue == null) {
-        Set<Binary> knownValues = enumLookup.keySet();
-        String msg = "Illegal enum value \"" + binaryValue + "\""
-                + " in protocol buffer \"" + fieldType.getFullName() + "\""
-                + " legal values are: \"" + knownValues + "\"";
-        throw new InvalidRecordException(msg);
+        // in case of unknown enum value, protobuf is creating new EnumValueDescriptor with the unknown number
+        // and name as following "UNKNOWN_ENUM_VALUE_" + parent.getName() + "_" + number
+        // so the idea is to parse the name for data created by parquet-proto before this patch
+        String unknownLabel = binaryValue.toStringUsingUTF8();
+        if (unknownLabel.startsWith(unknownEnumPrefix)) {
+          try {
+            int i = Integer.parseInt(unknownLabel.substring(unknownEnumPrefix.length()));
+            Descriptors.EnumValueDescriptor unknownEnumValue = enumType.findValueByNumberCreatingIfUnknown(i);
+            // build new EnumValueDescriptor and put it in the value cache
+            enumLookup.put(binaryValue, unknownEnumValue);
+            return unknownEnumValue;
+          } catch (NumberFormatException e) {
+            // The value does not respect "UNKNOWN_ENUM_VALUE_" + parent.getName() + "_" + number pattern
+            // We accept it as unknown enum with number -1.
+          }
+        }
+        if (!acceptUnknownEnum) {
+          // Safe mode, when an enum does not have its number in metadata (data written before this fix), and its label
+          // is unrecognizable (neither defined in the schema, nor parsable with "UNKNOWN_ENUM_*" pattern, which means
+          // probably the reader schema is not up-to-date), we reject with an error.
+          Set<Binary> knownValues = enumLookup.keySet();
+          String msg = "Illegal enum value \"" + binaryValue + "\""
+            + " in protocol buffer \"" + fieldType.getFullName() + "\""
+            + " legal values are: \"" + knownValues + "\"";
+          throw new InvalidRecordException(msg);
+        }
+        LOG.error("Found unknown value " +  unknownLabel + " for field " + fieldType.getFullName() +
+          " probably because your proto schema is outdated, accept it as unknown enum with number -1");
+        Descriptors.EnumValueDescriptor unrecognized = enumType.findValueByNumberCreatingIfUnknown(-1);
+        enumLookup.put(binaryValue, unrecognized);
+        return unrecognized;
       }
       return protoValue;
     }
diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java
index 0d79d01..78edf70 100644
--- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java
+++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoReadSupport.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -90,7 +90,7 @@ public class ProtoReadSupport<T extends Message> extends ReadSupport<T> {
 
     MessageType requestedSchema = readContext.getRequestedSchema();
     Class<? extends Message> protobufClass = Protobufs.getProtobufClass(headerProtoClass);
-    return new ProtoRecordMaterializer(requestedSchema, protobufClass);
+    return new ProtoRecordMaterializer(configuration, requestedSchema, protobufClass, keyValueMetaData);
   }
 
 
diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java
index e161819..75a67f1 100644
--- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java
+++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordConverter.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,8 +21,12 @@ package org.apache.parquet.proto;
 
 import com.google.protobuf.Message;
 import com.google.protobuf.MessageOrBuilder;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.schema.MessageType;
 
+import java.util.Collections;
+import java.util.Map;
+
 /**
  * Converts data content of root message from Protocol Buffer message to parquet message.
  * It delegates conversion of inner fields to {@link ProtoMessageConverter} class using inheritance.
@@ -45,15 +49,26 @@ public class ProtoRecordConverter<T extends MessageOrBuilder> extends ProtoMessa
     }
   }
 
+  public ProtoRecordConverter(Configuration conf, Class<? extends Message> protoclass, MessageType parquetSchema, Map<String, String> extraMetadata) {
+    super(conf, new SkipParentValueContainer(), protoclass, parquetSchema, extraMetadata);
+    reusedBuilder = getBuilder();
+  }
+
+  public ProtoRecordConverter(Configuration conf, Message.Builder builder, MessageType parquetSchema, Map<String, String> extraMetadata) {
+    super(conf, new SkipParentValueContainer(), builder, parquetSchema, extraMetadata);
+    reusedBuilder = getBuilder();
+  }
 
+  // Old version constructors, kept for code backward compatibility.
+  // The instance will not be able to handle unknown enum values written by parquet-proto (the behavior before PARQUET-1455)
+  @Deprecated
   public ProtoRecordConverter(Class<? extends Message> protoclass, MessageType parquetSchema) {
-    super(new SkipParentValueContainer(), protoclass, parquetSchema);
-    reusedBuilder = getBuilder();
+    this(new Configuration(), protoclass, parquetSchema, Collections.emptyMap());
   }
 
+  @Deprecated
   public ProtoRecordConverter(Message.Builder builder, MessageType parquetSchema) {
-    super(new SkipParentValueContainer(), builder, parquetSchema);
-    reusedBuilder = getBuilder();
+    this(new Configuration(), builder, parquetSchema, Collections.emptyMap());
   }
 
   @Override
diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java
index 039a571..dd77ca6 100644
--- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java
+++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoRecordMaterializer.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,16 +20,19 @@ package org.apache.parquet.proto;
 
 import com.google.protobuf.Message;
 import com.google.protobuf.MessageOrBuilder;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.io.api.GroupConverter;
 import org.apache.parquet.io.api.RecordMaterializer;
 import org.apache.parquet.schema.MessageType;
 
+import java.util.Map;
+
 class ProtoRecordMaterializer<T extends MessageOrBuilder> extends RecordMaterializer<T> {
 
   private final ProtoRecordConverter<T> root;
 
-  public ProtoRecordMaterializer(MessageType requestedSchema, Class<? extends Message> protobufClass) {
-    this.root = new ProtoRecordConverter<T>(protobufClass, requestedSchema);
+  public ProtoRecordMaterializer(Configuration conf, MessageType requestedSchema, Class<? extends Message> protobufClass, Map<String, String> metadata) {
+    this.root = new ProtoRecordConverter<T>(conf, protobufClass, requestedSchema, metadata);
   }
 
   @Override
diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java
index a803802..2322667 100644
--- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java
+++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java
@@ -38,6 +38,10 @@ import java.util.*;
 
 import static java.util.Optional.ofNullable;
 
+import static org.apache.parquet.proto.ProtoConstants.METADATA_ENUM_ITEM_SEPARATOR;
+import static org.apache.parquet.proto.ProtoConstants.METADATA_ENUM_KEY_VALUE_SEPARATOR;
+import static org.apache.parquet.proto.ProtoConstants.METADATA_ENUM_PREFIX;
+
 /**
  * Implementation of {@link WriteSupport} for writing Protocol Buffers.
  */
@@ -55,6 +59,9 @@ public class ProtoWriteSupport<T extends MessageOrBuilder> extends WriteSupport<
   private RecordConsumer recordConsumer;
   private Class<? extends Message> protoMessage;
   private MessageWriter messageWriter;
+  // Keep protobuf enum value with number in the metadata, so that in read time, a reader can read at least
+  // the number back even with an outdated schema which might not contain all enum values.
+  private Map<String, Map<String, Integer>> protoEnumBookKeeper = new HashMap<>();
 
   public ProtoWriteSupport() {
   }
@@ -126,13 +133,41 @@ public class ProtoWriteSupport<T extends MessageOrBuilder> extends WriteSupport<
 
     this.messageWriter = new MessageWriter(messageDescriptor, rootSchema);
 
-    Map<String, String> extraMetaData = new HashMap<String, String>();
+    Map<String, String> extraMetaData = new HashMap<>();
     extraMetaData.put(ProtoReadSupport.PB_CLASS, protoMessage.getName());
     extraMetaData.put(ProtoReadSupport.PB_DESCRIPTOR, serializeDescriptor(protoMessage));
     extraMetaData.put(PB_SPECS_COMPLIANT_WRITE, String.valueOf(writeSpecsCompliant));
     return new WriteContext(rootSchema, extraMetaData);
   }
 
+  @Override
+  public FinalizedWriteContext finalizeWrite() {
+    Map<String, String> protoMetadata = enumMetadata();
+    return new FinalizedWriteContext(protoMetadata);
+  }
+
+  private Map<String, String> enumMetadata() {
+    Map<String, String> enumMetadata = new HashMap<>();
+    for (Map.Entry<String, Map<String, Integer>> enumNameNumberMapping : protoEnumBookKeeper.entrySet()) {
+      StringBuilder nameNumberPairs = new StringBuilder();
+      if (enumNameNumberMapping.getValue().isEmpty()) {
+        // No enum is ever written to any column of this file, put an empty string as the value in the metadata
+        LOG.info("No enum is written for " + enumNameNumberMapping.getKey());
+      }
+      int idx = 0;
+      for (Map.Entry<String, Integer> nameNumberPair : enumNameNumberMapping.getValue().entrySet()) {
+        nameNumberPairs.append(nameNumberPair.getKey())
+          .append(METADATA_ENUM_KEY_VALUE_SEPARATOR)
+          .append(nameNumberPair.getValue());
+        idx ++;
+        if (idx < enumNameNumberMapping.getValue().size()) {
+          nameNumberPairs.append(METADATA_ENUM_ITEM_SEPARATOR);
+        }
+      }
+      enumMetadata.put(METADATA_ENUM_PREFIX + enumNameNumberMapping.getKey(), nameNumberPairs.toString());
+    }
+    return enumMetadata;
+  }
 
   class FieldWriter {
     String fieldName;
@@ -202,7 +237,7 @@ public class ProtoWriteSupport<T extends MessageOrBuilder> extends WriteSupport<
         case LONG: return new LongWriter();
         case FLOAT: return new FloatWriter();
         case DOUBLE: return new DoubleWriter();
-        case ENUM: return new EnumWriter();
+        case ENUM: return new EnumWriter(fieldDescriptor.getEnumType());
         case BOOLEAN: return new BooleanWriter();
         case BYTE_STRING: return new BinaryWriter();
       }
@@ -480,10 +515,23 @@ public class ProtoWriteSupport<T extends MessageOrBuilder> extends WriteSupport<
   }
 
   class EnumWriter extends FieldWriter {
+    Map<String, Integer> enumNameNumberPairs;
+
+    public EnumWriter(Descriptors.EnumDescriptor enumType) {
+      if (protoEnumBookKeeper.containsKey(enumType.getFullName())) {
+        enumNameNumberPairs = protoEnumBookKeeper.get(enumType.getFullName());
+      } else {
+        enumNameNumberPairs = new HashMap<>();
+        protoEnumBookKeeper.put(enumType.getFullName(), enumNameNumberPairs);
+      }
+    }
+
     @Override
     final void writeRawValue(Object value) {
-      Binary binary = Binary.fromString(((Descriptors.EnumValueDescriptor) value).getName());
+      Descriptors.EnumValueDescriptor enumValueDesc = (Descriptors.EnumValueDescriptor) value;
+      Binary binary = Binary.fromString(enumValueDesc.getName());
       recordConsumer.addBinary(binary);
+      enumNameNumberPairs.putIfAbsent(enumValueDesc.getName(), enumValueDesc.getNumber());
     }
   }
 
diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoRecordConverterTest.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoRecordConverterTest.java
index e042f96..74cddeb 100644
--- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoRecordConverterTest.java
+++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoRecordConverterTest.java
@@ -25,11 +25,12 @@ import org.apache.parquet.proto.test.TestProtobuf;
 
 import java.util.List;
 
+import static org.apache.parquet.proto.TestUtils.testData;
+import static org.apache.parquet.proto.test.TestProtobuf.SchemaConverterAllDatatypes;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
-import static org.apache.parquet.proto.TestUtils.testData;
-import static org.apache.parquet.proto.test.TestProtobuf.SchemaConverterAllDatatypes;
 
 public class ProtoRecordConverterTest {
 
@@ -341,4 +342,22 @@ public class ProtoRecordConverterTest {
 
     testData(builder.build());
   }
+
+  @Test
+  public void testUnknownEnum() throws Exception {
+    TestProto3.SchemaConverterAllDatatypes.Builder data;
+    data = TestProto3.SchemaConverterAllDatatypes.newBuilder();
+    data.setOptionalEnumValue(42);
+
+    TestProto3.SchemaConverterAllDatatypes dataBuilt = data.build();
+    data.clear();
+
+    List<TestProto3.SchemaConverterAllDatatypes> result;
+    result = testData(dataBuilt);
+
+    //data are fully checked in testData function. Lets do one more check.
+    TestProto3.SchemaConverterAllDatatypes o = result.get(0);
+    assertSame(o.getOptionalEnum(), TestProto3.SchemaConverterAllDatatypes.TestEnum.UNRECOGNIZED);
+    assertEquals(o.getOptionalEnumValue(), 42);
+  }
 }
diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaEvolutionTest.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaEvolutionTest.java
new file mode 100644
index 0000000..db7f6ce
--- /dev/null
+++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoSchemaEvolutionTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.proto;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.proto.test.TestProto3SchemaV1;
+import org.apache.parquet.proto.test.TestProto3SchemaV2;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.parquet.proto.TestUtils.readMessages;
+import static org.apache.parquet.proto.TestUtils.writeMessages;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+/**
+ * Tests for backward/forward compatibility when writing and reading parquet with different versions of the protobuf schema.
+ */
+public class ProtoSchemaEvolutionTest {
+
+  /**
+   * Test that we can read an enum value (number) with an old schema even when the value is missing in that schema.
+   */
+  @Test
+  public void testEnumSchemaWriteV2ReadV1() throws IOException {
+    TestProto3SchemaV2.MessageSchema dataV2 = TestProto3SchemaV2.MessageSchema.newBuilder()
+      .setOptionalLabelNumberPair(TestProto3SchemaV2.MessageSchema.LabelNumberPair.SECOND)
+      .setOptionalString("string value")
+      .build();
+    Path file = writeMessages(dataV2);
+    List<TestProto3SchemaV1.MessageSchema> messagesV1 = readMessages(file, TestProto3SchemaV1.MessageSchema.class);
+    assertEquals(messagesV1.size(), 1);
+    assertEquals(messagesV1.get(0).getOptionalLabelNumberPairValue(), 2);
+  }
+
+  /**
+   * Write enum value unknown in V1 (thus "UNKNOWN_ENUM_VALUE_*"), and we can read it back with schema V2 that contains
+   * the enum definition.
+   */
+  @Test
+  public void testEnumSchemaWriteV1ReadV2() throws IOException {
+    TestProto3SchemaV1.MessageSchema dataV1WithEnumValueFromV2 = TestProto3SchemaV1.MessageSchema.newBuilder()
+      .setOptionalLabelNumberPairValue(2) // "2" is not defined in V1 enum, but the number is still accepted by protobuf
+      .build();
+    Path file = writeMessages(dataV1WithEnumValueFromV2);
+    List<TestProto3SchemaV2.MessageSchema> messagesV2 = readMessages(file, TestProto3SchemaV2.MessageSchema.class);
+    assertEquals(messagesV2.size(), 1);
+    assertSame(messagesV2.get(0).getOptionalLabelNumberPair(), TestProto3SchemaV2.MessageSchema.LabelNumberPair.SECOND);
+  }
+}
diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/TestUtils.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/TestUtils.java
index 2c8b41f..2fbd2a4 100644
--- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/TestUtils.java
+++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/TestUtils.java
@@ -23,13 +23,13 @@ import com.google.protobuf.Message;
 import com.google.protobuf.MessageOrBuilder;
 import com.twitter.elephantbird.util.Protobufs;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.ParquetReader;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 
 public class TestUtils {
@@ -78,8 +78,7 @@ public class TestUtils {
 
     checkSameBuilderInstance(messages);
 
-    List<MessageOrBuilder> output = (List<MessageOrBuilder>) writeAndRead(messages);
-
+    List<MessageOrBuilder> output = writeAndRead(messages);
     List<Message> outputAsMessages = asMessages(output);
     Descriptors.Descriptor messageDescriptor = Protobufs.getMessageDescriptor(asMessage(messages[0]).getClass());
     Descriptors.FileDescriptor.Syntax syntax = messageDescriptor.getFile().getSyntax();
@@ -117,7 +116,6 @@ public class TestUtils {
     for (MessageOrBuilder messageOrBuilder : mobs) {
       result.add(asMessage(messageOrBuilder));
     }
-
     return result;
   }
 
@@ -161,22 +159,34 @@ public class TestUtils {
    * Reads messages from given file. The file could/should be created by method writeMessages
    */
   public static <T extends MessageOrBuilder> List<T> readMessages(Path file) throws IOException {
-    ProtoParquetReader<T> reader = new ProtoParquetReader<T>(file);
-
-    List<T> result = new ArrayList<T>();
-    boolean hasNext = true;
-    while (hasNext) {
-      T item = reader.read();
-      if (item == null) {
-        hasNext = false;
-      } else {
-        assertNotNull(item);
-        // It makes sense to return message but production code wont work with messages
-        result.add((T) asMessage(item).toBuilder());
+    return readMessages(file, null);
+  }
+
+  /**
+   * Read messages from given file into the expected proto class.
+   * @param file
+   * @param messageClass
+   * @param <T>
+   * @return List of protobuf messages for the given type.
+   */
+  public static <T extends MessageOrBuilder> List<T> readMessages(Path file, Class<T> messageClass) throws IOException {
+    ParquetReader.Builder readerBuilder = ProtoParquetReader.builder(file);
+    if (messageClass != null) {
+      readerBuilder.set(ProtoReadSupport.PB_CLASS, messageClass.getName()).build();
+    }
+    try (ParquetReader reader = readerBuilder.build()) {
+      List<T> result = new ArrayList<T>();
+      boolean hasNext = true;
+      while (hasNext) {
+        T item = (T) reader.read();
+        if (item == null) {
+          hasNext = false;
+        } else {
+          result.add((T) asMessage(item));
+        }
       }
+      return result;
     }
-    reader.close();
-    return result;
   }
 
   /**
diff --git a/parquet-protobuf/src/test/resources/TestProto3SchemaV1.proto b/parquet-protobuf/src/test/resources/TestProto3SchemaV1.proto
new file mode 100644
index 0000000..6c0c8cc
--- /dev/null
+++ b/parquet-protobuf/src/test/resources/TestProto3SchemaV1.proto
@@ -0,0 +1,38 @@
+syntax = "proto3";
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package TestProto3.Schema;
+
+option java_package = "org.apache.parquet.proto.test";
+
+// For the test of schema evolution
+// This is the "V1" schema, the "V2" (its evolution) is in TestProto3SchemaV2.proto
+message MessageSchema {
+
+    enum LabelNumberPair {
+        UNKNOWN_VALUE = 0;
+        FIRST = 1;
+    }
+
+    LabelNumberPair optionalLabelNumberPair = 1;
+    string optionalString = 2;
+    int32 optionalInt32 = 3;
+
+}
+
diff --git a/parquet-protobuf/src/test/resources/TestProto3SchemaV2.proto b/parquet-protobuf/src/test/resources/TestProto3SchemaV2.proto
new file mode 100644
index 0000000..846f1a3
--- /dev/null
+++ b/parquet-protobuf/src/test/resources/TestProto3SchemaV2.proto
@@ -0,0 +1,40 @@
+syntax = "proto3";
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package TestProto3.Schema;
+
+option java_package = "org.apache.parquet.proto.test";
+
+// For the test of schema evolution
+// This is the "V2" schema, which is supposed to be an evolution from the "V1" (TestProto3SchemaV1.proto)
+message MessageSchema {
+
+    enum LabelNumberPair {
+        UNKNOWN_VALUE = 0;
+        FIRST = 1;
+        // We added one more value in V2 comparing to V1
+        SECOND = 2;
+    }
+
+    LabelNumberPair optionalLabelNumberPair = 1;
+    string optionalString = 2;
+    int32 optionalInt32 = 3;
+
+}
+
diff --git a/pom.xml b/pom.xml
index 52e1fdb..a84af6e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -391,7 +391,6 @@
           </configuration>
       </plugin>
 
-
       <plugin>
         <!-- Override source and target from the ASF parent -->
         <groupId>org.apache.maven.plugins</groupId>
@@ -402,6 +401,7 @@
           <target>${maven.compiler.target}</target>
         </configuration>
       </plugin>
+
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-failsafe-plugin</artifactId>
@@ -415,6 +415,7 @@
           </execution>
         </executions>
       </plugin>
+
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-surefire-plugin</artifactId>
@@ -439,6 +440,7 @@
           </excludes>
         </configuration>
       </plugin>
+
       <plugin>
         <groupId>org.codehaus.mojo</groupId>
         <artifactId>buildnumber-maven-plugin</artifactId>
@@ -452,6 +454,7 @@
          </execution>
        </executions>
       </plugin>
+
       <plugin>
         <groupId>org.apache.rat</groupId>
         <artifactId>apache-rat-plugin</artifactId>
@@ -542,7 +545,6 @@
           </execution>
         </executions>
       </plugin>
-
     </plugins>
   </build>