You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@parquet.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/09/12 12:15:01 UTC

[jira] [Commented] (PARQUET-1410) Refactor modules to use the new logical type API

    [ https://issues.apache.org/jira/browse/PARQUET-1410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16612034#comment-16612034 ] 

ASF GitHub Bot commented on PARQUET-1410:
-----------------------------------------

zivanfi closed pull request #520: PARQUET-1410: Refactor modules to use the new logical type API
URL: https://github.com/apache/parquet-mr/pull/520
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, the full diff is reproduced
below; GitHub would otherwise no longer show it once the PR is merged:

diff --git a/parquet-arrow/src/main/java/org/apache/parquet/arrow/schema/SchemaConverter.java b/parquet-arrow/src/main/java/org/apache/parquet/arrow/schema/SchemaConverter.java
index b0f122ce0..e02b03b5d 100644
--- a/parquet-arrow/src/main/java/org/apache/parquet/arrow/schema/SchemaConverter.java
+++ b/parquet-arrow/src/main/java/org/apache/parquet/arrow/schema/SchemaConverter.java
@@ -19,22 +19,16 @@
 package org.apache.parquet.arrow.schema;
 
 import static java.util.Arrays.asList;
-import static org.apache.parquet.schema.OriginalType.DATE;
-import static org.apache.parquet.schema.OriginalType.DECIMAL;
-import static org.apache.parquet.schema.OriginalType.INTERVAL;
-import static org.apache.parquet.schema.OriginalType.INT_16;
-import static org.apache.parquet.schema.OriginalType.INT_32;
-import static org.apache.parquet.schema.OriginalType.INT_64;
-import static org.apache.parquet.schema.OriginalType.INT_8;
-import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MILLIS;
-import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MICROS;
-import static org.apache.parquet.schema.OriginalType.TIME_MILLIS;
-import static org.apache.parquet.schema.OriginalType.TIME_MICROS;
-import static org.apache.parquet.schema.OriginalType.UINT_16;
-import static org.apache.parquet.schema.OriginalType.UINT_32;
-import static org.apache.parquet.schema.OriginalType.UINT_64;
-import static org.apache.parquet.schema.OriginalType.UINT_8;
-import static org.apache.parquet.schema.OriginalType.UTF8;
+import static java.util.Optional.empty;
+import static java.util.Optional.of;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MICROS;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MILLIS;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.dateType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.decimalType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.intType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.timeType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.timestampType;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
@@ -48,6 +42,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
 import org.apache.arrow.vector.types.DateUnit;
 import org.apache.arrow.vector.types.FloatingPointPrecision;
@@ -75,10 +70,9 @@
 import org.apache.parquet.arrow.schema.SchemaMapping.StructTypeMapping;
 import org.apache.parquet.arrow.schema.SchemaMapping.TypeMapping;
 import org.apache.parquet.arrow.schema.SchemaMapping.UnionTypeMapping;
-import org.apache.parquet.schema.DecimalMetadata;
 import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 import org.apache.parquet.schema.Type;
@@ -180,13 +174,11 @@ public TypeMapping visit(Int type) {
         boolean signed = type.getIsSigned();
         switch (type.getBitWidth()) {
           case 8:
-            return primitive(INT32, signed ? INT_8 : UINT_8);
           case 16:
-            return primitive(INT32, signed ? INT_16 : UINT_16);
           case 32:
-            return primitive(INT32, signed ? INT_32 : UINT_32);
+            return primitive(INT32, intType(type.getBitWidth(), signed));
           case 64:
-            return primitive(INT64, signed ? INT_64 : UINT_64);
+            return primitive(INT64, intType(64, signed));
           default:
             throw new IllegalArgumentException("Illegal int type: " + field);
         }
@@ -209,7 +201,7 @@ public TypeMapping visit(FloatingPoint type) {
 
       @Override
       public TypeMapping visit(Utf8 type) {
-        return primitive(BINARY, UTF8);
+        return primitive(BINARY, stringType());
       }
 
       @Override
@@ -243,7 +235,7 @@ public TypeMapping visit(Decimal type) {
 
       @Override
       public TypeMapping visit(Date type) {
-        return primitive(INT32, DATE);
+        return primitive(INT32, dateType());
       }
 
       @Override
@@ -251,9 +243,9 @@ public TypeMapping visit(Time type) {
         int bitWidth = type.getBitWidth();
         TimeUnit timeUnit = type.getUnit();
         if (bitWidth == 32 && timeUnit == TimeUnit.MILLISECOND) {
-          return primitive(INT32, TIME_MILLIS);
+          return primitive(INT32, timeType(false, MILLIS));
         } else if (bitWidth == 64 && timeUnit == TimeUnit.MICROSECOND) {
-          return primitive(INT64, TIME_MICROS);
+          return primitive(INT64, timeType(false, MICROS));
         }
         throw new UnsupportedOperationException("Unsupported type " + type);
       }
@@ -262,20 +254,25 @@ public TypeMapping visit(Time type) {
       public TypeMapping visit(Timestamp type) {
         TimeUnit timeUnit = type.getUnit();
         if (timeUnit == TimeUnit.MILLISECOND) {
-          return primitive(INT64, TIMESTAMP_MILLIS);
+          return primitive(INT64, timestampType(isUtcNormalized(type), MILLIS));
         } else if (timeUnit == TimeUnit.MICROSECOND) {
-          return primitive(INT64, TIMESTAMP_MICROS);
+          return primitive(INT64, timestampType(isUtcNormalized(type), MICROS));
         }
         throw new UnsupportedOperationException("Unsupported type " + type);
       }
 
+      private boolean isUtcNormalized(Timestamp timestamp) {
+        String timeZone = timestamp.getTimezone();
+        return timeZone != null && !timeZone.isEmpty();
+      }
+
       /**
        * See https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#interval
        */
       @Override
       public TypeMapping visit(Interval type) {
         // TODO(PARQUET-675): fix interval original types
-        return primitiveFLBA(12, INTERVAL);
+        return primitiveFLBA(12, LogicalTypeAnnotation.IntervalLogicalTypeAnnotation.getInstance());
       }
 
       @Override
@@ -288,18 +285,18 @@ private TypeMapping mapping(PrimitiveType parquetType) {
       }
 
       private TypeMapping decimal(PrimitiveTypeName type, int precision, int scale) {
-        return mapping(Types.optional(type).as(DECIMAL).precision(precision).scale(scale).named(fieldName));
+        return mapping(Types.optional(type).as(decimalType(scale, precision)).named(fieldName));
       }
 
       private TypeMapping primitive(PrimitiveTypeName type) {
         return mapping(Types.optional(type).named(fieldName));
       }
 
-      private TypeMapping primitive(PrimitiveTypeName type, OriginalType otype) {
+      private TypeMapping primitive(PrimitiveTypeName type, LogicalTypeAnnotation otype) {
         return mapping(Types.optional(type).as(otype).named(fieldName));
       }
 
-      private TypeMapping primitiveFLBA(int length, OriginalType otype) {
+      private TypeMapping primitiveFLBA(int length, LogicalTypeAnnotation otype) {
         return mapping(Types.optional(FIXED_LEN_BYTE_ARRAY).length(length).as(otype).named(fieldName));
       }
     });
@@ -363,21 +360,21 @@ private TypeMapping fromParquet(Type type, String name, Repetition repetition) {
    * @return the mapping
    */
   private TypeMapping fromParquetGroup(GroupType type, String name) {
-    OriginalType ot = type.getOriginalType();
-    if (ot == null) {
+    LogicalTypeAnnotation logicalType = type.getLogicalTypeAnnotation();
+    if (logicalType == null) {
       List<TypeMapping> typeMappings = fromParquet(type.getFields());
       Field arrowField = new Field(name, type.isRepetition(OPTIONAL), new Struct(), fields(typeMappings));
       return new StructTypeMapping(arrowField, type, typeMappings);
     } else {
-      switch (ot) {
-        case LIST:
+      return logicalType.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<TypeMapping>() {
+        @Override
+        public Optional<TypeMapping> visit(LogicalTypeAnnotation.ListLogicalTypeAnnotation listLogicalType) {
           List3Levels list3Levels = new List3Levels(type);
           TypeMapping child = fromParquet(list3Levels.getElement(), null, list3Levels.getElement().getRepetition());
           Field arrowField = new Field(name, type.isRepetition(OPTIONAL), new ArrowType.List(), asList(child.getArrowField()));
-          return new ListTypeMapping(arrowField, list3Levels, child);
-        default:
-          throw new UnsupportedOperationException("Unsupported type " + type);
-      }
+          return of(new ListTypeMapping(arrowField, list3Levels, child));
+        }
+      }).orElseThrow(() -> new UnsupportedOperationException("Unsupported type " + type));
     }
   }
 
@@ -406,92 +403,82 @@ public TypeMapping convertDOUBLE(PrimitiveTypeName primitiveTypeName) throws Run
 
       @Override
       public TypeMapping convertINT32(PrimitiveTypeName primitiveTypeName) throws RuntimeException {
-        OriginalType ot = type.getOriginalType();
-        if (ot == null) {
+        LogicalTypeAnnotation logicalTypeAnnotation = type.getLogicalTypeAnnotation();
+        if (logicalTypeAnnotation == null) {
           return integer(32, true);
         }
-        switch (ot) {
-          case INT_8:
-            return integer(8, true);
-          case INT_16:
-            return integer(16, true);
-          case INT_32:
-            return integer(32, true);
-          case UINT_8:
-            return integer(8, false);
-          case UINT_16:
-            return integer(16, false);
-          case UINT_32:
-            return integer(32, false);
-          case DECIMAL:
-            return decimal(type.getDecimalMetadata());
-          case DATE:
-            return field(new ArrowType.Date(DateUnit.DAY));
-          case TIME_MILLIS:
-            return field(new ArrowType.Time(TimeUnit.MILLISECOND, 32));
-          default:
-          case INT_64:
-          case UINT_64:
-          case UTF8:
-          case ENUM:
-          case BSON:
-          case INTERVAL:
-          case JSON:
-          case LIST:
-          case MAP:
-          case MAP_KEY_VALUE:
-          case TIMESTAMP_MICROS:
-          case TIMESTAMP_MILLIS:
-          case TIME_MICROS:
-            throw new IllegalArgumentException("illegal type " + type);
-        }
+        return logicalTypeAnnotation.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<TypeMapping>() {
+          @Override
+          public Optional<TypeMapping> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) {
+            return of(decimal(decimalLogicalType.getPrecision(), decimalLogicalType.getScale()));
+          }
+
+          @Override
+          public Optional<TypeMapping> visit(LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
+            return of(field(new ArrowType.Date(DateUnit.DAY)));
+          }
+
+          @Override
+          public Optional<TypeMapping> visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
+            return timeLogicalType.getUnit() == MILLIS ? of(field(new ArrowType.Time(TimeUnit.MILLISECOND, 32))) : empty();
+          }
+
+          @Override
+          public Optional<TypeMapping> visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) {
+            if (intLogicalType.getBitWidth() == 64) {
+              return empty();
+            }
+            return of(integer(intLogicalType.getBitWidth(), intLogicalType.isSigned()));
+          }
+        }).orElseThrow(() -> new IllegalArgumentException("illegal type " + type));
       }
 
       @Override
       public TypeMapping convertINT64(PrimitiveTypeName primitiveTypeName) throws RuntimeException {
-        OriginalType ot = type.getOriginalType();
-        if (ot == null) {
+        LogicalTypeAnnotation logicalTypeAnnotation = type.getLogicalTypeAnnotation();
+        if (logicalTypeAnnotation == null) {
           return integer(64, true);
         }
-        switch (ot) {
-          case INT_8:
-            return integer(8, true);
-          case INT_16:
-            return integer(16, true);
-          case INT_32:
-            return integer(32, true);
-          case INT_64:
-            return integer(64, true);
-          case UINT_8:
-            return integer(8, false);
-          case UINT_16:
-            return integer(16, false);
-          case UINT_32:
-            return integer(32, false);
-          case UINT_64:
-            return integer(64, false);
-          case DECIMAL:
-            return decimal(type.getDecimalMetadata());
-          case DATE:
-            return field(new ArrowType.Date(DateUnit.DAY));
-          case TIMESTAMP_MICROS:
-            return field(new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC"));
-          case TIMESTAMP_MILLIS:
-            return field(new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC"));
-          case TIME_MICROS:
-            return field(new ArrowType.Time(TimeUnit.MICROSECOND, 64));
-          default:
-          case UTF8:
-          case ENUM:
-          case BSON:
-          case INTERVAL:
-          case JSON:
-          case LIST:
-          case MAP:
-          case MAP_KEY_VALUE:
-          case TIME_MILLIS:
-            throw new IllegalArgumentException("illegal type " + type);
-        }
+
+        return logicalTypeAnnotation.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<TypeMapping>() {
+          @Override
+          public Optional<TypeMapping> visit(LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
+            return of(field(new ArrowType.Date(DateUnit.DAY)));
+          }
+
+          @Override
+          public Optional<TypeMapping> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) {
+            return of(decimal(decimalLogicalType.getPrecision(), decimalLogicalType.getScale()));
+          }
+
+          @Override
+          public Optional<TypeMapping> visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) {
+            return of(integer(intLogicalType.getBitWidth(), intLogicalType.isSigned()));
+          }
+
+          @Override
+          public Optional<TypeMapping> visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
+            if (timeLogicalType.getUnit() == MICROS) {
+              return of(field(new ArrowType.Time(TimeUnit.MICROSECOND, 64)));
+            }
+            return empty();
+          }
+
+          @Override
+          public Optional<TypeMapping> visit(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
+            switch (timestampLogicalType.getUnit()) {
+              case MICROS:
+                return of(field(new ArrowType.Timestamp(TimeUnit.MICROSECOND, getTimeZone(timestampLogicalType))));
+              case MILLIS:
+                return of(field(new ArrowType.Timestamp(TimeUnit.MILLISECOND, getTimeZone(timestampLogicalType))));
+            }
+            return empty();
+          }
+
+          private String getTimeZone(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
+            return timestampLogicalType.isAdjustedToUTC() ? "UTC" : null;
+          }
+        }).orElseThrow(() -> new IllegalArgumentException("illegal type " + type));
       }
 
       @Override
@@ -512,22 +499,25 @@ public TypeMapping convertBOOLEAN(PrimitiveTypeName primitiveTypeName) throws Ru
 
       @Override
       public TypeMapping convertBINARY(PrimitiveTypeName primitiveTypeName) throws RuntimeException {
-        OriginalType ot = type.getOriginalType();
-        if (ot == null) {
+        LogicalTypeAnnotation logicalTypeAnnotation = type.getLogicalTypeAnnotation();
+        if (logicalTypeAnnotation == null) {
           return field(new ArrowType.Binary());
         }
-        switch (ot) {
-          case UTF8:
-            return field(new ArrowType.Utf8());
-          case DECIMAL:
-            return decimal(type.getDecimalMetadata());
-          default:
-            throw new IllegalArgumentException("illegal type " + type);
-        }
+        return logicalTypeAnnotation.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<TypeMapping>() {
+          @Override
+          public Optional<TypeMapping> visit(LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) {
+            return of(field(new ArrowType.Utf8()));
+          }
+
+          @Override
+          public Optional<TypeMapping> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) {
+            return of(decimal(decimalLogicalType.getPrecision(), decimalLogicalType.getScale()));
+          }
+        }).orElseThrow(() -> new IllegalArgumentException("illegal type " + type));
       }
 
-      private TypeMapping decimal(DecimalMetadata decimalMetadata) {
-        return field(new ArrowType.Decimal(decimalMetadata.getPrecision(), decimalMetadata.getScale()));
+      private TypeMapping decimal(int precision, int scale) {
+        return field(new ArrowType.Decimal(precision, scale));
       }
 
       private TypeMapping integer(int width, boolean signed) {
diff --git a/parquet-arrow/src/test/java/org/apache/parquet/arrow/schema/TestSchemaConverter.java b/parquet-arrow/src/test/java/org/apache/parquet/arrow/schema/TestSchemaConverter.java
index 2d1f028e2..2817de263 100644
--- a/parquet-arrow/src/test/java/org/apache/parquet/arrow/schema/TestSchemaConverter.java
+++ b/parquet-arrow/src/test/java/org/apache/parquet/arrow/schema/TestSchemaConverter.java
@@ -19,6 +19,10 @@
 package org.apache.parquet.arrow.schema;
 
 import static java.util.Arrays.asList;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MICROS;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MILLIS;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.timeType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.timestampType;
 import static org.apache.parquet.schema.OriginalType.DATE;
 import static org.apache.parquet.schema.OriginalType.DECIMAL;
 import static org.apache.parquet.schema.OriginalType.INTERVAL;
@@ -64,10 +68,9 @@
 import org.apache.parquet.example.Paper;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Types;
+import org.junit.Assert;
 import org.junit.Test;
 
-import junit.framework.Assert;
-
 /**
  * @see SchemaConverter
  */
@@ -90,7 +93,10 @@ private static Field field(String name, ArrowType type, Field... children) {
     field("f", new ArrowType.FixedSizeList(1), field(null, new ArrowType.Date(DateUnit.DAY))),
     field("g", new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)),
     field("h", new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC")),
-    field("i", new ArrowType.Interval(IntervalUnit.DAY_TIME))
+    field("j", new ArrowType.Timestamp(TimeUnit.MILLISECOND, null)),
+    field("k", new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC")),
+    field("l", new ArrowType.Timestamp(TimeUnit.MICROSECOND, null)),
+    field("m", new ArrowType.Interval(IntervalUnit.DAY_TIME))
   ));
   private final MessageType complexParquetSchema = Types.buildMessage()
     .addField(Types.optional(INT32).as(INT_8).named("a"))
@@ -105,8 +111,11 @@ private static Field field(String name, ArrowType type, Field... children) {
       setElementType(Types.optional(INT32).as(DATE).named("element"))
       .named("f"))
     .addField(Types.optional(FLOAT).named("g"))
-    .addField(Types.optional(INT64).as(TIMESTAMP_MILLIS).named("h"))
-    .addField(Types.optional(FIXED_LEN_BYTE_ARRAY).length(12).as(INTERVAL).named("i"))
+    .addField(Types.optional(INT64).as(timestampType(true, MILLIS)).named("h"))
+    .addField(Types.optional(INT64).as(timestampType(false, MILLIS)).named("j"))
+    .addField(Types.optional(INT64).as(timestampType(true, MICROS)).named("k"))
+    .addField(Types.optional(INT64).as(timestampType(false, MICROS)).named("l"))
+    .addField(Types.optional(FIXED_LEN_BYTE_ARRAY).length(12).as(INTERVAL).named("m"))
     .named("root");
 
   private final Schema allTypesArrowSchema = new Schema(asList(
@@ -169,7 +178,7 @@ private static Field field(String name, ArrowType type, Field... children) {
     .addField(Types.optional(INT64).as(DECIMAL).precision(15).scale(5).named("k1"))
     .addField(Types.optional(BINARY).as(DECIMAL).precision(25).scale(5).named("k2"))
     .addField(Types.optional(INT32).as(DATE).named("l"))
-    .addField(Types.optional(INT32).as(TIME_MILLIS).named("m"))
+    .addField(Types.optional(INT32).as(timeType(false, MILLIS)).named("m"))
     .addField(Types.optional(INT64).as(TIMESTAMP_MILLIS).named("n"))
     .addField(Types.optional(FIXED_LEN_BYTE_ARRAY).length(12).as(INTERVAL).named("o"))
     .addField(Types.optional(FIXED_LEN_BYTE_ARRAY).length(12).as(INTERVAL).named("o1"))
@@ -365,7 +374,8 @@ public void testArrowTimeMillisecondToParquet() {
     MessageType expected = converter.fromArrow(new Schema(asList(
       field("a", new ArrowType.Time(TimeUnit.MILLISECOND, 32))
     ))).getParquetSchema();
-    Assert.assertEquals(expected, Types.buildMessage().addField(Types.optional(INT32).as(TIME_MILLIS).named("a")).named("root"));
+    Assert.assertEquals(expected,
+      Types.buildMessage().addField(Types.optional(INT32).as(timeType(false, MILLIS)).named("a")).named("root"));
   }
 
   @Test
@@ -373,7 +383,8 @@ public void testArrowTimeMicrosecondToParquet() {
     MessageType expected = converter.fromArrow(new Schema(asList(
       field("a", new ArrowType.Time(TimeUnit.MICROSECOND, 64))
     ))).getParquetSchema();
-    Assert.assertEquals(expected, Types.buildMessage().addField(Types.optional(INT64).as(TIME_MICROS).named("a")).named("root"));
+    Assert.assertEquals(expected,
+      Types.buildMessage().addField(Types.optional(INT64).as(timeType(false, MICROS)).named("a")).named("root"));
   }
 
   @Test(expected = UnsupportedOperationException.class)
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
index 1bb12b983..558446e6b 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
@@ -24,10 +24,9 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.schema.ConversionPatterns;
-import org.apache.parquet.schema.DecimalMetadata;
 import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 import org.apache.parquet.schema.Type;
@@ -36,11 +35,21 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
 
+import static java.util.Optional.empty;
+import static java.util.Optional.of;
 import static org.apache.avro.JsonProperties.NULL_VALUE;
 import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
 import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE_DEFAULT;
-import static org.apache.parquet.schema.OriginalType.*;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MICROS;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MILLIS;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.dateType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.decimalType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.enumType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.timeType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.timestampType;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*;
 import static org.apache.parquet.schema.Type.Repetition.REPEATED;
 
@@ -147,11 +156,11 @@ private Type convertField(String fieldName, Schema schema, Type.Repetition repet
     } else if (type.equals(Schema.Type.BYTES)) {
       builder = Types.primitive(BINARY, repetition);
     } else if (type.equals(Schema.Type.STRING)) {
-      builder = Types.primitive(BINARY, repetition).as(UTF8);
+      builder = Types.primitive(BINARY, repetition).as(stringType());
     } else if (type.equals(Schema.Type.RECORD)) {
       return new GroupType(repetition, fieldName, convertFields(schema.getFields()));
     } else if (type.equals(Schema.Type.ENUM)) {
-      builder = Types.primitive(BINARY, repetition).as(ENUM);
+      builder = Types.primitive(BINARY, repetition).as(enumType());
     } else if (type.equals(Schema.Type.ARRAY)) {
       if (writeOldListStructure) {
         return ConversionPatterns.listType(repetition, fieldName,
@@ -178,12 +187,10 @@ private Type convertField(String fieldName, Schema schema, Type.Repetition repet
     LogicalType logicalType = schema.getLogicalType();
     if (logicalType != null) {
       if (logicalType instanceof LogicalTypes.Decimal) {
-        builder = builder.as(DECIMAL)
-            .precision(((LogicalTypes.Decimal) logicalType).getPrecision())
-            .scale(((LogicalTypes.Decimal) logicalType).getScale());
-
+        LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType;
+        builder = builder.as(decimalType(decimal.getScale(), decimal.getPrecision()));
       } else {
-        OriginalType annotation = convertLogicalType(logicalType);
+        LogicalTypeAnnotation annotation = convertLogicalType(logicalType);
         if (annotation != null) {
           builder.as(annotation);
         }
@@ -267,7 +274,7 @@ private Schema convertField(final Type parquetType) {
       final PrimitiveType asPrimitive = parquetType.asPrimitiveType();
       final PrimitiveTypeName parquetPrimitiveTypeName =
           asPrimitive.getPrimitiveTypeName();
-      final OriginalType annotation = parquetType.getOriginalType();
+      final LogicalTypeAnnotation annotation = parquetType.getLogicalTypeAnnotation();
       Schema schema = parquetPrimitiveTypeName.convert(
           new PrimitiveType.PrimitiveTypeNameConverter<Schema, RuntimeException>() {
             @Override
@@ -301,7 +308,8 @@ public Schema convertFIXED_LEN_BYTE_ARRAY(PrimitiveTypeName primitiveTypeName) {
             }
             @Override
             public Schema convertBINARY(PrimitiveTypeName primitiveTypeName) {
-              if (annotation == OriginalType.UTF8 || annotation == OriginalType.ENUM) {
+              if (annotation instanceof LogicalTypeAnnotation.StringLogicalTypeAnnotation ||
+                annotation instanceof  LogicalTypeAnnotation.EnumLogicalTypeAnnotation) {
                 return Schema.create(Schema.Type.STRING);
               } else {
                 return Schema.create(Schema.Type.BYTES);
@@ -309,9 +317,8 @@ public Schema convertBINARY(PrimitiveTypeName primitiveTypeName) {
             }
           });
 
-      LogicalType logicalType = convertOriginalType(
-          annotation, asPrimitive.getDecimalMetadata());
-      if (logicalType != null && (annotation != DECIMAL ||
+      LogicalType logicalType = convertLogicalType(annotation);
+      if (logicalType != null && (!(annotation instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) ||
           parquetPrimitiveTypeName == BINARY ||
           parquetPrimitiveTypeName == FIXED_LEN_BYTE_ARRAY)) {
         schema = logicalType.addToSchema(schema);
@@ -321,10 +328,11 @@ public Schema convertBINARY(PrimitiveTypeName primitiveTypeName) {
 
     } else {
       GroupType parquetGroupType = parquetType.asGroupType();
-      OriginalType originalType = parquetGroupType.getOriginalType();
-      if (originalType != null) {
-        switch(originalType) {
-          case LIST:
+      LogicalTypeAnnotation logicalTypeAnnotation = parquetGroupType.getLogicalTypeAnnotation();
+      if (logicalTypeAnnotation != null) {
+        return logicalTypeAnnotation.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<Schema>() {
+          @Override
+          public Optional<Schema> visit(LogicalTypeAnnotation.ListLogicalTypeAnnotation listLogicalType) {
             if (parquetGroupType.getFieldCount()!= 1) {
               throw new UnsupportedOperationException("Invalid list type " + parquetGroupType);
             }
@@ -334,17 +342,29 @@ public Schema convertBINARY(PrimitiveTypeName primitiveTypeName) {
             }
             if (isElementType(repeatedType, parquetGroupType.getName())) {
               // repeated element types are always required
-              return Schema.createArray(convertField(repeatedType));
+              return of(Schema.createArray(convertField(repeatedType)));
             } else {
               Type elementType = repeatedType.asGroupType().getType(0);
               if (elementType.isRepetition(Type.Repetition.OPTIONAL)) {
-                return Schema.createArray(optional(convertField(elementType)));
+                return of(Schema.createArray(optional(convertField(elementType))));
               } else {
-                return Schema.createArray(convertField(elementType));
+                return of(Schema.createArray(convertField(elementType)));
               }
             }
-          case MAP_KEY_VALUE: // for backward-compatibility
-          case MAP:
+          }
+
+          @Override
+          // for backward-compatibility
+          public Optional<Schema> visit(LogicalTypeAnnotation.MapKeyValueTypeAnnotation mapKeyValueLogicalType) {
+            return visitMapOrMapKeyValue();
+          }
+
+          @Override
+          public Optional<Schema> visit(LogicalTypeAnnotation.MapLogicalTypeAnnotation mapLogicalType) {
+            return visitMapOrMapKeyValue();
+          }
+
+          private Optional<Schema> visitMapOrMapKeyValue() {
             if (parquetGroupType.getFieldCount() != 1 || parquetGroupType.getType(0).isPrimitive()) {
               throw new UnsupportedOperationException("Invalid map type " + parquetGroupType);
             }
@@ -356,24 +376,23 @@ public Schema convertBINARY(PrimitiveTypeName primitiveTypeName) {
             Type keyType = mapKeyValType.getType(0);
             if (!keyType.isPrimitive() ||
                 !keyType.asPrimitiveType().getPrimitiveTypeName().equals(PrimitiveTypeName.BINARY) ||
-                !keyType.getOriginalType().equals(OriginalType.UTF8)) {
+                !keyType.getLogicalTypeAnnotation().equals(stringType())) {
               throw new IllegalArgumentException("Map key type must be binary (UTF8): "
                   + keyType);
             }
             Type valueType = mapKeyValType.getType(1);
             if (valueType.isRepetition(Type.Repetition.OPTIONAL)) {
-              return Schema.createMap(optional(convertField(valueType)));
+              return of(Schema.createMap(optional(convertField(valueType))));
             } else {
-              return Schema.createMap(convertField(valueType));
+              return of(Schema.createMap(convertField(valueType)));
             }
-          case ENUM:
-            return Schema.create(Schema.Type.STRING);
-          case UTF8:
-          default:
-            throw new UnsupportedOperationException("Cannot convert Parquet type " +
-                parquetType);
+          }
 
-        }
+          @Override
+          public Optional<Schema> visit(LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) {
+            return of(Schema.create(Schema.Type.STRING));
+          }
+        }).orElseThrow(() -> new UnsupportedOperationException("Cannot convert Parquet type " + parquetType));
       } else {
         // if no original type then it's a record
         return convertFields(parquetGroupType.getName(), parquetGroupType.getFields());
@@ -381,44 +400,65 @@ public Schema convertBINARY(PrimitiveTypeName primitiveTypeName) {
     }
   }
 
-  private OriginalType convertLogicalType(LogicalType logicalType) {
+  private LogicalTypeAnnotation convertLogicalType(LogicalType logicalType) {
     if (logicalType == null) {
       return null;
     } else if (logicalType instanceof LogicalTypes.Decimal) {
-      return OriginalType.DECIMAL;
+      LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType;
+      return decimalType(decimal.getScale(), decimal.getPrecision());
     } else if (logicalType instanceof LogicalTypes.Date) {
-      return OriginalType.DATE;
+      return dateType();
     } else if (logicalType instanceof LogicalTypes.TimeMillis) {
-      return OriginalType.TIME_MILLIS;
+      return timeType(true, MILLIS);
     } else if (logicalType instanceof LogicalTypes.TimeMicros) {
-      return OriginalType.TIME_MICROS;
+      return timeType(true, MICROS);
     } else if (logicalType instanceof LogicalTypes.TimestampMillis) {
-      return OriginalType.TIMESTAMP_MILLIS;
+      return timestampType(true, MILLIS);
     } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
-      return OriginalType.TIMESTAMP_MICROS;
+      return timestampType(true, MICROS);
     }
     return null;
   }
 
-  private LogicalType convertOriginalType(OriginalType annotation, DecimalMetadata meta) {
+  private LogicalType convertLogicalType(LogicalTypeAnnotation annotation) {
     if (annotation == null) {
       return null;
     }
-    switch (annotation) {
-      case DECIMAL:
-        return LogicalTypes.decimal(meta.getPrecision(), meta.getScale());
-      case DATE:
-        return LogicalTypes.date();
-      case TIME_MILLIS:
-        return LogicalTypes.timeMillis();
-      case TIME_MICROS:
-        return LogicalTypes.timeMicros();
-      case TIMESTAMP_MILLIS:
-        return LogicalTypes.timestampMillis();
-      case TIMESTAMP_MICROS:
-        return LogicalTypes.timestampMicros();
-    }
-    return null;
+    return annotation.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<LogicalType>() {
+      @Override
+      public Optional<LogicalType> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) {
+        return of(LogicalTypes.decimal(decimalLogicalType.getPrecision(), decimalLogicalType.getScale()));
+      }
+
+      @Override
+      public Optional<LogicalType> visit(LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
+        return of(LogicalTypes.date());
+      }
+
+      @Override
+      public Optional<LogicalType> visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
+        LogicalTypeAnnotation.TimeUnit unit = timeLogicalType.getUnit();
+        switch (unit) {
+          case MILLIS:
+            return of(LogicalTypes.timeMillis());
+          case MICROS:
+            return of(LogicalTypes.timeMicros());
+        }
+        return empty();
+      }
+
+      @Override
+      public Optional<LogicalType> visit(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
+        LogicalTypeAnnotation.TimeUnit unit = timestampLogicalType.getUnit();
+        switch (unit) {
+          case MILLIS:
+            return of(LogicalTypes.timestampMillis());
+          case MICROS:
+            return of(LogicalTypes.timestampMicros());
+        }
+        return empty();
+      }
+    }).orElse(null);
   }
 
   /**
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
index 942e3b137..bfaeec3d6 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -616,7 +616,7 @@ public void testTimeMillisType() throws Exception {
 
     testRoundTripConversion(expected,
         "message myrecord {\n" +
-            "  required int32 time (TIME_MILLIS);\n" +
+            "  required int32 time (TIME(MILLIS,true));\n" +
             "}\n");
 
     for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
@@ -646,7 +646,7 @@ public void testTimeMicrosType() throws Exception {
 
     testRoundTripConversion(expected,
         "message myrecord {\n" +
-            "  required int64 time (TIME_MICROS);\n" +
+            "  required int64 time (TIME(MICROS,true));\n" +
             "}\n");
 
     for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
@@ -676,7 +676,7 @@ public void testTimestampMillisType() throws Exception {
 
     testRoundTripConversion(expected,
         "message myrecord {\n" +
-            "  required int64 timestamp (TIMESTAMP_MILLIS);\n" +
+            "  required int64 timestamp (TIMESTAMP(MILLIS,true));\n" +
             "}\n");
 
     for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
@@ -706,7 +706,7 @@ public void testTimestampMicrosType() throws Exception {
 
     testRoundTripConversion(expected,
         "message myrecord {\n" +
-            "  required int64 timestamp (TIMESTAMP_MICROS);\n" +
+            "  required int64 timestamp (TIMESTAMP(MICROS,true));\n" +
             "}\n");
 
     for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
diff --git a/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java
index 3741165b0..4c1240b85 100644
--- a/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java
+++ b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,10 +27,7 @@
 import org.apache.parquet.io.api.PrimitiveConverter;
 import org.apache.parquet.pig.TupleConversionException;
 import org.apache.parquet.schema.GroupType;
-import org.apache.parquet.schema.OriginalType;
-import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
-import org.apache.parquet.schema.Type.Repetition;
 
 public class TupleConverter extends GroupConverter {
 
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/Util.java b/parquet-cli/src/main/java/org/apache/parquet/cli/Util.java
index 98bc1e511..961c7f0c4 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/Util.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/Util.java
@@ -80,7 +80,12 @@ public static String humanReadable(long bytes) {
     }
   }
 
+  @Deprecated
   public static String minMaxAsString(Statistics stats, OriginalType annotation) {
+    return minMaxAsString(stats);
+  }
+
+  public static String minMaxAsString(Statistics stats) {
     if (stats == null) {
       return "no stats";
     }
@@ -90,7 +95,12 @@ public static String minMaxAsString(Statistics stats, OriginalType annotation) {
     return String.format("%s / %s", humanReadable(stats.minAsString(), 30), humanReadable(stats.maxAsString(), 30));
   }
 
+  @Deprecated
   public static String toString(Statistics stats, long count, OriginalType annotation) {
+    return toString(stats, count);
+  }
+
+  public static String toString(Statistics stats, long count) {
     if (stats == null) {
       return "no stats";
     }
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ParquetMetadataCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ParquetMetadataCommand.java
index 54fe6579b..a452369e2 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ParquetMetadataCommand.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ParquetMetadataCommand.java
@@ -169,12 +169,12 @@ private void printColumnChunk(Logger console, int width, ColumnChunkMetaData col
       console.info(String.format("%-" + width + "s  FIXED[%d] %s %-7s %-9d %-8s %-7s %s",
           name, type.getTypeLength(), shortCodec(codec), encodingSummary, count,
           humanReadable(perValue), stats == null || !stats.isNumNullsSet() ? "" : String.valueOf(stats.getNumNulls()),
-          minMaxAsString(stats, type.getOriginalType())));
+          minMaxAsString(stats)));
     } else {
       console.info(String.format("%-" + width + "s  %-9s %s %-7s %-9d %-10s %-7s %s",
           name, typeName, shortCodec(codec), encodingSummary, count, humanReadable(perValue),
           stats == null || !stats.isNumNullsSet() ? "" : String.valueOf(stats.getNumNulls()),
-          minMaxAsString(stats, type.getOriginalType())));
+          minMaxAsString(stats)));
     }
   }
 }
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowDictionaryCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowDictionaryCommand.java
index db427c9c7..20a694ff7 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowDictionaryCommand.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowDictionaryCommand.java
@@ -30,8 +30,8 @@
 import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.page.DictionaryPageReadStore;
 import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.slf4j.Logger;
 import java.io.IOException;
@@ -81,7 +81,7 @@ public int run() throws IOException {
       for (int i = 0; i <= dict.getMaxId(); i += 1) {
         switch(type.getPrimitiveTypeName()) {
           case BINARY:
-            if (type.getOriginalType() == OriginalType.UTF8) {
+            if (type.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.StringLogicalTypeAnnotation) {
               console.info("{}: {}", String.format("%6d", i),
                   Util.humanReadable(dict.decodeToBinary(i).toStringUsingUTF8(), 70));
             } else {
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java
index 4d0e2c9ba..1ac03aad7 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java
@@ -193,7 +193,7 @@ public String visit(DataPageV1 page) {
       int count = page.getValueCount();
       String numNulls = page.getStatistics().isNumNullsSet() ? Long.toString(page.getStatistics().getNumNulls()) : "";
       float perValue = ((float) totalSize) / count;
-      String minMax = minMaxAsString(page.getStatistics(), type.getOriginalType());
+      String minMax = minMaxAsString(page.getStatistics());
       return String.format("%3d-%-3d  %-5s %s %-2s %-7d %-10s %-10s %-8s %-7s %s",
           rowGroupNum, pageNum, "data", shortCodec, enc, count, humanReadable(perValue),
           humanReadable(totalSize), "", numNulls, minMax);
@@ -207,7 +207,7 @@ public String visit(DataPageV2 page) {
       int numRows = page.getRowCount();
       int numNulls = page.getNullCount();
       float perValue = ((float) totalSize) / count;
-      String minMax = minMaxAsString(page.getStatistics(), type.getOriginalType());
+      String minMax = minMaxAsString(page.getStatistics());
       String compression = (page.isCompressed() ? shortCodec : "_");
       return String.format("%3d-%-3d  %-5s %s %-2s %-7d %-10s %-10s %-8d %-7s %s",
           rowGroupNum, pageNum, "data", compression, enc, count, humanReadable(perValue),
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/ValidTypeMap.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/ValidTypeMap.java
index b8f48bb0b..62c174e54 100644
--- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/ValidTypeMap.java
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/ValidTypeMap.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -25,7 +25,6 @@
 
 import org.apache.parquet.filter2.predicate.Operators.Column;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
-import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 
 /**
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/ConversionPatterns.java b/parquet-column/src/main/java/org/apache/parquet/schema/ConversionPatterns.java
index 6db1e587c..a530db13c 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/ConversionPatterns.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/ConversionPatterns.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,7 +22,7 @@
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 import org.apache.parquet.schema.Type.Repetition;
 
-import static org.apache.parquet.schema.OriginalType.*;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
 
 /**
  * Utility functions to convert from Java-like map and list types
@@ -37,15 +37,15 @@
    *
    * @param repetition   repetition for the list or map
    * @param alias        name of the field
-   * @param originalType original type for the list or map
+   * @param logicalTypeAnnotation logical type for the list or map
    * @param nested       the nested repeated field
    * @return a group type
    */
-  private static GroupType listWrapper(Repetition repetition, String alias, OriginalType originalType, Type nested) {
+  private static GroupType listWrapper(Repetition repetition, String alias, LogicalTypeAnnotation logicalTypeAnnotation, Type nested) {
     if (!nested.isRepetition(Repetition.REPEATED)) {
       throw new IllegalArgumentException("Nested type should be repeated: " + nested);
     }
-    return new GroupType(repetition, alias, originalType, nested);
+    return new GroupType(repetition, alias, logicalTypeAnnotation, nested);
   }
 
   public static GroupType mapType(Repetition repetition, String alias, Type keyType, Type valueType) {
@@ -53,7 +53,7 @@ public static GroupType mapType(Repetition repetition, String alias, Type keyTyp
   }
 
   public static GroupType stringKeyMapType(Repetition repetition, String alias, String mapAlias, Type valueType) {
-    return mapType(repetition, alias, mapAlias, new PrimitiveType(Repetition.REQUIRED, PrimitiveTypeName.BINARY, "key", OriginalType.UTF8), valueType);
+    return mapType(repetition, alias, mapAlias, new PrimitiveType(Repetition.REQUIRED, PrimitiveTypeName.BINARY, "key", stringType()), valueType);
   }
 
   public static GroupType stringKeyMapType(Repetition repetition, String alias, Type valueType) {
@@ -66,11 +66,11 @@ public static GroupType mapType(Repetition repetition, String alias, String mapA
       return listWrapper(
               repetition,
               alias,
-              MAP,
+              LogicalTypeAnnotation.mapType(),
               new GroupType(
                       Repetition.REPEATED,
                       mapAlias,
-                      MAP_KEY_VALUE,
+                      LogicalTypeAnnotation.MapKeyValueTypeAnnotation.getInstance(),
                       keyType)
       );
     } else {
@@ -80,11 +80,11 @@ public static GroupType mapType(Repetition repetition, String alias, String mapA
       return listWrapper(
               repetition,
               alias,
-              MAP,
+              LogicalTypeAnnotation.mapType(),
               new GroupType(
                       Repetition.REPEATED,
                       mapAlias,
-                      MAP_KEY_VALUE,
+                      LogicalTypeAnnotation.MapKeyValueTypeAnnotation.getInstance(),
                       keyType,
                       valueType)
       );
@@ -103,7 +103,7 @@ public static GroupType listType(Repetition repetition, String alias, Type neste
     return listWrapper(
             repetition,
             alias,
-            LIST,
+            LogicalTypeAnnotation.listType(),
             nestedType
     );
   }
@@ -125,7 +125,7 @@ public static GroupType listOfElements(Repetition listRepetition, String name, T
     return listWrapper(
         listRepetition,
         name,
-        LIST,
+        LogicalTypeAnnotation.listType(),
         new GroupType(Repetition.REPEATED, "list", elementType)
     );
   }
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java b/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java
index 5cb40e5e3..64e706295 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java
@@ -67,6 +67,16 @@ public GroupType(Repetition repetition, String name, OriginalType originalType,
     this(repetition, name, originalType, Arrays.asList(fields));
   }
 
+  /**
+   * @param repetition OPTIONAL, REPEATED, REQUIRED
+   * @param name the name of the field
+   * @param logicalTypeAnnotation (optional) the logical type to help with cross schema conversion (LIST, MAP, ...)
+   * @param fields the contained fields
+   */
+  GroupType(Repetition repetition, String name, LogicalTypeAnnotation logicalTypeAnnotation, Type... fields) {
+    this(repetition, name, logicalTypeAnnotation, Arrays.asList(fields));
+  }
+
   /**
    * @param repetition OPTIONAL, REPEATED, REQUIRED
    * @param name the name of the field
@@ -78,6 +88,16 @@ public GroupType(Repetition repetition, String name, OriginalType originalType,
     this(repetition, name, originalType, fields, null);
   }
 
+  /**
+   * @param repetition OPTIONAL, REPEATED, REQUIRED
+   * @param name the name of the field
+   * @param logicalTypeAnnotation (optional) the logical type to help with cross schema conversion (LIST, MAP, ...)
+   * @param fields the contained fields
+   */
+  GroupType(Repetition repetition, String name, LogicalTypeAnnotation logicalTypeAnnotation, List<Type> fields) {
+    this(repetition, name, logicalTypeAnnotation, fields, null);
+  }
+
   /**
    * @param repetition OPTIONAL, REPEATED, REQUIRED
    * @param name the name of the field
@@ -109,7 +129,7 @@ public GroupType(Repetition repetition, String name, OriginalType originalType,
    */
   @Override
   public GroupType withId(int id) {
-    return new GroupType(getRepetition(), getName(), getOriginalType(), fields, new ID(id));
+    return new GroupType(getRepetition(), getName(), getLogicalTypeAnnotation(), fields, new ID(id));
   }
 
   /**
@@ -117,7 +137,7 @@ public GroupType withId(int id) {
    * @return a group with the same attributes and new fields.
    */
   public GroupType withNewFields(List<Type> newFields) {
-    return new GroupType(getRepetition(), getName(), getOriginalType(), newFields, getId());
+    return new GroupType(getRepetition(), getName(), getLogicalTypeAnnotation(), newFields, getId());
   }
 
   /**
@@ -219,7 +239,7 @@ public void writeToStringBuilder(StringBuilder sb, String indent) {
         .append(getRepetition().name().toLowerCase(Locale.ENGLISH))
         .append(" group ")
         .append(getName())
-        .append(getOriginalType() == null ? "" : " (" + getOriginalType() +")")
+        .append(getLogicalTypeAnnotation() == null ? "" : " (" + getLogicalTypeAnnotation().toString() +")")
         .append(getId() == null ? "" : " = " + getId())
         .append(" {\n");
     membersDisplayString(sb, indent + "  ");
@@ -250,7 +270,7 @@ protected boolean typeEquals(Type other) {
    */
   @Override
   public int hashCode() {
-    return Objects.hash(getOriginalType(), getFields());
+    return Objects.hash(getLogicalTypeAnnotation(), getFields());
   }
 
   /**
@@ -261,7 +281,7 @@ protected boolean equals(Type otherType) {
     return
         !otherType.isPrimitive()
         && super.equals(otherType)
-        && getOriginalType() == otherType.getOriginalType()
+        && Objects.equals(getLogicalTypeAnnotation(),otherType.getLogicalTypeAnnotation())
         && getFields().equals(otherType.asGroupType().getFields());
   }
 
@@ -355,7 +375,7 @@ protected Type union(Type toMerge, boolean strict) {
     if (toMerge.isPrimitive()) {
       throw new IncompatibleSchemaModificationException("can not merge primitive type " + toMerge + " into group type " + this);
     }
-    return new GroupType(toMerge.getRepetition(), getName(), toMerge.getOriginalType(), mergeFields(toMerge.asGroupType()), getId());
+    return new GroupType(toMerge.getRepetition(), getName(), toMerge.getLogicalTypeAnnotation(), mergeFields(toMerge.asGroupType()), getId());
   }
 
   /**
@@ -383,8 +403,8 @@ protected Type union(Type toMerge, boolean strict) {
         if (fieldToMerge.getRepetition().isMoreRestrictiveThan(type.getRepetition())) {
           throw new IncompatibleSchemaModificationException("repetition constraint is more restrictive: can not merge type " + fieldToMerge + " into " + type);
         }
-        if (type.getOriginalType() != null && fieldToMerge.getOriginalType() != type.getOriginalType()) {
-          throw new IncompatibleSchemaModificationException("cannot merge original type " + fieldToMerge.getOriginalType() + " into " + type.getOriginalType());
+        if (type.getLogicalTypeAnnotation() != null && !type.getLogicalTypeAnnotation().equals(fieldToMerge.getLogicalTypeAnnotation())) {
+          throw new IncompatibleSchemaModificationException("cannot merge logical type " + fieldToMerge.getLogicalTypeAnnotation() + " into " + type.getLogicalTypeAnnotation());
         }
         merged = type.union(fieldToMerge, strict);
       } else {
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/LogicalTypeAnnotation.java b/parquet-column/src/main/java/org/apache/parquet/schema/LogicalTypeAnnotation.java
index 340a24af1..6046a3931 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/LogicalTypeAnnotation.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/LogicalTypeAnnotation.java
@@ -20,12 +20,18 @@
 
 import org.apache.parquet.Preconditions;
 
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.function.Supplier;
 
+import static java.util.Arrays.asList;
 import static java.util.Optional.empty;
+import static org.apache.parquet.schema.ColumnOrder.ColumnOrderName.TYPE_DEFINED_ORDER;
+import static org.apache.parquet.schema.ColumnOrder.ColumnOrderName.UNDEFINED;
 
 public abstract class LogicalTypeAnnotation {
   enum LogicalTypeToken {
@@ -144,6 +150,10 @@ String typeParametersAsString() {
     return "";
   }
 
+  boolean isValidColumnOrder(ColumnOrder columnOrder) {
+    return columnOrder.getColumnOrderName() == UNDEFINED || columnOrder.getColumnOrderName() == TYPE_DEFINED_ORDER;
+  }
+
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
@@ -152,6 +162,10 @@ public String toString() {
     return sb.toString();
   }
 
+  PrimitiveStringifier valueStringifier(PrimitiveType primitiveType) {
+    throw new UnsupportedOperationException("Stringifier is not supported for the logical type: " + this);
+  }
+
   /**
    * Helper method to convert the old representation of logical types (OriginalType) to new logical type.
    */
@@ -290,6 +304,11 @@ public int hashCode() {
       // This type doesn't have any parameters, thus using class hashcode
       return getClass().hashCode();
     }
+
+    @Override
+    PrimitiveStringifier valueStringifier(PrimitiveType primitiveType) {
+      return PrimitiveStringifier.UTF8_STRINGIFIER;
+    }
   }
 
   public static class MapLogicalTypeAnnotation extends LogicalTypeAnnotation {
@@ -389,15 +408,22 @@ public int hashCode() {
       // This type doesn't have any parameters, thus using class hashcode
       return getClass().hashCode();
     }
+
+    @Override
+    PrimitiveStringifier valueStringifier(PrimitiveType primitiveType) {
+      return PrimitiveStringifier.UTF8_STRINGIFIER;
+    }
   }
 
   public static class DecimalLogicalTypeAnnotation extends LogicalTypeAnnotation {
+    private final PrimitiveStringifier stringifier;
     private final int scale;
     private final int precision;
 
     private DecimalLogicalTypeAnnotation(int scale, int precision) {
       this.scale = scale;
       this.precision = precision;
+      stringifier = PrimitiveStringifier.createDecimalStringifier(scale);
     }
 
     public int getPrecision() {
@@ -447,6 +473,11 @@ public boolean equals(Object obj) {
     public int hashCode() {
       return Objects.hash(scale, precision);
     }
+
+    @Override
+    PrimitiveStringifier valueStringifier(PrimitiveType primitiveType) {
+      return stringifier;
+    }
   }
 
   public static class DateLogicalTypeAnnotation extends LogicalTypeAnnotation {
@@ -480,6 +511,11 @@ public int hashCode() {
       // This type doesn't have any parameters, thus using class hashcode
       return getClass().hashCode();
     }
+
+    @Override
+    PrimitiveStringifier valueStringifier(PrimitiveType primitiveType) {
+      return PrimitiveStringifier.DATE_STRINGIFIER;
+    }
   }
 
   public enum TimeUnit {
@@ -550,6 +586,11 @@ public boolean equals(Object obj) {
     public int hashCode() {
       return Objects.hash(isAdjustedToUTC, unit);
     }
+
+    @Override
+    PrimitiveStringifier valueStringifier(PrimitiveType primitiveType) {
+      return PrimitiveStringifier.TIME_STRINGIFIER;
+    }
   }
 
   public static class TimestampLogicalTypeAnnotation extends LogicalTypeAnnotation {
@@ -615,14 +656,31 @@ public boolean equals(Object obj) {
     public int hashCode() {
       return Objects.hash(isAdjustedToUTC, unit);
     }
+
+    @Override
+    PrimitiveStringifier valueStringifier(PrimitiveType primitiveType) {
+      switch (unit) {
+        case MICROS:
+          return PrimitiveStringifier.TIMESTAMP_MICROS_STRINGIFIER;
+        case MILLIS:
+          return PrimitiveStringifier.TIMESTAMP_MILLIS_STRINGIFIER;
+        default:
+          return super.valueStringifier(primitiveType);
+      }
+    }
   }
 
   public static class IntLogicalTypeAnnotation extends LogicalTypeAnnotation {
+    private static final Set<Integer> VALID_BIT_WIDTH = Collections.unmodifiableSet(
+      new HashSet<>(asList(8, 16, 32, 64)));
+
     private final int bitWidth;
     private final boolean isSigned;
 
-
     private IntLogicalTypeAnnotation(int bitWidth, boolean isSigned) {
+      if (!VALID_BIT_WIDTH.contains(bitWidth)) {
+        throw new IllegalArgumentException("Invalid integer bit width: " + bitWidth);
+      }
       this.bitWidth = bitWidth;
       this.isSigned = isSigned;
     }
@@ -685,6 +743,11 @@ public boolean equals(Object obj) {
     public int hashCode() {
       return Objects.hash(bitWidth, isSigned);
     }
+
+    @Override
+    PrimitiveStringifier valueStringifier(PrimitiveType primitiveType) {
+      return isSigned ? PrimitiveStringifier.DEFAULT_STRINGIFIER : PrimitiveStringifier.UNSIGNED_STRINGIFIER;
+    }
   }
 
   public static class JsonLogicalTypeAnnotation extends LogicalTypeAnnotation {
@@ -718,6 +781,11 @@ public int hashCode() {
       // This type doesn't have any parameters, thus using class hashcode
       return getClass().hashCode();
     }
+
+    @Override
+    PrimitiveStringifier valueStringifier(PrimitiveType primitiveType) {
+      return PrimitiveStringifier.UTF8_STRINGIFIER;
+    }
   }
 
   public static class BsonLogicalTypeAnnotation extends LogicalTypeAnnotation {
@@ -751,6 +819,11 @@ public int hashCode() {
       // This type doesn't have any parameters, thus using class hashcode
       return getClass().hashCode();
     }
+
+    @Override
+    PrimitiveStringifier valueStringifier(PrimitiveType primitiveType) {
+      return PrimitiveStringifier.DEFAULT_STRINGIFIER;
+    }
   }
 
   // This logical type annotation is implemented to support backward compatibility with ConvertedType.
@@ -791,6 +864,16 @@ public int hashCode() {
       // This type doesn't have any parameters, thus using class hashcode
       return getClass().hashCode();
     }
+
+    @Override
+    PrimitiveStringifier valueStringifier(PrimitiveType primitiveType) {
+      return PrimitiveStringifier.INTERVAL_STRINGIFIER;
+    }
+
+    @Override
+    boolean isValidColumnOrder(ColumnOrder columnOrder) {
+      return columnOrder.getColumnOrderName() == UNDEFINED;
+    }
   }
 
   // This logical type annotation is implemented to support backward compatibility with ConvertedType.
@@ -845,55 +928,55 @@ public int hashCode() {
    * or {@link Optional#orElseThrow(Supplier)} to throw exception if omitting a type is not allowed.
    */
   public interface LogicalTypeAnnotationVisitor<T> {
-    default Optional<T> visit(StringLogicalTypeAnnotation logicalTypeAnnotation) {
+    default Optional<T> visit(StringLogicalTypeAnnotation stringLogicalType) {
       return empty();
     }
 
-    default Optional<T> visit(MapLogicalTypeAnnotation logicalTypeAnnotation) {
+    default Optional<T> visit(MapLogicalTypeAnnotation mapLogicalType) {
       return empty();
     }
 
-    default Optional<T> visit(ListLogicalTypeAnnotation logicalTypeAnnotation) {
+    default Optional<T> visit(ListLogicalTypeAnnotation listLogicalType) {
       return empty();
     }
 
-    default Optional<T> visit(EnumLogicalTypeAnnotation logicalTypeAnnotation) {
+    default Optional<T> visit(EnumLogicalTypeAnnotation enumLogicalType) {
       return empty();
     }
 
-    default Optional<T> visit(DecimalLogicalTypeAnnotation logicalTypeAnnotation) {
+    default Optional<T> visit(DecimalLogicalTypeAnnotation decimalLogicalType) {
       return empty();
     }
 
-    default Optional<T> visit(DateLogicalTypeAnnotation logicalTypeAnnotation) {
+    default Optional<T> visit(DateLogicalTypeAnnotation dateLogicalType) {
       return empty();
     }
 
-    default Optional<T> visit(TimeLogicalTypeAnnotation logicalTypeAnnotation) {
+    default Optional<T> visit(TimeLogicalTypeAnnotation timeLogicalType) {
       return empty();
     }
 
-    default Optional<T> visit(TimestampLogicalTypeAnnotation logicalTypeAnnotation) {
+    default Optional<T> visit(TimestampLogicalTypeAnnotation timestampLogicalType) {
       return empty();
     }
 
-    default Optional<T> visit(IntLogicalTypeAnnotation logicalTypeAnnotation) {
+    default Optional<T> visit(IntLogicalTypeAnnotation intLogicalType) {
       return empty();
     }
 
-    default Optional<T> visit(JsonLogicalTypeAnnotation logicalTypeAnnotation) {
+    default Optional<T> visit(JsonLogicalTypeAnnotation jsonLogicalType) {
       return empty();
     }
 
-    default Optional<T> visit(BsonLogicalTypeAnnotation logicalTypeAnnotation) {
+    default Optional<T> visit(BsonLogicalTypeAnnotation bsonLogicalType) {
       return empty();
     }
 
-    default Optional<T> visit(IntervalLogicalTypeAnnotation logicalTypeAnnotation) {
+    default Optional<T> visit(IntervalLogicalTypeAnnotation intervalLogicalType) {
       return empty();
     }
 
-    default Optional<T> visit(MapKeyValueTypeAnnotation logicalTypeAnnotation) {
+    default Optional<T> visit(MapKeyValueTypeAnnotation mapKeyValueLogicalType) {
       return empty();
     }
   }
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java b/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java
index d305eb88e..83f98d7ec 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -62,7 +62,7 @@ public void accept(TypeVisitor visitor) {
   public void writeToStringBuilder(StringBuilder sb, String indent) {
     sb.append("message ")
         .append(getName())
-        .append(getOriginalType() == null ? "" : " (" + getOriginalType() +")")
+        .append(getLogicalTypeAnnotation() == null ? "" : " (" + getLogicalTypeAnnotation().toString() +")")
         .append(" {\n");
     membersDisplayString(sb, "  ");
     sb.append("}\n");
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/OriginalType.java b/parquet-column/src/main/java/org/apache/parquet/schema/OriginalType.java
index b00ae7e6c..78421b33f 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/OriginalType.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/OriginalType.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,46 +21,24 @@
 public enum OriginalType {
   MAP,
   LIST,
-  UTF8(PrimitiveStringifier.UTF8_STRINGIFIER),
+  UTF8,
   MAP_KEY_VALUE,
-  ENUM(PrimitiveStringifier.UTF8_STRINGIFIER),
-  DECIMAL {
-    @Override
-    PrimitiveStringifier stringifier(PrimitiveType type) {
-      return PrimitiveStringifier.createDecimalStringifier(type.getDecimalMetadata().getScale());
-    }
-  },
-  DATE(PrimitiveStringifier.DATE_STRINGIFIER),
-  TIME_MILLIS(PrimitiveStringifier.TIME_STRINGIFIER),
-  TIME_MICROS(PrimitiveStringifier.TIME_STRINGIFIER),
-  TIMESTAMP_MILLIS(PrimitiveStringifier.TIMESTAMP_MILLIS_STRINGIFIER),
-  TIMESTAMP_MICROS(PrimitiveStringifier.TIMESTAMP_MICROS_STRINGIFIER),
-  UINT_8(PrimitiveStringifier.UNSIGNED_STRINGIFIER),
-  UINT_16(PrimitiveStringifier.UNSIGNED_STRINGIFIER),
-  UINT_32(PrimitiveStringifier.UNSIGNED_STRINGIFIER),
-  UINT_64(PrimitiveStringifier.UNSIGNED_STRINGIFIER),
-  INT_8(PrimitiveStringifier.DEFAULT_STRINGIFIER),
-  INT_16(PrimitiveStringifier.DEFAULT_STRINGIFIER),
-  INT_32(PrimitiveStringifier.DEFAULT_STRINGIFIER),
-  INT_64(PrimitiveStringifier.DEFAULT_STRINGIFIER),
-  JSON(PrimitiveStringifier.UTF8_STRINGIFIER),
-  BSON(PrimitiveStringifier.DEFAULT_STRINGIFIER),
-  INTERVAL(PrimitiveStringifier.INTERVAL_STRINGIFIER);
-
-  private final PrimitiveStringifier stringifier;
-
-  PrimitiveStringifier stringifier(PrimitiveType type) {
-    if (stringifier == null) {
-      throw new UnsupportedOperationException("Stringifier is not supported for the original type: " + this);
-    }
-    return stringifier;
-  }
-
-  OriginalType() {
-    this(null);
-  }
-
-  OriginalType(PrimitiveStringifier stringifier) {
-    this.stringifier = stringifier;
-  }
+  ENUM,
+  DECIMAL,
+  DATE,
+  TIME_MILLIS,
+  TIME_MICROS,
+  TIMESTAMP_MILLIS,
+  TIMESTAMP_MICROS,
+  UINT_8,
+  UINT_16,
+  UINT_32,
+  UINT_64,
+  INT_8,
+  INT_16,
+  INT_32,
+  INT_64,
+  JSON,
+  BSON,
+  INTERVAL
 }
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
index 08adfbe99..6a7382eab 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
@@ -21,6 +21,8 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Locale;
+import java.util.Objects;
+import java.util.Optional;
 
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.ShouldNeverHappenException;
@@ -31,6 +33,11 @@
 import org.apache.parquet.io.api.RecordConsumer;
 import org.apache.parquet.schema.ColumnOrder.ColumnOrderName;
 
+import static java.util.Optional.empty;
+import static java.util.Optional.of;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MICROS;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MILLIS;
+
 
 /**
  * Representation of a Primitive type
@@ -85,23 +92,32 @@ public void addValueToPrimitiveConverter(
       }
 
       @Override
-      PrimitiveComparator<?> comparator(OriginalType logicalType) {
+      PrimitiveComparator<?> comparator(LogicalTypeAnnotation logicalType) {
         if (logicalType == null) {
           return PrimitiveComparator.SIGNED_INT64_COMPARATOR;
         }
-        switch (logicalType) {
-        case UINT_64:
-          return PrimitiveComparator.UNSIGNED_INT64_COMPARATOR;
-        case INT_64:
-        case DECIMAL:
-        case TIME_MICROS:
-        case TIMESTAMP_MILLIS:
-        case TIMESTAMP_MICROS:
-          return PrimitiveComparator.SIGNED_INT64_COMPARATOR;
-        default:
-          throw new ShouldNeverHappenException(
-              "No comparator logic implemented for INT64 logical type: " + logicalType);
-        }
+        return logicalType.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<PrimitiveComparator>() {
+          @Override
+          public Optional<PrimitiveComparator> visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) {
+            return intLogicalType.isSigned() ?
+              of(PrimitiveComparator.SIGNED_INT64_COMPARATOR) : of(PrimitiveComparator.UNSIGNED_INT64_COMPARATOR);
+          }
+
+          @Override
+          public Optional<PrimitiveComparator> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) {
+            return of(PrimitiveComparator.SIGNED_INT64_COMPARATOR);
+          }
+
+          @Override
+          public Optional<PrimitiveComparator> visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
+            return of(PrimitiveComparator.SIGNED_INT64_COMPARATOR);
+          }
+
+          @Override
+          public Optional<PrimitiveComparator> visit(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
+            return of(PrimitiveComparator.SIGNED_INT64_COMPARATOR);
+          }
+        }).orElseThrow(() -> new ShouldNeverHappenException("No comparator logic implemented for INT64 logical type: " + logicalType));
       }
     },
     INT32("getInteger", Integer.TYPE) {
@@ -128,26 +144,39 @@ public void addValueToPrimitiveConverter(
       }
 
       @Override
-      PrimitiveComparator<?> comparator(OriginalType logicalType) {
+      PrimitiveComparator<?> comparator(LogicalTypeAnnotation logicalType) {
         if (logicalType == null) {
           return PrimitiveComparator.SIGNED_INT32_COMPARATOR;
         }
-        switch (logicalType) {
-        case UINT_8:
-        case UINT_16:
-        case UINT_32:
-          return PrimitiveComparator.UNSIGNED_INT32_COMPARATOR;
-        case INT_8:
-        case INT_16:
-        case INT_32:
-        case DECIMAL:
-        case DATE:
-        case TIME_MILLIS:
-          return PrimitiveComparator.SIGNED_INT32_COMPARATOR;
-        default:
-          throw new ShouldNeverHappenException(
-              "No comparator logic implemented for INT32 logical type: " + logicalType);
-        }
+        return logicalType.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<PrimitiveComparator>() {
+          @Override
+          public Optional<PrimitiveComparator> visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) {
+            if (intLogicalType.getBitWidth() == 64) {
+              return empty();
+            }
+            return intLogicalType.isSigned() ?
+              of(PrimitiveComparator.SIGNED_INT32_COMPARATOR) : of(PrimitiveComparator.UNSIGNED_INT32_COMPARATOR);
+          }
+
+          @Override
+          public Optional<PrimitiveComparator> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) {
+            return of(PrimitiveComparator.SIGNED_INT32_COMPARATOR);
+          }
+
+          @Override
+          public Optional<PrimitiveComparator> visit(LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
+            return of(PrimitiveComparator.SIGNED_INT32_COMPARATOR);
+          }
+
+          @Override
+          public Optional<PrimitiveComparator> visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
+            if (timeLogicalType.getUnit() == MILLIS) {
+              return of(PrimitiveComparator.SIGNED_INT32_COMPARATOR);
+            }
+            return empty();
+          }
+        }).orElseThrow(
+          () -> new ShouldNeverHappenException("No comparator logic implemented for INT32 logical type: " + logicalType));
       }
     },
     BOOLEAN("getBoolean", Boolean.TYPE) {
@@ -174,7 +203,7 @@ public void addValueToPrimitiveConverter(
       }
 
       @Override
-      PrimitiveComparator<?> comparator(OriginalType logicalType) {
+      PrimitiveComparator<?> comparator(LogicalTypeAnnotation logicalType) {
         return PrimitiveComparator.BOOLEAN_COMPARATOR;
       }
     },
@@ -202,22 +231,36 @@ public void addValueToPrimitiveConverter(
       }
 
       @Override
-      PrimitiveComparator<?> comparator(OriginalType logicalType) {
+      PrimitiveComparator<?> comparator(LogicalTypeAnnotation logicalType) {
         if (logicalType == null) {
           return PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR;
         }
-        switch (logicalType) {
-        case DECIMAL:
-          return PrimitiveComparator.BINARY_AS_SIGNED_INTEGER_COMPARATOR;
-        case UTF8:
-        case ENUM:
-        case JSON:
-        case BSON:
-          return PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR;
-        default:
-          throw new ShouldNeverHappenException(
-              "No comparator logic implemented for BINARY logical type: " + logicalType);
-        }
+        return logicalType.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<PrimitiveComparator>() {
+          @Override
+          public Optional<PrimitiveComparator> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) {
+            return of(PrimitiveComparator.BINARY_AS_SIGNED_INTEGER_COMPARATOR);
+          }
+
+          @Override
+          public Optional<PrimitiveComparator> visit(LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) {
+            return of(PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR);
+          }
+
+          @Override
+          public Optional<PrimitiveComparator> visit(LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) {
+            return of(PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR);
+          }
+
+          @Override
+          public Optional<PrimitiveComparator> visit(LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) {
+            return of(PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR);
+          }
+
+          @Override
+          public Optional<PrimitiveComparator> visit(LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) {
+            return of(PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR);
+          }
+        }).orElseThrow(() -> new ShouldNeverHappenException("No comparator logic implemented for BINARY logical type: " + logicalType));
       }
     },
     FLOAT("getFloat", Float.TYPE) {
@@ -244,7 +287,7 @@ public void addValueToPrimitiveConverter(
       }
 
       @Override
-      PrimitiveComparator<?> comparator(OriginalType logicalType) {
+      PrimitiveComparator<?> comparator(LogicalTypeAnnotation logicalType) {
         return PrimitiveComparator.FLOAT_COMPARATOR;
       }
     },
@@ -272,7 +315,7 @@ public void addValueToPrimitiveConverter(
       }
 
       @Override
-      PrimitiveComparator<?> comparator(OriginalType logicalType) {
+      PrimitiveComparator<?> comparator(LogicalTypeAnnotation logicalType) {
         return PrimitiveComparator.DOUBLE_COMPARATOR;
       }
     },
@@ -298,7 +341,7 @@ public void addValueToPrimitiveConverter(
       }
 
       @Override
-      PrimitiveComparator<?> comparator(OriginalType logicalType) {
+      PrimitiveComparator<?> comparator(LogicalTypeAnnotation logicalType) {
         return PrimitiveComparator.BINARY_AS_SIGNED_INTEGER_COMPARATOR;
       }
     },
@@ -326,19 +369,23 @@ public void addValueToPrimitiveConverter(
       }
 
       @Override
-      PrimitiveComparator<?> comparator(OriginalType logicalType) {
+      PrimitiveComparator<?> comparator(LogicalTypeAnnotation logicalType) {
         if (logicalType == null) {
           return PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR;
         }
-        switch (logicalType) {
-        case DECIMAL:
-          return PrimitiveComparator.BINARY_AS_SIGNED_INTEGER_COMPARATOR;
-        case INTERVAL:
-          return PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR;
-        default:
-          throw new ShouldNeverHappenException(
-              "No comparator logic implemented for FIXED_LEN_BYTE_ARRAY logical type: " + logicalType);
-        }
+
+        return logicalType.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<PrimitiveComparator>() {
+          @Override
+          public Optional<PrimitiveComparator> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) {
+            return of(PrimitiveComparator.BINARY_AS_SIGNED_INTEGER_COMPARATOR);
+          }
+
+          @Override
+          public Optional<PrimitiveComparator> visit(LogicalTypeAnnotation.IntervalLogicalTypeAnnotation intervalLogicalType) {
+            return of(PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR);
+          }
+        }).orElseThrow(() -> new ShouldNeverHappenException(
+          "No comparator logic implemented for FIXED_LEN_BYTE_ARRAY logical type: " + logicalType));
       }
     };
 
@@ -370,7 +417,7 @@ abstract public void addValueToPrimitiveConverter(
 
     abstract public <T, E extends Exception> T convert(PrimitiveTypeNameConverter<T, E> converter) throws E;
 
-    abstract PrimitiveComparator<?> comparator(OriginalType logicalType);
+    abstract PrimitiveComparator<?> comparator(LogicalTypeAnnotation logicalType);
   }
 
   private final PrimitiveTypeName primitive;
@@ -474,7 +521,7 @@ public PrimitiveType(Repetition repetition, PrimitiveTypeName primitive,
     super(name, repetition, logicalTypeAnnotation, id);
     this.primitive = primitive;
     this.length = length;
-    if (getOriginalType() == OriginalType.DECIMAL) {
+    if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
       LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimal = (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalTypeAnnotation;
       this.decimalMeta = new DecimalMetadata(decimal.getPrecision(), decimal.getScale());
     } else {
@@ -482,7 +529,7 @@ public PrimitiveType(Repetition repetition, PrimitiveTypeName primitive,
     }
 
     if (columnOrder == null) {
-      columnOrder = primitive == PrimitiveTypeName.INT96 || getOriginalType() == OriginalType.INTERVAL
+      columnOrder = primitive == PrimitiveTypeName.INT96 || logicalTypeAnnotation instanceof LogicalTypeAnnotation.IntervalLogicalTypeAnnotation
         ? ColumnOrder.undefined()
         : ColumnOrder.typeDefined();
     }
@@ -494,35 +541,9 @@ private ColumnOrder requireValidColumnOrder(ColumnOrder columnOrder) {
       Preconditions.checkArgument(columnOrder.getColumnOrderName() == ColumnOrderName.UNDEFINED,
           "The column order {} is not supported by INT96", columnOrder);
     }
-    if (getOriginalType() != null) {
-      // Explicitly listing all the logical types to avoid having unsupported column orders new types accidentally
-      switch (getOriginalType()) {
-        case INT_8:
-        case INT_16:
-        case INT_32:
-        case INT_64:
-        case UINT_8:
-        case UINT_16:
-        case UINT_32:
-        case UINT_64:
-        case UTF8:
-        case DECIMAL:
-        case DATE:
-        case TIME_MILLIS:
-        case TIME_MICROS:
-        case TIMESTAMP_MILLIS:
-        case TIMESTAMP_MICROS:
-        case ENUM:
-        case JSON:
-        case BSON:
-          // Currently any available column order is valid
-          break;
-        case INTERVAL:
-        default:
-          Preconditions.checkArgument(columnOrder.getColumnOrderName() == ColumnOrderName.UNDEFINED,
-              "The column order {} is not supported by {} ({})", columnOrder, primitive, getOriginalType());
-          break;
-      }
+    if (getLogicalTypeAnnotation() != null) {
+      Preconditions.checkArgument(getLogicalTypeAnnotation().isValidColumnOrder(columnOrder),
+        "The column order {} is not supported by {} ({})", columnOrder, primitive, getLogicalTypeAnnotation());
     }
     return columnOrder;
   }
@@ -533,7 +554,7 @@ private ColumnOrder requireValidColumnOrder(ColumnOrder columnOrder) {
    */
   @Override
   public PrimitiveType withId(int id) {
-    return new PrimitiveType(getRepetition(), primitive, length, getName(), getOriginalType(), decimalMeta, new ID(id),
+    return new PrimitiveType(getRepetition(), primitive, length, getName(), getLogicalTypeAnnotation(), new ID(id),
         columnOrder);
   }
 
@@ -712,7 +733,7 @@ protected Type union(Type toMerge, boolean strict) {
     if (strict) {
       // Can't merge primitive fields of different type names or different original types
       if (!primitive.equals(toMerge.asPrimitiveType().getPrimitiveTypeName()) ||
-          getOriginalType() != toMerge.getOriginalType()) {
+        !Objects.equals(getLogicalTypeAnnotation(), toMerge.getLogicalTypeAnnotation())) {
         reportSchemaMergeError(toMerge);
       }
 
@@ -734,7 +755,7 @@ protected Type union(Type toMerge, boolean strict) {
       builder.length(length);
     }
 
-    return builder.as(getOriginalType()).named(getName());
+    return builder.as(getLogicalTypeAnnotation()).named(getName());
   }
 
   /**
@@ -747,7 +768,7 @@ protected Type union(Type toMerge, boolean strict) {
    */
   @SuppressWarnings("unchecked")
   public <T> PrimitiveComparator<T> comparator() {
-    return (PrimitiveComparator<T>) getPrimitiveTypeName().comparator(getOriginalType());
+    return (PrimitiveComparator<T>) getPrimitiveTypeName().comparator(getLogicalTypeAnnotation());
   }
 
   /**
@@ -762,7 +783,7 @@ public ColumnOrder columnOrder() {
    */
   @SuppressWarnings("unchecked")
   public PrimitiveStringifier stringifier() {
-    OriginalType originalType = getOriginalType();
-    return originalType == null ? PrimitiveStringifier.DEFAULT_STRINGIFIER : originalType.stringifier(this);
+    LogicalTypeAnnotation logicalTypeAnnotation = getLogicalTypeAnnotation();
+    return logicalTypeAnnotation == null ? PrimitiveStringifier.DEFAULT_STRINGIFIER : logicalTypeAnnotation.valueStringifier(this);
   }
 }
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/Types.java b/parquet-column/src/main/java/org/apache/parquet/schema/Types.java
index 165a5acea..378d6653e 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/Types.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/Types.java
@@ -21,6 +21,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.schema.ColumnOrder.ColumnOrderName;
@@ -441,16 +442,27 @@ protected PrimitiveType build(String name) {
 
       // validate type annotations and required metadata
       if (logicalTypeAnnotation != null) {
-        OriginalType originalType = logicalTypeAnnotation.toOriginalType();
-        switch (originalType) {
-          case UTF8:
-          case JSON:
-          case BSON:
-            Preconditions.checkState(
-                primitiveType == PrimitiveTypeName.BINARY,
-                originalType.toString() + " can only annotate binary fields");
-            break;
-          case DECIMAL:
+        logicalTypeAnnotation.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<Boolean>() {
+          @Override
+          public Optional<Boolean> visit(LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) {
+            checkBinaryPrimitiveType(stringLogicalType);
+            return Optional.of(true);
+          }
+
+          @Override
+          public Optional<Boolean> visit(LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) {
+            checkBinaryPrimitiveType(jsonLogicalType);
+            return Optional.of(true);
+          }
+
+          @Override
+          public Optional<Boolean> visit(LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) {
+            checkBinaryPrimitiveType(bsonLogicalType);
+            return Optional.of(true);
+          }
+
+          @Override
+          public Optional<Boolean> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) {
             Preconditions.checkState(
                 (primitiveType == PrimitiveTypeName.INT32) ||
                 (primitiveType == PrimitiveTypeName.INT64) ||
@@ -478,40 +490,88 @@ protected PrimitiveType build(String name) {
                   "FIXED(" + length + ") cannot store " + meta.getPrecision() +
                   " digits (max " + maxPrecision(length) + ")");
             }
-            break;
-          case DATE:
-          case TIME_MILLIS:
-          case UINT_8:
-          case UINT_16:
-          case UINT_32:
-          case INT_8:
-          case INT_16:
-          case INT_32:
-            Preconditions.checkState(primitiveType == PrimitiveTypeName.INT32,
-                originalType.toString() + " can only annotate INT32");
-            break;
-          case TIME_MICROS:
-          case TIMESTAMP_MILLIS:
-          case TIMESTAMP_MICROS:
-          case UINT_64:
-          case INT_64:
-            Preconditions.checkState(primitiveType == PrimitiveTypeName.INT64,
-                originalType.toString() + " can only annotate INT64");
-            break;
-          case INTERVAL:
+            return Optional.of(true);
+          }
+
+          @Override
+          public Optional<Boolean> visit(LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
+            checkInt32PrimitiveType(dateLogicalType);
+            return Optional.of(true);
+          }
+
+          @Override
+          public Optional<Boolean> visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
+            LogicalTypeAnnotation.TimeUnit unit = timeLogicalType.getUnit();
+            switch (unit) {
+              case MILLIS:
+                checkInt32PrimitiveType(timeLogicalType);
+                break;
+              case MICROS:
+                checkInt64PrimitiveType(timeLogicalType);
+                break;
+              default:
+                throw new RuntimeException("Invalid time unit: " + unit);
+            }
+            return Optional.of(true);
+          }
+
+          @Override
+          public Optional<Boolean> visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) {
+            int bitWidth = intLogicalType.getBitWidth();
+            switch (bitWidth) {
+              case 8:
+              case 16:
+              case 32:
+                checkInt32PrimitiveType(intLogicalType);
+                break;
+              case 64:
+                checkInt64PrimitiveType(intLogicalType);
+                break;
+              default:
+                throw new RuntimeException("Invalid bit width: " + bitWidth);
+            }
+            return Optional.of(true);
+          }
+
+          @Override
+          public Optional<Boolean> visit(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
+            checkInt64PrimitiveType(timestampLogicalType);
+            return Optional.of(true);
+          }
+
+          @Override
+          public Optional<Boolean> visit(LogicalTypeAnnotation.IntervalLogicalTypeAnnotation intervalLogicalType) {
             Preconditions.checkState(
                 (primitiveType == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) &&
                 (length == 12),
                 "INTERVAL can only annotate FIXED_LEN_BYTE_ARRAY(12)");
-            break;
-          case ENUM:
+            return Optional.of(true);
+          }
+
+          @Override
+          public Optional<Boolean> visit(LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) {
             Preconditions.checkState(
                 primitiveType == PrimitiveTypeName.BINARY,
                 "ENUM can only annotate binary fields");
-            break;
-          default:
-            throw new IllegalStateException(originalType + " can not be applied to a primitive type");
-        }
+            return Optional.of(true);
+          }
+
+          private void checkBinaryPrimitiveType(LogicalTypeAnnotation logicalTypeAnnotation) {
+            Preconditions.checkState(
+                primitiveType == PrimitiveTypeName.BINARY,
+              logicalTypeAnnotation.toString() + " can only annotate binary fields");
+          }
+
+          private void checkInt32PrimitiveType(LogicalTypeAnnotation logicalTypeAnnotation) {
+            Preconditions.checkState(primitiveType == PrimitiveTypeName.INT32,
+              logicalTypeAnnotation.toString() + " can only annotate INT32");
+          }
+
+          private void checkInt64PrimitiveType(LogicalTypeAnnotation logicalTypeAnnotation) {
+            Preconditions.checkState(primitiveType == PrimitiveTypeName.INT64,
+              logicalTypeAnnotation.toString() + " can only annotate INT64");
+          }
+        }).orElseThrow(() -> new IllegalStateException(logicalTypeAnnotation + " can not be applied to a primitive type"));
       }
 
       if (newLogicalTypeSet) {
@@ -531,7 +591,7 @@ private static long maxPrecision(int numBytes) {
 
     protected DecimalMetadata decimalMetadata() {
       DecimalMetadata meta = null;
-      if (OriginalType.DECIMAL == getOriginalType()) {
+      if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
         LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType = (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalTypeAnnotation;
         if (newLogicalTypeSet) {
           if (scaleAlreadySet) {
diff --git a/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestValidTypeMap.java b/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestValidTypeMap.java
index d44136998..6e19dcadc 100644
--- a/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestValidTypeMap.java
+++ b/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestValidTypeMap.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -28,7 +28,6 @@
 import org.apache.parquet.filter2.predicate.Operators.FloatColumn;
 import org.apache.parquet.filter2.predicate.Operators.IntColumn;
 import org.apache.parquet.filter2.predicate.Operators.LongColumn;
-import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 
 import static org.junit.Assert.assertEquals;
diff --git a/parquet-column/src/test/java/org/apache/parquet/schema/TestMessageType.java b/parquet-column/src/test/java/org/apache/parquet/schema/TestMessageType.java
index 05619385b..e511d4252 100644
--- a/parquet-column/src/test/java/org/apache/parquet/schema/TestMessageType.java
+++ b/parquet-column/src/test/java/org/apache/parquet/schema/TestMessageType.java
@@ -148,7 +148,7 @@ public void testMergeSchema() {
       t9.union(t10);
       fail("moving from BINARY (UTF8) to BINARY");
     } catch (IncompatibleSchemaModificationException e) {
-      assertEquals("cannot merge original type null into UTF8", e.getMessage());
+      assertEquals("cannot merge logical type null into STRING", e.getMessage());
     }
 
     MessageType t11 = Types.buildMessage()
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index 1442910c8..9478e9420 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -18,6 +18,7 @@
  */
 package org.apache.parquet.format.converter;
 
+import static java.util.Optional.of;
 import static org.apache.parquet.format.Util.readFileMetaData;
 import static org.apache.parquet.format.Util.writePageHeader;
 
@@ -264,161 +265,161 @@ ConvertedType convertToConvertedType(LogicalTypeAnnotation logicalTypeAnnotation
 
   private static class ConvertedTypeConverterVisitor implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<ConvertedType> {
     @Override
-    public Optional<ConvertedType> visit(LogicalTypeAnnotation.StringLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(ConvertedType.UTF8);
+    public Optional<ConvertedType> visit(LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) {
+      return of(ConvertedType.UTF8);
     }
 
     @Override
-    public Optional<ConvertedType> visit(LogicalTypeAnnotation.MapLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(ConvertedType.MAP);
+    public Optional<ConvertedType> visit(LogicalTypeAnnotation.MapLogicalTypeAnnotation mapLogicalType) {
+      return of(ConvertedType.MAP);
     }
 
     @Override
-    public Optional<ConvertedType> visit(LogicalTypeAnnotation.ListLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(ConvertedType.LIST);
+    public Optional<ConvertedType> visit(LogicalTypeAnnotation.ListLogicalTypeAnnotation listLogicalType) {
+      return of(ConvertedType.LIST);
     }
 
     @Override
-    public Optional<ConvertedType> visit(LogicalTypeAnnotation.EnumLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(ConvertedType.ENUM);
+    public Optional<ConvertedType> visit(LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) {
+      return of(ConvertedType.ENUM);
     }
 
     @Override
-    public Optional<ConvertedType> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(ConvertedType.DECIMAL);
+    public Optional<ConvertedType> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) {
+      return of(ConvertedType.DECIMAL);
     }
 
     @Override
-    public Optional<ConvertedType> visit(LogicalTypeAnnotation.DateLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(ConvertedType.DATE);
+    public Optional<ConvertedType> visit(LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
+      return of(ConvertedType.DATE);
     }
 
     @Override
-    public Optional<ConvertedType> visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation logicalTypeAnnotation) {
-      switch (logicalTypeAnnotation.getUnit()) {
+    public Optional<ConvertedType> visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
+      switch (timeLogicalType.getUnit()) {
         case MILLIS:
-          return Optional.of(ConvertedType.TIME_MILLIS);
+          return of(ConvertedType.TIME_MILLIS);
         case MICROS:
-          return Optional.of(ConvertedType.TIME_MICROS);
+          return of(ConvertedType.TIME_MICROS);
         default:
-          throw new RuntimeException("Unknown converted type for " + logicalTypeAnnotation.toOriginalType());
+          throw new RuntimeException("Unknown converted type for " + timeLogicalType.toOriginalType());
       }
     }
 
     @Override
-    public Optional<ConvertedType> visit(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation logicalTypeAnnotation) {
-      switch (logicalTypeAnnotation.getUnit()) {
+    public Optional<ConvertedType> visit(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
+      switch (timestampLogicalType.getUnit()) {
         case MICROS:
-          return Optional.of(ConvertedType.TIMESTAMP_MICROS);
+          return of(ConvertedType.TIMESTAMP_MICROS);
         case MILLIS:
-          return Optional.of(ConvertedType.TIMESTAMP_MILLIS);
+          return of(ConvertedType.TIMESTAMP_MILLIS);
         default:
-          throw new RuntimeException("Unknown converted type for " + logicalTypeAnnotation.toOriginalType());
+          throw new RuntimeException("Unknown converted type for " + timestampLogicalType.toOriginalType());
       }
     }
 
     @Override
-    public Optional<ConvertedType> visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation logicalTypeAnnotation) {
-      boolean signed = logicalTypeAnnotation.isSigned();
-      switch (logicalTypeAnnotation.getBitWidth()) {
+    public Optional<ConvertedType> visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) {
+      boolean signed = intLogicalType.isSigned();
+      switch (intLogicalType.getBitWidth()) {
         case 8:
-          return Optional.of(signed ? ConvertedType.INT_8 : ConvertedType.UINT_8);
+          return of(signed ? ConvertedType.INT_8 : ConvertedType.UINT_8);
         case 16:
-          return Optional.of(signed ? ConvertedType.INT_16 : ConvertedType.UINT_16);
+          return of(signed ? ConvertedType.INT_16 : ConvertedType.UINT_16);
         case 32:
-          return Optional.of(signed ? ConvertedType.INT_32 : ConvertedType.UINT_32);
+          return of(signed ? ConvertedType.INT_32 : ConvertedType.UINT_32);
         case 64:
-          return Optional.of(signed ? ConvertedType.INT_64 : ConvertedType.UINT_64);
+          return of(signed ? ConvertedType.INT_64 : ConvertedType.UINT_64);
         default:
-          throw new RuntimeException("Unknown original type " + logicalTypeAnnotation.toOriginalType());
+          throw new RuntimeException("Unknown original type " + intLogicalType.toOriginalType());
       }
     }
 
     @Override
-    public Optional<ConvertedType> visit(LogicalTypeAnnotation.JsonLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(ConvertedType.JSON);
+    public Optional<ConvertedType> visit(LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) {
+      return of(ConvertedType.JSON);
     }
 
     @Override
-    public Optional<ConvertedType> visit(LogicalTypeAnnotation.BsonLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(ConvertedType.BSON);
+    public Optional<ConvertedType> visit(LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) {
+      return of(ConvertedType.BSON);
     }
 
     @Override
-    public Optional<ConvertedType> visit(LogicalTypeAnnotation.IntervalLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(ConvertedType.INTERVAL);
+    public Optional<ConvertedType> visit(LogicalTypeAnnotation.IntervalLogicalTypeAnnotation intervalLogicalType) {
+      return of(ConvertedType.INTERVAL);
     }
 
     @Override
-    public Optional<ConvertedType> visit(LogicalTypeAnnotation.MapKeyValueTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(ConvertedType.MAP_KEY_VALUE);
+    public Optional<ConvertedType> visit(LogicalTypeAnnotation.MapKeyValueTypeAnnotation mapKeyValueLogicalType) {
+      return of(ConvertedType.MAP_KEY_VALUE);
     }
   }
 
   private static class LogicalTypeConverterVisitor implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<LogicalType> {
     @Override
-    public Optional<LogicalType> visit(LogicalTypeAnnotation.StringLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(LogicalType.STRING(new StringType()));
+    public Optional<LogicalType> visit(LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) {
+      return of(LogicalType.STRING(new StringType()));
     }
 
     @Override
-    public Optional<LogicalType> visit(LogicalTypeAnnotation.MapLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(LogicalType.MAP(new MapType()));
+    public Optional<LogicalType> visit(LogicalTypeAnnotation.MapLogicalTypeAnnotation mapLogicalType) {
+      return of(LogicalType.MAP(new MapType()));
     }
 
     @Override
-    public Optional<LogicalType> visit(LogicalTypeAnnotation.ListLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(LogicalType.LIST(new ListType()));
+    public Optional<LogicalType> visit(LogicalTypeAnnotation.ListLogicalTypeAnnotation listLogicalType) {
+      return of(LogicalType.LIST(new ListType()));
     }
 
     @Override
-    public Optional<LogicalType> visit(LogicalTypeAnnotation.EnumLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(LogicalType.ENUM(new EnumType()));
+    public Optional<LogicalType> visit(LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) {
+      return of(LogicalType.ENUM(new EnumType()));
     }
 
     @Override
-    public Optional<LogicalType> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(LogicalType.DECIMAL(new DecimalType(logicalTypeAnnotation.getScale(), logicalTypeAnnotation.getPrecision())));
+    public Optional<LogicalType> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) {
+      return of(LogicalType.DECIMAL(new DecimalType(decimalLogicalType.getScale(), decimalLogicalType.getPrecision())));
     }
 
     @Override
-    public Optional<LogicalType> visit(LogicalTypeAnnotation.DateLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(LogicalType.DATE(new DateType()));
+    public Optional<LogicalType> visit(LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
+      return of(LogicalType.DATE(new DateType()));
     }
 
     @Override
-    public Optional<LogicalType> visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(LogicalType.TIME(new TimeType(logicalTypeAnnotation.isAdjustedToUTC(), convertUnit(logicalTypeAnnotation.getUnit()))));
+    public Optional<LogicalType> visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
+      return of(LogicalType.TIME(new TimeType(timeLogicalType.isAdjustedToUTC(), convertUnit(timeLogicalType.getUnit()))));
     }
 
     @Override
-    public Optional<LogicalType> visit(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(LogicalType.TIMESTAMP(new TimestampType(logicalTypeAnnotation.isAdjustedToUTC(), convertUnit(logicalTypeAnnotation.getUnit()))));
+    public Optional<LogicalType> visit(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
+      return of(LogicalType.TIMESTAMP(new TimestampType(timestampLogicalType.isAdjustedToUTC(), convertUnit(timestampLogicalType.getUnit()))));
     }
 
     @Override
-    public Optional<LogicalType> visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(LogicalType.INTEGER(new IntType((byte) logicalTypeAnnotation.getBitWidth(), logicalTypeAnnotation.isSigned())));
+    public Optional<LogicalType> visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) {
+      return of(LogicalType.INTEGER(new IntType((byte) intLogicalType.getBitWidth(), intLogicalType.isSigned())));
     }
 
     @Override
-    public Optional<LogicalType> visit(LogicalTypeAnnotation.JsonLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(LogicalType.JSON(new JsonType()));
+    public Optional<LogicalType> visit(LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) {
+      return of(LogicalType.JSON(new JsonType()));
     }
 
     @Override
-    public Optional<LogicalType> visit(LogicalTypeAnnotation.BsonLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(LogicalType.BSON(new BsonType()));
+    public Optional<LogicalType> visit(LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) {
+      return of(LogicalType.BSON(new BsonType()));
     }
 
     @Override
-    public Optional<LogicalType> visit(LogicalTypeAnnotation.IntervalLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(LogicalType.UNKNOWN(new NullType()));
+    public Optional<LogicalType> visit(LogicalTypeAnnotation.IntervalLogicalTypeAnnotation intervalLogicalType) {
+      return of(LogicalType.UNKNOWN(new NullType()));
     }
 
     @Override
-    public Optional<LogicalType> visit(LogicalTypeAnnotation.MapKeyValueTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(LogicalType.UNKNOWN(new NullType()));
+    public Optional<LogicalType> visit(LogicalTypeAnnotation.MapKeyValueTypeAnnotation mapKeyValueLogicalType) {
+      return of(LogicalType.UNKNOWN(new NullType()));
     }
   }
 
@@ -669,9 +670,11 @@ private static boolean isMinMaxStatsSupported(PrimitiveType type) {
     UNKNOWN
   }
 
-  private static final Set<OriginalType> STRING_TYPES = Collections
+  private static final Set<Class> STRING_TYPES = Collections
       .unmodifiableSet(new HashSet<>(Arrays.asList(
-          OriginalType.UTF8, OriginalType.ENUM, OriginalType.JSON
+        LogicalTypeAnnotation.StringLogicalTypeAnnotation.class,
+        LogicalTypeAnnotation.EnumLogicalTypeAnnotation.class,
+        LogicalTypeAnnotation.JsonLogicalTypeAnnotation.class
       )));
 
   /**
@@ -688,10 +691,10 @@ private boolean overrideSortOrderToSigned(PrimitiveType type) {
     // even if the override is set, only return stats for string-ish types
     // a null type annotation is considered string-ish because some writers
     // failed to use the UTF8 annotation.
-    OriginalType annotation = type.getOriginalType();
+    LogicalTypeAnnotation annotation = type.getLogicalTypeAnnotation();
     return useSignedStringMinMax &&
         PrimitiveTypeName.BINARY == type.getPrimitiveTypeName() &&
-        (annotation == null || STRING_TYPES.contains(annotation));
+        (annotation == null || STRING_TYPES.contains(annotation.getClass()));
   }
 
   /**
@@ -718,36 +721,76 @@ private static SortOrder defaultSortOrder(PrimitiveTypeName primitive) {
    * @return the "correct" sort order of the type that applications assume
    */
   private static SortOrder sortOrder(PrimitiveType primitive) {
-    OriginalType annotation = primitive.getOriginalType();
+    LogicalTypeAnnotation annotation = primitive.getLogicalTypeAnnotation();
     if (annotation != null) {
-      switch (annotation) {
-        case INT_8:
-        case INT_16:
-        case INT_32:
-        case INT_64:
-        case DATE:
-        case TIME_MICROS:
-        case TIME_MILLIS:
-        case TIMESTAMP_MICROS:
-        case TIMESTAMP_MILLIS:
-          return SortOrder.SIGNED;
-        case UINT_8:
-        case UINT_16:
-        case UINT_32:
-        case UINT_64:
-        case ENUM:
-        case UTF8:
-        case BSON:
-        case JSON:
-          return SortOrder.UNSIGNED;
-        case DECIMAL:
-        case LIST:
-        case MAP:
-        case MAP_KEY_VALUE:
-        case INTERVAL:
-          return SortOrder.UNKNOWN;
-      }
+      return annotation.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<SortOrder>() {
+        @Override
+        public Optional<SortOrder> visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) {
+          return intLogicalType.isSigned() ? of(SortOrder.SIGNED) : of(SortOrder.UNSIGNED);
+        }
+
+        @Override
+        public Optional<SortOrder> visit(LogicalTypeAnnotation.IntervalLogicalTypeAnnotation intervalLogicalType) {
+          return of(SortOrder.UNKNOWN);
+        }
+
+        @Override
+        public Optional<SortOrder> visit(LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
+          return of(SortOrder.SIGNED);
+        }
+
+        @Override
+        public Optional<SortOrder> visit(LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) {
+          return of(SortOrder.UNSIGNED);
+        }
+
+        @Override
+        public Optional<SortOrder> visit(LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) {
+          return of(SortOrder.UNSIGNED);
+        }
+
+        @Override
+        public Optional<SortOrder> visit(LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) {
+          return of(SortOrder.UNSIGNED);
+        }
+
+        @Override
+        public Optional<SortOrder> visit(LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) {
+          return of(SortOrder.UNSIGNED);
+        }
+
+        @Override
+        public Optional<SortOrder> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) {
+          return of(SortOrder.UNKNOWN);
+        }
+
+        @Override
+        public Optional<SortOrder> visit(LogicalTypeAnnotation.MapKeyValueTypeAnnotation mapKeyValueLogicalType) {
+          return of(SortOrder.UNKNOWN);
+        }
+
+        @Override
+        public Optional<SortOrder> visit(LogicalTypeAnnotation.MapLogicalTypeAnnotation mapLogicalType) {
+          return of(SortOrder.UNKNOWN);
+        }
+
+        @Override
+        public Optional<SortOrder> visit(LogicalTypeAnnotation.ListLogicalTypeAnnotation listLogicalType) {
+          return of(SortOrder.UNKNOWN);
+        }
+
+        @Override
+        public Optional<SortOrder> visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
+          return of(SortOrder.SIGNED);
+        }
+
+        @Override
+        public Optional<SortOrder> visit(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
+          return of(SortOrder.SIGNED);
+        }
+      }).orElse(defaultSortOrder(primitive.getPrimitiveTypeName()));
     }
+
     return defaultSortOrder(primitive.getPrimitiveTypeName());
   }
 
diff --git a/parquet-hive/parquet-hive-storage-handler/src/main/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveSchemaConverter.java b/parquet-hive/parquet-hive-storage-handler/src/main/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveSchemaConverter.java
index 5d3ab488b..6d229a696 100644
--- a/parquet-hive/parquet-hive-storage-handler/src/main/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveSchemaConverter.java
+++ b/parquet-hive/parquet-hive-storage-handler/src/main/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveSchemaConverter.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,12 +30,15 @@
 
 import org.apache.parquet.schema.ConversionPatterns;
 import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 import org.apache.parquet.schema.Type;
 import org.apache.parquet.schema.Type.Repetition;
+import org.apache.parquet.schema.Types;
+
+import static org.apache.parquet.schema.LogicalTypeAnnotation.listType;
 
 public class HiveSchemaConverter {
 
@@ -105,7 +108,7 @@ private static Type convertType(final String name, final TypeInfo typeInfo, fina
   // 1 anonymous element "array_element"
   private static GroupType convertArrayType(final String name, final ListTypeInfo typeInfo) {
     final TypeInfo subType = typeInfo.getListElementTypeInfo();
-    return listWrapper(name, OriginalType.LIST, new GroupType(Repetition.REPEATED,
+    return listWrapper(name, listType(), new GroupType(Repetition.REPEATED,
         ParquetHiveSerDe.ARRAY.toString(), convertType("array_element", subType)));
   }
 
@@ -127,8 +130,8 @@ private static GroupType convertMapType(final String name, final MapTypeInfo typ
     return ConversionPatterns.mapType(Repetition.OPTIONAL, name, keyType, valueType);
   }
 
-  private static GroupType listWrapper(final String name, final OriginalType originalType,
+  private static GroupType listWrapper(final String name, final LogicalTypeAnnotation logicalTypeAnnotation,
       final GroupType groupType) {
-    return new GroupType(Repetition.OPTIONAL, name, originalType, groupType);
+    return Types.optionalGroup().addField(groupType).as(logicalTypeAnnotation).named(name);
   }
 }
diff --git a/parquet-pig/src/main/java/org/apache/parquet/pig/PigSchemaConverter.java b/parquet-pig/src/main/java/org/apache/parquet/pig/PigSchemaConverter.java
index 24f7ee8c9..19356616a 100644
--- a/parquet-pig/src/main/java/org/apache/parquet/pig/PigSchemaConverter.java
+++ b/parquet-pig/src/main/java/org/apache/parquet/pig/PigSchemaConverter.java
@@ -23,7 +23,10 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
 
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.Types;
 import org.apache.pig.LoadPushDown.RequiredField;
 import org.apache.pig.LoadPushDown.RequiredFieldList;
 import org.apache.pig.data.DataType;
@@ -38,7 +41,6 @@
 import org.apache.parquet.schema.ConversionPatterns;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeNameConverter;
@@ -47,6 +49,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.util.Optional.of;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
+
 
 /**
  * Converts a Pig Schema into a Parquet schema
@@ -205,7 +210,7 @@ private FieldSchema getSimpleFieldSchema(final String fieldName, Type parquetTyp
       throws FrontendException {
     final PrimitiveTypeName parquetPrimitiveTypeName =
         parquetType.asPrimitiveType().getPrimitiveTypeName();
-    final OriginalType originalType = parquetType.getOriginalType();
+    final LogicalTypeAnnotation logicalTypeAnnotation = parquetType.getLogicalTypeAnnotation();
     return parquetPrimitiveTypeName.convert(
         new PrimitiveTypeNameConverter<Schema.FieldSchema, FrontendException>() {
       @Override
@@ -242,7 +247,7 @@ public FieldSchema convertINT96(PrimitiveTypeName primitiveTypeName)
       @Override
       public FieldSchema convertFIXED_LEN_BYTE_ARRAY(
         PrimitiveTypeName primitiveTypeName) throws FrontendException {
-        if (originalType == OriginalType.DECIMAL) {
+        if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
           return new FieldSchema(fieldName, null, DataType.BIGDECIMAL);
         } else {
           return new FieldSchema(fieldName, null, DataType.BYTEARRAY);
@@ -258,7 +263,7 @@ public FieldSchema convertBOOLEAN(PrimitiveTypeName primitiveTypeName)
       @Override
       public FieldSchema convertBINARY(PrimitiveTypeName primitiveTypeName)
           throws FrontendException {
-        if (originalType != null && originalType == OriginalType.UTF8) {
+        if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.StringLogicalTypeAnnotation) {
           return new FieldSchema(fieldName, null, DataType.CHARARRAY);
         } else {
           return new FieldSchema(fieldName, null, DataType.BYTEARRAY);
@@ -267,47 +272,71 @@ public FieldSchema convertBINARY(PrimitiveTypeName primitiveTypeName)
     });
   }
 
+  /*
+   * RuntimeException subclass used to work around the fact that logical type visitors cannot throw the checked FrontendException.
+   * Wrap the FrontendException in this class inside the visitor (in an inner catch block), then unwrap and rethrow it outside of the visitor.
+   */
+  private static final class FrontendExceptionWrapper extends RuntimeException {
+    final FrontendException frontendException;
+
+    FrontendExceptionWrapper(FrontendException frontendException) {
+      this.frontendException = frontendException;
+    }
+  }
+
   private FieldSchema getComplexFieldSchema(String fieldName, Type parquetType)
       throws FrontendException {
     GroupType parquetGroupType = parquetType.asGroupType();
-    OriginalType originalType = parquetGroupType.getOriginalType();
-    if (originalType !=  null) {
-      switch(originalType) {
-      case MAP:
-        // verify that its a map
-        if (parquetGroupType.getFieldCount() != 1 || parquetGroupType.getType(0).isPrimitive()) {
-          throw new SchemaConversionException("Invalid map type " + parquetGroupType);
-        }
-        GroupType mapKeyValType = parquetGroupType.getType(0).asGroupType();
-        if (!mapKeyValType.isRepetition(Repetition.REPEATED) ||
-            (mapKeyValType.getOriginalType() != null && !mapKeyValType.getOriginalType().equals(OriginalType.MAP_KEY_VALUE)) ||
-            mapKeyValType.getFieldCount()!=2) {
-          throw new SchemaConversionException("Invalid map type " + parquetGroupType);
-        }
-        // if value is not primitive wrap it in a tuple
-        Type valueType = mapKeyValType.getType(1);
-        Schema s = convertField(valueType);
-        s.getField(0).alias = null;
-        return new FieldSchema(fieldName, s, DataType.MAP);
-      case LIST:
-        Type type = parquetGroupType.getType(0);
-        if (parquetGroupType.getFieldCount()!= 1 || type.isPrimitive()) {
-          // an array is effectively a bag
-          Schema primitiveSchema = new Schema(getSimpleFieldSchema(parquetGroupType.getFieldName(0), type));
-          Schema tupleSchema = new Schema(new FieldSchema(ARRAY_VALUE_NAME, primitiveSchema, DataType.TUPLE));
-          return new FieldSchema(fieldName, tupleSchema, DataType.BAG);
-        }
-        GroupType tupleType = parquetGroupType.getType(0).asGroupType();
-        if (!tupleType.isRepetition(Repetition.REPEATED)) {
-          throw new SchemaConversionException("Invalid list type " + parquetGroupType);
-        }
-        Schema tupleSchema = new Schema(new FieldSchema(tupleType.getName(), convertFields(tupleType.getFields()), DataType.TUPLE));
-        return new FieldSchema(fieldName, tupleSchema, DataType.BAG);
-      case MAP_KEY_VALUE:
-      case ENUM:
-      case UTF8:
-      default:
-        throw new SchemaConversionException("Unexpected original type for " + parquetType + ": " + originalType);
+    LogicalTypeAnnotation logicalTypeAnnotation = parquetGroupType.getLogicalTypeAnnotation();
+    if (logicalTypeAnnotation !=  null) {
+      try {
+        return logicalTypeAnnotation.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<FieldSchema>() {
+          @Override
+          public Optional<FieldSchema> visit(LogicalTypeAnnotation.MapLogicalTypeAnnotation mapLogicalType) {
+            try {
+              // verify that it's a map
+              if (parquetGroupType.getFieldCount() != 1 || parquetGroupType.getType(0).isPrimitive()) {
+                throw new SchemaConversionException("Invalid map type " + parquetGroupType);
+              }
+              GroupType mapKeyValType = parquetGroupType.getType(0).asGroupType();
+              if (!mapKeyValType.isRepetition(Repetition.REPEATED) ||
+                (mapKeyValType.getLogicalTypeAnnotation() != null && !mapKeyValType.getLogicalTypeAnnotation().equals(LogicalTypeAnnotation.MapKeyValueTypeAnnotation.getInstance())) ||
+                mapKeyValType.getFieldCount() != 2) {
+                throw new SchemaConversionException("Invalid map type " + parquetGroupType);
+              }
+              // if value is not primitive wrap it in a tuple
+              Type valueType = mapKeyValType.getType(1);
+              Schema s = convertField(valueType);
+              s.getField(0).alias = null;
+              return of(new FieldSchema(fieldName, s, DataType.MAP));
+            } catch (FrontendException e) {
+              throw new FrontendExceptionWrapper(e);
+            }
+          }
+
+          @Override
+          public Optional<FieldSchema> visit(LogicalTypeAnnotation.ListLogicalTypeAnnotation listLogicalType) {
+            try {
+              Type type = parquetGroupType.getType(0);
+              if (parquetGroupType.getFieldCount() != 1 || type.isPrimitive()) {
+                // an array is effectively a bag
+                Schema primitiveSchema = new Schema(getSimpleFieldSchema(parquetGroupType.getFieldName(0), type));
+                Schema tupleSchema = new Schema(new FieldSchema(ARRAY_VALUE_NAME, primitiveSchema, DataType.TUPLE));
+                return of(new FieldSchema(fieldName, tupleSchema, DataType.BAG));
+              }
+              GroupType tupleType = parquetGroupType.getType(0).asGroupType();
+              if (!tupleType.isRepetition(Repetition.REPEATED)) {
+                throw new SchemaConversionException("Invalid list type " + parquetGroupType);
+              }
+              Schema tupleSchema = new Schema(new FieldSchema(tupleType.getName(), convertFields(tupleType.getFields()), DataType.TUPLE));
+              return of(new FieldSchema(fieldName, tupleSchema, DataType.BAG));
+            } catch (FrontendException e) {
+              throw new FrontendExceptionWrapper(e);
+            }
+          }
+        }).orElseThrow(() -> new SchemaConversionException("Unexpected original type for " + parquetType + ": " + logicalTypeAnnotation));
+      } catch (FrontendExceptionWrapper e) {
+        throw e.frontendException;
       }
     } else {
       // if original type is not set, we assume it to be tuple
@@ -359,7 +388,7 @@ private Type convertWithName(FieldSchema fieldSchema, String name) {
       case DataType.BOOLEAN:
         return primitive(name, PrimitiveTypeName.BOOLEAN);
       case DataType.CHARARRAY:
-        return primitive(name, PrimitiveTypeName.BINARY, OriginalType.UTF8);
+        return primitive(name, PrimitiveTypeName.BINARY, stringType());
       case DataType.INTEGER:
         return primitive(name, PrimitiveTypeName.INT32);
       case DataType.LONG:
@@ -403,12 +432,12 @@ private String name(String fieldAlias, String defaultName) {
     return fieldAlias == null ? defaultName : fieldAlias;
   }
 
-  private Type primitive(String name, PrimitiveTypeName primitive, OriginalType originalType) {
-    return new PrimitiveType(Repetition.OPTIONAL, primitive, name, originalType);
+  private Type primitive(String name, PrimitiveTypeName primitive, LogicalTypeAnnotation logicalTypeAnnotation) {
+    return Types.primitive(primitive, Repetition.OPTIONAL).as(logicalTypeAnnotation).named(name);
   }
 
   private PrimitiveType primitive(String name, PrimitiveTypeName primitive) {
-    return new PrimitiveType(Repetition.OPTIONAL, primitive, name, null);
+    return Types.primitive(primitive, Repetition.OPTIONAL).named(name);
   }
 
   /**
@@ -511,7 +540,8 @@ private Type filterBag(GroupType bagType, FieldSchema bagFieldSchema) throws Fro
     }
     Type nested = bagType.getType(0);
     FieldSchema innerField = bagFieldSchema.schema.getField(0);
-    if (nested.isPrimitive() || nested.getOriginalType() == OriginalType.MAP || nested.getOriginalType() == OriginalType.LIST) {
+    if (nested.isPrimitive() || nested.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation
+      || nested.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.ListLogicalTypeAnnotation) {
       // Bags always contain tuples => we skip the extra tuple that was inserted in that case.
       innerField = innerField.schema.getField(0);
     }
diff --git a/parquet-pig/src/main/java/org/apache/parquet/pig/convert/TupleConverter.java b/parquet-pig/src/main/java/org/apache/parquet/pig/convert/TupleConverter.java
index 18ea9e451..48bb7539a 100644
--- a/parquet-pig/src/main/java/org/apache/parquet/pig/convert/TupleConverter.java
+++ b/parquet-pig/src/main/java/org/apache/parquet/pig/convert/TupleConverter.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,6 +23,7 @@
 import java.util.List;
 import java.math.BigDecimal;
 
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
@@ -40,11 +41,8 @@
 import org.apache.parquet.io.api.GroupConverter;
 import org.apache.parquet.io.api.PrimitiveConverter;
 import org.apache.parquet.pig.TupleConversionException;
-import org.apache.parquet.pig.convert.DecimalUtils;
 import org.apache.parquet.schema.GroupType;
-import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
-import org.apache.parquet.schema.DecimalMetadata;
 import org.apache.parquet.schema.Type;
 import org.apache.parquet.schema.Type.Repetition;
 
@@ -74,7 +72,7 @@ public TupleConverter(GroupType parquetSchema, Schema pigSchema, boolean elephan
         FieldSchema field = pigSchema.getField(i);
         if(parquetSchema.containsField(field.alias) || columnIndexAccess) {
           Type type = getType(columnIndexAccess, field.alias, i);
-          
+
           if(type != null) {
             final int index = i;
             converters[c++] = newConverter(field, type, new ParentValueContainer() {
@@ -85,7 +83,7 @@ void add(Object value) {
             }, elephantBirdCompatible, columnIndexAccess);
           }
         }
-        
+
       }
     } catch (FrontendException e) {
       throw new ParquetDecodingException("can not initialize pig converter from:\n" + parquetSchema + "\n" + pigSchema, e);
@@ -100,10 +98,10 @@ private Type getType(boolean columnIndexAccess, String alias, int index) {
     } else {
       return parquetSchema.getType(parquetSchema.getFieldIndex(alias));
     }
-    
+
     return null;
   }
-  
+
   static Converter newConverter(FieldSchema pigField, Type type, final ParentValueContainer parent, boolean elephantBirdCompatible, boolean columnIndexAccess) {
     try {
       switch (pigField.type) {
@@ -122,7 +120,7 @@ public void end() {
       case DataType.CHARARRAY:
           //If the orignal type isn't a string, we don't want to use the dictionary because
           //a custom implementation will be needed for each type.  Just default to no dictionary.
-        return new FieldStringConverter(parent, type.getOriginalType() == OriginalType.UTF8);
+        return new FieldStringConverter(parent, type.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.StringLogicalTypeAnnotation);
       case DataType.BYTEARRAY:
         return new FieldByteArrayConverter(parent);
       case DataType.INTEGER:
@@ -277,8 +275,6 @@ public void addDouble(double value) {
     public void addBoolean(boolean value) {
       parent.add(Boolean.toString(value));
     }
-    
-    
   }
 
   /**
@@ -403,7 +399,7 @@ final public void addLong(long value) {
 
     @Override
     public void addInt(int value) {
-      parent.add((long)value); 
+      parent.add((long)value);
     }
 
     @Override
@@ -425,7 +421,7 @@ public void addBoolean(boolean value) {
     public void addBinary(Binary value) {
       parent.add(Long.parseLong(value.toStringUsingUTF8()));
     }
-    
+
   }
 
   /**
@@ -511,8 +507,6 @@ public void addDouble(double value) {
     public void addBinary(Binary value) {
       parent.add(Boolean.parseBoolean(value.toStringUsingUTF8()));
     }
-
-    
   }
 
   /**
@@ -554,7 +548,8 @@ final public void addBinary(Binary value) {
 
       ParentValueContainer childsParent;
       FieldSchema pigField;
-      if (nestedType.isPrimitive() || nestedType.getOriginalType() == OriginalType.MAP || nestedType.getOriginalType() == OriginalType.LIST) {
+      if (nestedType.isPrimitive() || nestedType.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation
+        || nestedType.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.ListLogicalTypeAnnotation) {
         // Pig bags always contain tuples
         // In that case we need to wrap the value in an extra tuple
         childsParent = new ParentValueContainer() {
diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java
index 979d78ea7..92d8b624d 100644
--- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java
+++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -31,15 +31,17 @@
 import org.apache.parquet.io.api.PrimitiveConverter;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.IncompatibleSchemaModificationException;
-import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.Type;
 
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 import static com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
+import static java.util.Optional.of;
 
 /**
  * Converts Protocol Buffer message (both top level and inner) to parquet.
@@ -128,13 +130,22 @@ public void add(Object value) {
       };
     }
 
-    if (OriginalType.LIST == parquetType.getOriginalType()) {
-      return new ListConverter(parentBuilder, fieldDescriptor, parquetType);
+    LogicalTypeAnnotation logicalTypeAnnotation = parquetType.getLogicalTypeAnnotation();
+    if (logicalTypeAnnotation == null) {
+      return newScalarConverter(parent, parentBuilder, fieldDescriptor, parquetType);
     }
-    if (OriginalType.MAP == parquetType.getOriginalType()) {
-      return new MapConverter(parentBuilder, fieldDescriptor, parquetType);
-    }
-    return newScalarConverter(parent, parentBuilder, fieldDescriptor, parquetType);
+
+    return logicalTypeAnnotation.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<Converter>() {
+      @Override
+      public Optional<Converter> visit(LogicalTypeAnnotation.ListLogicalTypeAnnotation listLogicalType) {
+        return of(new ListConverter(parentBuilder, fieldDescriptor, parquetType));
+      }
+
+      @Override
+      public Optional<Converter> visit(LogicalTypeAnnotation.MapLogicalTypeAnnotation mapLogicalType) {
+        return of(new MapConverter(parentBuilder, fieldDescriptor, parquetType));
+      }
+    }).orElseGet(() -> newScalarConverter(parent, parentBuilder, fieldDescriptor, parquetType)); // lazy: no scalar converter is built when LIST/MAP matched
   }
 
   private Converter newScalarConverter(ParentValueContainer pvc, Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) {
@@ -376,9 +387,9 @@ public void addBinary(Binary binary) {
     private final Converter converter;
 
     public ListConverter(Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) {
-      OriginalType originalType = parquetType.getOriginalType();
-      if (originalType != OriginalType.LIST || parquetType.isPrimitive()) {
-        throw new ParquetDecodingException("Expected LIST wrapper. Found: " + originalType + " instead.");
+      LogicalTypeAnnotation logicalTypeAnnotation = parquetType.getLogicalTypeAnnotation();
+      if (!(logicalTypeAnnotation instanceof LogicalTypeAnnotation.ListLogicalTypeAnnotation) || parquetType.isPrimitive()) {
+        throw new ParquetDecodingException("Expected LIST wrapper. Found: " + logicalTypeAnnotation + " instead.");
       }
 
       GroupType rootWrapperType = parquetType.asGroupType();
@@ -435,9 +446,9 @@ public void end() {
     private final Converter converter;
 
     public MapConverter(Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) {
-      OriginalType originalType = parquetType.getOriginalType();
-      if (originalType != OriginalType.MAP) {
-        throw new ParquetDecodingException("Expected MAP wrapper. Found: " + originalType + " instead.");
+      LogicalTypeAnnotation logicalTypeAnnotation = parquetType.getLogicalTypeAnnotation();
+      if (!(logicalTypeAnnotation instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation)) {
+        throw new ParquetDecodingException("Expected MAP wrapper. Found: " + logicalTypeAnnotation + " instead.");
       }
 
       Type parquetSchema;
diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java
index 0e1aa2010..db5be1409 100644
--- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java
+++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,8 +23,8 @@
 import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
 import com.google.protobuf.Message;
 import com.twitter.elephantbird.util.Protobufs;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 import org.apache.parquet.schema.Type;
 import org.apache.parquet.schema.Types;
@@ -35,8 +35,10 @@
 
 import java.util.List;
 
-import static org.apache.parquet.schema.OriginalType.ENUM;
-import static org.apache.parquet.schema.OriginalType.UTF8;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.enumType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.listType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.mapType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*;
 
 /**
@@ -101,20 +103,19 @@ public MessageType convert(Class<? extends Message> protobufClass) {
     ParquetType parquetType = getParquetType(descriptor);
     if (descriptor.isRepeated() && parquetSpecsCompliant) {
       // the old schema style did not include the LIST wrapper around repeated fields
-      return addRepeatedPrimitive(descriptor, parquetType.primitiveType, parquetType.originalType, builder);
+      return addRepeatedPrimitive(parquetType.primitiveType, parquetType.logicalTypeAnnotation, builder);
     }
 
-    return builder.primitive(parquetType.primitiveType, getRepetition(descriptor)).as(parquetType.originalType);
+    return builder.primitive(parquetType.primitiveType, getRepetition(descriptor)).as(parquetType.logicalTypeAnnotation);
   }
 
-  private <T> Builder<? extends Builder<?, GroupBuilder<T>>, GroupBuilder<T>> addRepeatedPrimitive(FieldDescriptor descriptor,
-                                                                                                   PrimitiveTypeName primitiveType,
-                                                                                                   OriginalType originalType,
+  private <T> Builder<? extends Builder<?, GroupBuilder<T>>, GroupBuilder<T>> addRepeatedPrimitive(PrimitiveTypeName primitiveType,
+                                                                                                   LogicalTypeAnnotation logicalTypeAnnotation,
                                                                                                    final GroupBuilder<T> builder) {
     return builder
-        .group(Type.Repetition.OPTIONAL).as(OriginalType.LIST)
+        .group(Type.Repetition.OPTIONAL).as(listType())
           .group(Type.Repetition.REPEATED)
-            .primitive(primitiveType, Type.Repetition.REQUIRED).as(originalType)
+            .primitive(primitiveType, Type.Repetition.REQUIRED).as(logicalTypeAnnotation)
           .named("element")
         .named("list");
   }
@@ -122,7 +123,7 @@ public MessageType convert(Class<? extends Message> protobufClass) {
   private <T> GroupBuilder<GroupBuilder<T>> addRepeatedMessage(FieldDescriptor descriptor, GroupBuilder<T> builder) {
     GroupBuilder<GroupBuilder<GroupBuilder<GroupBuilder<T>>>> result =
       builder
-        .group(Type.Repetition.OPTIONAL).as(OriginalType.LIST)
+        .group(Type.Repetition.OPTIONAL).as(listType())
         .group(Type.Repetition.REPEATED)
         .group(Type.Repetition.OPTIONAL);
 
@@ -156,9 +157,9 @@ public MessageType convert(Class<? extends Message> protobufClass) {
     ParquetType mapKeyParquetType = getParquetType(fields.get(0));
 
     GroupBuilder<GroupBuilder<GroupBuilder<T>>> group = builder
-      .group(Type.Repetition.OPTIONAL).as(OriginalType.MAP) // only optional maps are allowed in Proto3
+      .group(Type.Repetition.OPTIONAL).as(mapType()) // only optional maps are allowed in Proto3
       .group(Type.Repetition.REPEATED) // key_value wrapper
-      .primitive(mapKeyParquetType.primitiveType, Type.Repetition.REQUIRED).as(mapKeyParquetType.originalType).named("key");
+      .primitive(mapKeyParquetType.primitiveType, Type.Repetition.REQUIRED).as(mapKeyParquetType.logicalTypeAnnotation).named("key");
 
     return addField(fields.get(1), group).named("value")
       .named("key_value");
@@ -173,8 +174,8 @@ private ParquetType getParquetType(FieldDescriptor fieldDescriptor) {
       case DOUBLE: return ParquetType.of(DOUBLE);
       case BOOLEAN: return ParquetType.of(BOOLEAN);
       case FLOAT: return ParquetType.of(FLOAT);
-      case STRING: return ParquetType.of(BINARY, UTF8);
-      case ENUM: return ParquetType.of(BINARY, ENUM);
+      case STRING: return ParquetType.of(BINARY, stringType());
+      case ENUM: return ParquetType.of(BINARY, enumType());
       case BYTE_STRING: return ParquetType.of(BINARY);
       default:
         throw new UnsupportedOperationException("Cannot convert Protocol Buffer: unknown type " + javaType);
@@ -183,15 +184,15 @@ private ParquetType getParquetType(FieldDescriptor fieldDescriptor) {
 
   private static class ParquetType {
     PrimitiveTypeName primitiveType;
-    OriginalType originalType;
+    LogicalTypeAnnotation logicalTypeAnnotation;
 
-    private ParquetType(PrimitiveTypeName primitiveType, OriginalType originalType) {
+    private ParquetType(PrimitiveTypeName primitiveType, LogicalTypeAnnotation logicalTypeAnnotation) {
       this.primitiveType = primitiveType;
-      this.originalType = originalType;
+      this.logicalTypeAnnotation = logicalTypeAnnotation;
     }
 
-    public static ParquetType of(PrimitiveTypeName primitiveType, OriginalType originalType) {
-      return new ParquetType(primitiveType, originalType);
+    public static ParquetType of(PrimitiveTypeName primitiveType, LogicalTypeAnnotation logicalTypeAnnotation) {
+      return new ParquetType(primitiveType, logicalTypeAnnotation);
     }
 
     public static ParquetType of(PrimitiveTypeName primitiveType) {
diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java
index 59c236f31..7436b04c6 100644
--- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java
+++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -38,6 +38,9 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+
+import static java.util.Optional.ofNullable;
 
 /**
  * Implementation of {@link WriteSupport} for writing Protocol Buffers.
@@ -216,15 +219,21 @@ private FieldWriter createMessageWriter(FieldDescriptor fieldDescriptor, Type ty
     }
 
     private GroupType getGroupType(Type type) {
-      if (type.getOriginalType() == OriginalType.LIST) {
-        return type.asGroupType().getType("list").asGroupType().getType("element").asGroupType();
-      }
-
-      if (type.getOriginalType() == OriginalType.MAP) {
-        return type.asGroupType().getType("key_value").asGroupType().getType("value").asGroupType();
+      LogicalTypeAnnotation logicalTypeAnnotation = type.getLogicalTypeAnnotation();
+      if (logicalTypeAnnotation == null) {
+        return type.asGroupType();
       }
+      return logicalTypeAnnotation.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<GroupType>() {
+        @Override
+        public Optional<GroupType> visit(LogicalTypeAnnotation.ListLogicalTypeAnnotation listLogicalType) {
+          return ofNullable(type.asGroupType().getType("list").asGroupType().getType("element").asGroupType());
+        }
 
-      return type.asGroupType();
+        @Override
+        public Optional<GroupType> visit(LogicalTypeAnnotation.MapLogicalTypeAnnotation mapLogicalType) {
+          return ofNullable(type.asGroupType().getType("key_value").asGroupType().getType("value").asGroupType());
+        }
+      }).orElse(type.asGroupType());
     }
 
     private MapWriter createMapWriter(FieldDescriptor fieldDescriptor, Type type) {
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java
index 1185382e0..7bfcdb1ad 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,8 +23,8 @@
 
 import org.apache.parquet.ShouldNeverHappenException;
 import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 import org.apache.parquet.schema.Type;
@@ -55,8 +55,8 @@
 import static org.apache.parquet.Preconditions.checkNotNull;
 import static org.apache.parquet.schema.ConversionPatterns.listType;
 import static org.apache.parquet.schema.ConversionPatterns.mapType;
-import static org.apache.parquet.schema.OriginalType.ENUM;
-import static org.apache.parquet.schema.OriginalType.UTF8;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.enumType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
@@ -278,7 +278,7 @@ private ConvertedField visitPrimitiveType(PrimitiveTypeName type, State state) {
     return visitPrimitiveType(type, null, state);
   }
 
-  private ConvertedField visitPrimitiveType(PrimitiveTypeName type, OriginalType orig, State state) {
+  private ConvertedField visitPrimitiveType(PrimitiveTypeName type, LogicalTypeAnnotation orig, State state) {
     PrimitiveBuilder<PrimitiveType> b = primitive(type, state.repetition);
 
     if (orig != null) {
@@ -294,7 +294,7 @@ private ConvertedField visitPrimitiveType(PrimitiveTypeName type, OriginalType o
 
   @Override
   public ConvertedField visit(EnumType enumType, State state) {
-    return visitPrimitiveType(BINARY, ENUM, state);
+    return visitPrimitiveType(BINARY, enumType(), state);
   }
 
   @Override
@@ -329,7 +329,7 @@ public ConvertedField visit(I64Type i64Type, State state) {
 
   @Override
   public ConvertedField visit(StringType stringType, State state) {
-    return stringType.isBinary() ? visitPrimitiveType(BINARY, state) : visitPrimitiveType(BINARY, UTF8, state);
+    return stringType.isBinary() ? visitPrimitiveType(BINARY, state) : visitPrimitiveType(BINARY, stringType(), state);
   }
 
   private static boolean isUnion(StructOrUnionType s) {
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/DumpCommand.java b/parquet-tools/src/main/java/org/apache/parquet/tools/command/DumpCommand.java
index 26b5562ff..27043b948 100644
--- a/parquet-tools/src/main/java/org/apache/parquet/tools/command/DumpCommand.java
+++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/DumpCommand.java
@@ -58,7 +58,6 @@
 import org.apache.parquet.io.api.PrimitiveConverter;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveStringifier;
-import org.apache.parquet.tools.util.MetadataUtils;
 import org.apache.parquet.tools.util.PrettyPrintWriter;
 import org.apache.parquet.tools.util.PrettyPrintWriter.WhiteSpaceHandler;
 
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/MetadataUtils.java b/parquet-tools/src/main/java/org/apache/parquet/tools/command/MetadataUtils.java
new file mode 100644
index 000000000..0bade3700
--- /dev/null
+++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/MetadataUtils.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.tools.command;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Type.Repetition;
+import org.apache.parquet.tools.util.PrettyPrintWriter;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Static helpers that pretty-print Parquet footer metadata (file schema, row groups and
+ * column chunks) to a {@link PrettyPrintWriter}.
+ */
+class MetadataUtils {
+  /**
+   * Prints the file-level metadata followed by one section per row group.
+   *
+   * @param out writer that receives the formatted output
+   * @param meta footer metadata to print
+   * @param showOriginalTypes if true, primitive fields are annotated with their
+   *        {@link OriginalType} ({@code O:}); otherwise with their
+   *        {@link LogicalTypeAnnotation} ({@code L:})
+   */
+  static void showDetails(PrettyPrintWriter out, ParquetMetadata meta, boolean showOriginalTypes) {
+    showDetails(out, meta.getFileMetaData(), showOriginalTypes);
+
+    // Row groups are numbered from 1 in the output.
+    long i = 1;
+    for (BlockMetaData bmeta : meta.getBlocks()) {
+      out.println();
+      showDetails(out, bmeta, i++);
+    }
+  }
+
+  /**
+   * Prints the creator string, each key/value metadata entry (when the map is non-null)
+   * and the file schema.
+   *
+   * @param out writer that receives the formatted output
+   * @param meta file-level metadata to print
+   * @param showOriginalTypes if true, print {@code O:} original types; otherwise {@code L:} logical types
+   */
+  static void showDetails(PrettyPrintWriter out, FileMetaData meta, boolean showOriginalTypes) {
+    out.format("creator: %s%n", meta.getCreatedBy());
+
+    Map<String,String> extra = meta.getKeyValueMetaData();
+    if (extra != null) {
+      // Iterate the map that was just null-checked instead of fetching it a second time.
+      for (Map.Entry<String,String> entry : extra.entrySet()) {
+        out.print("extra: ");
+        out.incrementTabLevel();
+        out.format("%s = %s%n", entry.getKey(), entry.getValue());
+        out.decrementTabLevel();
+      }
+    }
+
+    out.println();
+    out.format("file schema: %s%n", meta.getSchema().getName());
+    out.rule('-');
+    showDetails(out, meta.getSchema(), showOriginalTypes);
+  }
+
+  /**
+   * Prints one row group header (row count, total byte size, starting position) and its column chunks.
+   *
+   * @param out writer that receives the formatted output
+   * @param meta row group metadata to print
+   * @param num number shown after the "row group" label, or {@code null} to omit it
+   */
+  private static void showDetails(PrettyPrintWriter out, BlockMetaData meta, Long num) {
+    long rows = meta.getRowCount();
+    long tbs = meta.getTotalByteSize();
+    long offset = meta.getStartingPos();
+
+    out.format("row group%s: RC:%d TS:%d OFFSET:%d%n", (num == null ? "" : " " + num), rows, tbs, offset);
+    out.rule('-');
+    showDetails(out, meta.getColumns());
+  }
+
+  /**
+   * Groups the column chunks into a tree keyed by the segments of each column path and prints it.
+   * Inner nodes are nested maps; leaves are the {@link ColumnChunkMetaData} themselves.
+   *
+   * @param out writer that receives the formatted output
+   * @param ccmeta column chunks to print
+   */
+  @SuppressWarnings("unchecked") // non-leaf values are the maps created in the loop below
+  static void showDetails(PrettyPrintWriter out, List<ColumnChunkMetaData> ccmeta) {
+    Map<String,Object> chunks = new LinkedHashMap<>();
+    for (ColumnChunkMetaData cmeta : ccmeta) {
+      String[] path = cmeta.getPath().toArray();
+
+      Map<String,Object> current = chunks;
+      for (int i = 0; i < path.length - 1; ++i) {
+        String next = path[i];
+        if (!current.containsKey(next)) {
+          current.put(next, new LinkedHashMap<String,Object>());
+        }
+
+        current = (Map<String,Object>)current.get(next);
+      }
+
+      current.put(path[path.length - 1], cmeta);
+    }
+
+    showColumnChunkDetails(out, chunks, 0);
+  }
+
+  /**
+   * Recursively prints one level of the column chunk tree, prefixing each name with one dot per nesting level.
+   *
+   * @param out writer that receives the formatted output
+   * @param current tree level to print; values are nested maps or {@link ColumnChunkMetaData}
+   * @param depth nesting level of {@code current}, starting at 0
+   */
+  @SuppressWarnings("unchecked") // nested maps are only ever created as Map<String,Object>
+  private static void showColumnChunkDetails(PrettyPrintWriter out, Map<String,Object> current, int depth) {
+    for (Map.Entry<String,Object> entry : current.entrySet()) {
+      String name = Strings.repeat(".", depth) + entry.getKey();
+      Object value = entry.getValue();
+
+      if (value instanceof Map) {
+        out.println(name + ": ");
+        showColumnChunkDetails(out, (Map<String,Object>)value, depth + 1);
+      } else {
+        out.print(name + ": ");
+        showDetails(out, (ColumnChunkMetaData)value, false);
+      }
+    }
+  }
+
+  /**
+   * Prints a single column chunk on one line: type, codec, dictionary page offset ({@code DO}),
+   * first data page offset ({@code FPO}), total/uncompressed size and their ratio ({@code SZ}),
+   * value count ({@code VC}), encodings ({@code ENC}, omitted when empty) and statistics ({@code ST}).
+   *
+   * @param out writer that receives the formatted output
+   * @param meta column chunk to print
+   * @param name if true, the dot-joined column path is printed first
+   */
+  private static void showDetails(PrettyPrintWriter out, ColumnChunkMetaData meta, boolean name) {
+    long doff = meta.getDictionaryPageOffset();
+    long foff = meta.getFirstDataPageOffset();
+    long tsize = meta.getTotalSize();
+    long usize = meta.getTotalUncompressedSize();
+    long count = meta.getValueCount();
+    // Floating-point division: a zero total size yields Infinity/NaN instead of throwing.
+    double ratio = usize / (double)tsize;
+    String encodings = Joiner.on(',').skipNulls().join(meta.getEncodings());
+
+    if (name) {
+      String path = Joiner.on('.').skipNulls().join(meta.getPath());
+      out.format("%s: ", path);
+    }
+
+    out.format(" %s", meta.getType());
+    out.format(" %s", meta.getCodec());
+    out.format(" DO:%d", doff);
+    out.format(" FPO:%d", foff);
+    out.format(" SZ:%d/%d/%.2f", tsize, usize, ratio);
+    out.format(" VC:%d", count);
+    if (!encodings.isEmpty()) out.format(" ENC:%s", encodings);
+    Statistics<?> stats = meta.getStatistics();
+    if (stats != null) {
+      out.format(" ST:[%s]", stats);
+    } else {
+      out.format(" ST:[none]");
+    }
+    out.println();
+  }
+
+  /**
+   * Prints every top-level field of the schema, recursing into groups.
+   *
+   * @param out writer that receives the formatted output
+   * @param type message schema to print
+   * @param showOriginalTypes if true, print {@code O:} original types; otherwise {@code L:} logical types
+   */
+  static void showDetails(PrettyPrintWriter out, MessageType type, boolean showOriginalTypes) {
+    List<String> cpath = new ArrayList<>();
+    for (Type ftype : type.getFields()) {
+      showDetails(out, ftype, 0, type, cpath, showOriginalTypes);
+    }
+  }
+
+  /**
+   * Prints a group's name, repetition and field count, then its children one level deeper.
+   * {@code cpath} holds the path to the current node; the group's name is pushed for the
+   * duration of the recursion and popped afterwards.
+   */
+  private static void showDetails(PrettyPrintWriter out, GroupType type, int depth, MessageType container, List<String> cpath, boolean showOriginalTypes) {
+    String name = Strings.repeat(".", depth) + type.getName();
+    Repetition rep = type.getRepetition();
+    int fcount = type.getFieldCount();
+    out.format("%s: %s F:%d%n", name, rep, fcount);
+
+    cpath.add(type.getName());
+    for (Type ftype : type.getFields()) {
+      showDetails(out, ftype, depth + 1, container, cpath, showOriginalTypes);
+    }
+    cpath.remove(cpath.size() - 1);
+  }
+
+  /**
+   * Prints a primitive field: name, repetition, primitive type, its original ({@code O:}) or
+   * logical ({@code L:}) type when present and, if {@code container} is non-null, the column's
+   * maximum repetition ({@code R:}) and definition ({@code D:}) levels.
+   */
+  private static void showDetails(PrettyPrintWriter out, PrimitiveType type, int depth, MessageType container, List<String> cpath, boolean showOriginalTypes) {
+    String name = Strings.repeat(".", depth) + type.getName();
+    Repetition rep = type.getRepetition();
+    PrimitiveTypeName ptype = type.getPrimitiveTypeName();
+
+    out.format("%s: %s %s", name, rep, ptype);
+    if (showOriginalTypes) {
+      OriginalType otype;
+      try {
+        otype = type.getOriginalType();
+      } catch (Exception ignored) {
+        // Best effort (presumably for annotations with no OriginalType equivalent - TODO confirm): print none.
+        otype = null;
+      }
+      if (otype != null) out.format(" O:%s", otype);
+    } else {
+      LogicalTypeAnnotation ltype = type.getLogicalTypeAnnotation();
+      if (ltype != null) out.format(" L:%s", ltype);
+    }
+
+    if (container != null) {
+      // Temporarily extend the path with this field to look up its column descriptor.
+      cpath.add(type.getName());
+      String[] paths = cpath.toArray(new String[cpath.size()]);
+      cpath.remove(cpath.size() - 1);
+
+      ColumnDescriptor desc = container.getColumnDescription(paths);
+
+      int defl = desc.getMaxDefinitionLevel();
+      int repl = desc.getMaxRepetitionLevel();
+      out.format(" R:%d D:%d", repl, defl);
+    }
+    out.println();
+  }
+
+  /** Dispatches to the group or primitive overload; any other {@link Type} prints nothing. */
+  private static void showDetails(PrettyPrintWriter out, Type type, int depth, MessageType container, List<String> cpath, boolean showOriginalTypes) {
+    if (type instanceof GroupType) {
+      showDetails(out, type.asGroupType(), depth, container, cpath, showOriginalTypes);
+    } else if (type instanceof PrimitiveType) {
+      showDetails(out, type.asPrimitiveType(), depth, container, cpath, showOriginalTypes);
+    }
+  }
+}
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/ShowMetaCommand.java b/parquet-tools/src/main/java/org/apache/parquet/tools/command/ShowMetaCommand.java
index 8d3555152..b07fa7a69 100644
--- a/parquet-tools/src/main/java/org/apache/parquet/tools/command/ShowMetaCommand.java
+++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/ShowMetaCommand.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,13 +19,15 @@
 package org.apache.parquet.tools.command;
 
 import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 
 import org.apache.parquet.hadoop.Footer;
 import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.tools.util.MetadataUtils;
 import org.apache.parquet.tools.util.PrettyPrintWriter;
 import org.apache.parquet.tools.util.PrettyPrintWriter.WhiteSpaceHandler;
 
@@ -37,6 +39,15 @@
     "where <input> is the parquet file to print to stdout"
   };
 
+  /** Command-line options of this command; the static block registers -o/--originalType. */
+  public static final Options OPTIONS = new Options();
+  static {
+    Option legacyNames = OptionBuilder.withLongOpt("originalType")
+      .withDescription("Print logical types in OriginalType representation.")
+      .create('o');
+    OPTIONS.addOption(legacyNames);
+  }
+
   public ShowMetaCommand() {
     super(1, 1);
   }
@@ -51,13 +62,19 @@ public String getCommandDescription() {
     return "Prints the metadata of Parquet file(s)";
   }
 
+  @Override
+  public Options getOptions() {
+    return OPTIONS; // the shared static option set, which declares -o/--originalType
+  }
+
   @Override
   public void execute(CommandLine options) throws Exception {
     super.execute(options);
 
     String[] args = options.getArgs();
     String input = args[0];
-    
+    boolean showOriginalTypes = options.hasOption('o');
+
     Configuration conf = new Configuration();
     Path inputPath = new Path(input);
     FileStatus inputFileStatus = inputPath.getFileSystem(conf).getFileStatus(inputPath);
@@ -71,7 +88,7 @@ public void execute(CommandLine options) throws Exception {
 
     for(Footer f: footers) {
       out.format("file: %s%n" , f.getFile());
-      MetadataUtils.showDetails(out, f.getParquetMetadata());
+      MetadataUtils.showDetails(out, f.getParquetMetadata(), showOriginalTypes);
       out.flushColumns();
     }
   }
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/ShowSchemaCommand.java b/parquet-tools/src/main/java/org/apache/parquet/tools/command/ShowSchemaCommand.java
index d83e5649e..6f83857b3 100644
--- a/parquet-tools/src/main/java/org/apache/parquet/tools/command/ShowSchemaCommand.java
+++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/ShowSchemaCommand.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -32,7 +32,6 @@
 import org.apache.parquet.hadoop.util.HiddenFileFilter;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.tools.Main;
-import org.apache.parquet.tools.util.MetadataUtils;
 import org.apache.parquet.tools.util.PrettyPrintWriter;
 
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
@@ -49,7 +48,11 @@
     Option help = OptionBuilder.withLongOpt("detailed")
                                .withDescription("Show detailed information about the schema.")
                                .create('d');
+    Option originalType = OptionBuilder.withLongOpt("originalType")
+      .withDescription("Print logical types in OriginalType representation.")
+      .create('o');
     OPTIONS.addOption(help);
+    OPTIONS.addOption(originalType);
   }
 
   public ShowSchemaCommand() {
@@ -98,8 +101,9 @@ public void execute(CommandLine options) throws Exception {
 
     Main.out.println(schema);
     if (options.hasOption('d')) {
+      boolean showOriginalTypes = options.hasOption('o');
       PrettyPrintWriter out = PrettyPrintWriter.stdoutPrettyPrinter().build();
-      MetadataUtils.showDetails(out, metaData);
+      MetadataUtils.showDetails(out, metaData, showOriginalTypes);
     }
   }
 }
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/read/SimpleRecordConverter.java b/parquet-tools/src/main/java/org/apache/parquet/tools/read/SimpleRecordConverter.java
index a119a347e..c07875ab8 100644
--- a/parquet-tools/src/main/java/org/apache/parquet/tools/read/SimpleRecordConverter.java
+++ b/parquet-tools/src/main/java/org/apache/parquet/tools/read/SimpleRecordConverter.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,15 +20,18 @@
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.util.Optional;
 
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.io.api.Converter;
 import org.apache.parquet.io.api.GroupConverter;
 import org.apache.parquet.io.api.PrimitiveConverter;
 import org.apache.parquet.schema.GroupType;
-import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.Type;
 
+import static java.util.Optional.of;
+
 public class SimpleRecordConverter extends GroupConverter {
   private final Converter converters[];
   private final String name;
@@ -51,31 +54,47 @@ public SimpleRecordConverter(GroupType schema, String name, SimpleRecordConverte
   }
 
+  /**
+   * Chooses the converter for one schema field. Primitive fields get a
+   * StringConverter or DecimalConverter when annotated as string or decimal and
+   * a SimplePrimitiveConverter otherwise; group fields get a map or list record
+   * converter when annotated as such and a nested SimpleRecordConverter otherwise.
+   */
   private Converter createConverter(Type field) {
-    OriginalType otype = field.getOriginalType();
+    LogicalTypeAnnotation ltype = field.getLogicalTypeAnnotation();
 
     if (field.isPrimitive()) {
-      if (otype != null) {
-        switch (otype) {
-          case MAP: break;
-          case LIST: break;
-          case UTF8: return new StringConverter(field.getName());
-          case MAP_KEY_VALUE: break;
-          case ENUM: break;
-          case DECIMAL:
-            int scale = field.asPrimitiveType().getDecimalMetadata().getScale();
-            return new DecimalConverter(field.getName(), scale);
-        }
+      if (ltype != null) {
+        return ltype.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<Converter>() {
+          @Override
+          public Optional<Converter> visit(LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) {
+            return of(new StringConverter(field.getName()));
+          }
+
+          @Override
+          public Optional<Converter> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) {
+            int scale = decimalLogicalType.getScale();
+            return of(new DecimalConverter(field.getName(), scale));
+          }
+        }).orElse(new SimplePrimitiveConverter(field.getName()));
       }
 
+      // Primitive without a logical type: return here instead of reaching asGroupType() below.
       return new SimplePrimitiveConverter(field.getName());
     }
 
     GroupType groupType = field.asGroupType();
-    if (otype != null) {
-      switch (otype) {
-        case MAP: return new SimpleMapRecordConverter(groupType, field.getName(), this);
-        case LIST: return new SimpleListRecordConverter(groupType, field.getName(), this);
-      }
+    if (ltype != null) {
+      return ltype.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<Converter>() {
+        @Override
+        public Optional<Converter> visit(LogicalTypeAnnotation.MapLogicalTypeAnnotation mapLogicalType) {
+          return of(new SimpleMapRecordConverter(groupType, field.getName(), SimpleRecordConverter.this));
+        }
+
+        @Override
+        public Optional<Converter> visit(LogicalTypeAnnotation.ListLogicalTypeAnnotation listLogicalType) {
+          return of(new SimpleListRecordConverter(groupType, field.getName(), SimpleRecordConverter.this));
+        }
+      }).orElse(new SimpleRecordConverter(groupType, field.getName(), this));
     }
     return new SimpleRecordConverter(groupType, field.getName(), this);
   }
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/util/MetadataUtils.java b/parquet-tools/src/main/java/org/apache/parquet/tools/util/MetadataUtils.java
index 870b8c18a..206028a30 100644
--- a/parquet-tools/src/main/java/org/apache/parquet/tools/util/MetadataUtils.java
+++ b/parquet-tools/src/main/java/org/apache/parquet/tools/util/MetadataUtils.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -40,6 +40,7 @@
 import org.apache.parquet.schema.Type;
 import org.apache.parquet.schema.Type.Repetition;
 
+@Deprecated
 public class MetadataUtils {
   public static final double BAD_COMPRESSION_RATIO_CUTOFF = 0.97;
   public static final double GOOD_COMPRESSION_RATIO_CUTOFF = 1.2;
@@ -163,7 +164,7 @@ public static void showDetails(PrettyPrintWriter out, ColumnDescriptor desc) {
     int defl = desc.getMaxDefinitionLevel();
     int repl = desc.getMaxRepetitionLevel();
 
-    out.format("column desc: %s T:%s R:%d D:%d%n", path, type, repl, defl); 
+    out.format("column desc: %s T:%s R:%d D:%d%n", path, type, repl, defl);
   }
 
   public static void showDetails(PrettyPrintWriter out, MessageType type) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Refactor modules to use the new logical type API
> ------------------------------------------------
>
>                 Key: PARQUET-1410
>                 URL: https://issues.apache.org/jira/browse/PARQUET-1410
>             Project: Parquet
>          Issue Type: Improvement
>          Components: parquet-mr
>            Reporter: Nandor Kollar
>            Assignee: Nandor Kollar
>            Priority: Major
>              Labels: pull-request-available
>
> Refactor parquet-mr modules to use the new logical type API for internal decisions (e.g. replace the OriginalType-based switch cases with a more flexible solution, for example in the type builder when checking whether the proper annotation is present on the physical type).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)