You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain text copy.
Posted to commits@parquet.apache.org by zi...@apache.org on 2018/09/12 12:14:23 UTC

[parquet-mr] branch master updated: PARQUET-1410: Refactor modules to use the new logical type API (#520)

This is an automated email from the ASF dual-hosted git repository.

zivanfi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new b4198be  PARQUET-1410: Refactor modules to use the new logical type API (#520)
b4198be is described below

commit b4198be200e7e2df82bc9a18d54c8cd16aa156ac
Author: nandorKollar <na...@users.noreply.github.com>
AuthorDate: Wed Sep 12 14:14:20 2018 +0200

    PARQUET-1410: Refactor modules to use the new logical type API (#520)
---
 .../parquet/arrow/schema/SchemaConverter.java      | 250 ++++++++++-----------
 .../parquet/arrow/schema/TestSchemaConverter.java  |  27 ++-
 .../apache/parquet/avro/AvroSchemaConverter.java   | 154 ++++++++-----
 .../parquet/avro/TestAvroSchemaConverter.java      |  14 +-
 .../parquet/cascading/convert/TupleConverter.java  |   9 +-
 .../src/main/java/org/apache/parquet/cli/Util.java |  10 +
 .../cli/commands/ParquetMetadataCommand.java       |   4 +-
 .../cli/commands/ShowDictionaryCommand.java        |   4 +-
 .../parquet/cli/commands/ShowPagesCommand.java     |   4 +-
 .../parquet/filter2/predicate/ValidTypeMap.java    |   7 +-
 .../apache/parquet/schema/ConversionPatterns.java  |  28 +--
 .../java/org/apache/parquet/schema/GroupType.java  |  36 ++-
 .../parquet/schema/LogicalTypeAnnotation.java      | 111 +++++++--
 .../org/apache/parquet/schema/MessageType.java     |   8 +-
 .../org/apache/parquet/schema/OriginalType.java    |  66 ++----
 .../org/apache/parquet/schema/PrimitiveType.java   | 213 ++++++++++--------
 .../main/java/org/apache/parquet/schema/Types.java | 136 +++++++----
 .../filter2/predicate/TestValidTypeMap.java        |   7 +-
 .../org/apache/parquet/schema/TestMessageType.java |   2 +-
 .../format/converter/ParquetMetadataConverter.java | 235 +++++++++++--------
 .../ql/io/parquet/convert/HiveSchemaConverter.java |  17 +-
 .../org/apache/parquet/pig/PigSchemaConverter.java | 124 ++++++----
 .../apache/parquet/pig/convert/TupleConverter.java |  31 ++-
 .../parquet/proto/ProtoMessageConverter.java       |  43 ++--
 .../apache/parquet/proto/ProtoSchemaConverter.java |  45 ++--
 .../apache/parquet/proto/ProtoWriteSupport.java    |  29 ++-
 .../parquet/thrift/ThriftSchemaConvertVisitor.java |  18 +-
 .../apache/parquet/tools/command/DumpCommand.java  |   1 -
 .../tools/{util => command}/MetadataUtils.java     |  93 +++-----
 .../parquet/tools/command/ShowMetaCommand.java     |  29 ++-
 .../parquet/tools/command/ShowSchemaCommand.java   |  14 +-
 .../parquet/tools/read/SimpleRecordConverter.java  |  56 +++--
 .../apache/parquet/tools/util/MetadataUtils.java   |   9 +-
 33 files changed, 1072 insertions(+), 762 deletions(-)

diff --git a/parquet-arrow/src/main/java/org/apache/parquet/arrow/schema/SchemaConverter.java b/parquet-arrow/src/main/java/org/apache/parquet/arrow/schema/SchemaConverter.java
index b0f122c..e02b03b 100644
--- a/parquet-arrow/src/main/java/org/apache/parquet/arrow/schema/SchemaConverter.java
+++ b/parquet-arrow/src/main/java/org/apache/parquet/arrow/schema/SchemaConverter.java
@@ -19,22 +19,16 @@
 package org.apache.parquet.arrow.schema;
 
 import static java.util.Arrays.asList;
-import static org.apache.parquet.schema.OriginalType.DATE;
-import static org.apache.parquet.schema.OriginalType.DECIMAL;
-import static org.apache.parquet.schema.OriginalType.INTERVAL;
-import static org.apache.parquet.schema.OriginalType.INT_16;
-import static org.apache.parquet.schema.OriginalType.INT_32;
-import static org.apache.parquet.schema.OriginalType.INT_64;
-import static org.apache.parquet.schema.OriginalType.INT_8;
-import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MILLIS;
-import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MICROS;
-import static org.apache.parquet.schema.OriginalType.TIME_MILLIS;
-import static org.apache.parquet.schema.OriginalType.TIME_MICROS;
-import static org.apache.parquet.schema.OriginalType.UINT_16;
-import static org.apache.parquet.schema.OriginalType.UINT_32;
-import static org.apache.parquet.schema.OriginalType.UINT_64;
-import static org.apache.parquet.schema.OriginalType.UINT_8;
-import static org.apache.parquet.schema.OriginalType.UTF8;
+import static java.util.Optional.empty;
+import static java.util.Optional.of;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MICROS;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MILLIS;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.dateType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.decimalType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.intType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.timeType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.timestampType;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
@@ -48,6 +42,7 @@ import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
 import org.apache.arrow.vector.types.DateUnit;
 import org.apache.arrow.vector.types.FloatingPointPrecision;
@@ -75,10 +70,9 @@ import org.apache.parquet.arrow.schema.SchemaMapping.RepeatedTypeMapping;
 import org.apache.parquet.arrow.schema.SchemaMapping.StructTypeMapping;
 import org.apache.parquet.arrow.schema.SchemaMapping.TypeMapping;
 import org.apache.parquet.arrow.schema.SchemaMapping.UnionTypeMapping;
-import org.apache.parquet.schema.DecimalMetadata;
 import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 import org.apache.parquet.schema.Type;
@@ -180,13 +174,11 @@ public class SchemaConverter {
         boolean signed = type.getIsSigned();
         switch (type.getBitWidth()) {
           case 8:
-            return primitive(INT32, signed ? INT_8 : UINT_8);
           case 16:
-            return primitive(INT32, signed ? INT_16 : UINT_16);
           case 32:
-            return primitive(INT32, signed ? INT_32 : UINT_32);
+            return primitive(INT32, intType(type.getBitWidth(), signed));
           case 64:
-            return primitive(INT64, signed ? INT_64 : UINT_64);
+            return primitive(INT64, intType(64, signed));
           default:
             throw new IllegalArgumentException("Illegal int type: " + field);
         }
@@ -209,7 +201,7 @@ public class SchemaConverter {
 
       @Override
       public TypeMapping visit(Utf8 type) {
-        return primitive(BINARY, UTF8);
+        return primitive(BINARY, stringType());
       }
 
       @Override
@@ -243,7 +235,7 @@ public class SchemaConverter {
 
       @Override
       public TypeMapping visit(Date type) {
-        return primitive(INT32, DATE);
+        return primitive(INT32, dateType());
       }
 
       @Override
@@ -251,9 +243,9 @@ public class SchemaConverter {
         int bitWidth = type.getBitWidth();
         TimeUnit timeUnit = type.getUnit();
         if (bitWidth == 32 && timeUnit == TimeUnit.MILLISECOND) {
-          return primitive(INT32, TIME_MILLIS);
+          return primitive(INT32, timeType(false, MILLIS));
         } else if (bitWidth == 64 && timeUnit == TimeUnit.MICROSECOND) {
-          return primitive(INT64, TIME_MICROS);
+          return primitive(INT64, timeType(false, MICROS));
         }
         throw new UnsupportedOperationException("Unsupported type " + type);
       }
@@ -262,20 +254,25 @@ public class SchemaConverter {
       public TypeMapping visit(Timestamp type) {
         TimeUnit timeUnit = type.getUnit();
         if (timeUnit == TimeUnit.MILLISECOND) {
-          return primitive(INT64, TIMESTAMP_MILLIS);
+          return primitive(INT64, timestampType(isUtcNormalized(type), MILLIS));
         } else if (timeUnit == TimeUnit.MICROSECOND) {
-          return primitive(INT64, TIMESTAMP_MICROS);
+          return primitive(INT64, timestampType(isUtcNormalized(type), MICROS));
         }
         throw new UnsupportedOperationException("Unsupported type " + type);
       }
 
+      private boolean isUtcNormalized(Timestamp timestamp) {
+        String timeZone = timestamp.getTimezone();
+        return timeZone != null && !timeZone.isEmpty();
+      }
+
       /**
        * See https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#interval
        */
       @Override
       public TypeMapping visit(Interval type) {
         // TODO(PARQUET-675): fix interval original types
-        return primitiveFLBA(12, INTERVAL);
+        return primitiveFLBA(12, LogicalTypeAnnotation.IntervalLogicalTypeAnnotation.getInstance());
       }
 
       @Override
@@ -288,18 +285,18 @@ public class SchemaConverter {
       }
 
       private TypeMapping decimal(PrimitiveTypeName type, int precision, int scale) {
-        return mapping(Types.optional(type).as(DECIMAL).precision(precision).scale(scale).named(fieldName));
+        return mapping(Types.optional(type).as(decimalType(scale, precision)).named(fieldName));
       }
 
       private TypeMapping primitive(PrimitiveTypeName type) {
         return mapping(Types.optional(type).named(fieldName));
       }
 
-      private TypeMapping primitive(PrimitiveTypeName type, OriginalType otype) {
+      private TypeMapping primitive(PrimitiveTypeName type, LogicalTypeAnnotation otype) {
         return mapping(Types.optional(type).as(otype).named(fieldName));
       }
 
-      private TypeMapping primitiveFLBA(int length, OriginalType otype) {
+      private TypeMapping primitiveFLBA(int length, LogicalTypeAnnotation otype) {
         return mapping(Types.optional(FIXED_LEN_BYTE_ARRAY).length(length).as(otype).named(fieldName));
       }
     });
@@ -363,21 +360,21 @@ public class SchemaConverter {
    * @return the mapping
    */
   private TypeMapping fromParquetGroup(GroupType type, String name) {
-    OriginalType ot = type.getOriginalType();
-    if (ot == null) {
+    LogicalTypeAnnotation logicalType = type.getLogicalTypeAnnotation();
+    if (logicalType == null) {
       List<TypeMapping> typeMappings = fromParquet(type.getFields());
       Field arrowField = new Field(name, type.isRepetition(OPTIONAL), new Struct(), fields(typeMappings));
       return new StructTypeMapping(arrowField, type, typeMappings);
     } else {
-      switch (ot) {
-        case LIST:
+      return logicalType.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<TypeMapping>() {
+        @Override
+        public Optional<TypeMapping> visit(LogicalTypeAnnotation.ListLogicalTypeAnnotation listLogicalType) {
           List3Levels list3Levels = new List3Levels(type);
           TypeMapping child = fromParquet(list3Levels.getElement(), null, list3Levels.getElement().getRepetition());
           Field arrowField = new Field(name, type.isRepetition(OPTIONAL), new ArrowType.List(), asList(child.getArrowField()));
-          return new ListTypeMapping(arrowField, list3Levels, child);
-        default:
-          throw new UnsupportedOperationException("Unsupported type " + type);
-      }
+          return of(new ListTypeMapping(arrowField, list3Levels, child));
+        }
+      }).orElseThrow(() -> new UnsupportedOperationException("Unsupported type " + type));
     }
   }
 
@@ -406,92 +403,82 @@ public class SchemaConverter {
 
       @Override
       public TypeMapping convertINT32(PrimitiveTypeName primitiveTypeName) throws RuntimeException {
-        OriginalType ot = type.getOriginalType();
-        if (ot == null) {
+        LogicalTypeAnnotation logicalTypeAnnotation = type.getLogicalTypeAnnotation();
+        if (logicalTypeAnnotation == null) {
           return integer(32, true);
         }
-        switch (ot) {
-          case INT_8:
-            return integer(8, true);
-          case INT_16:
-            return integer(16, true);
-          case INT_32:
-            return integer(32, true);
-          case UINT_8:
-            return integer(8, false);
-          case UINT_16:
-            return integer(16, false);
-          case UINT_32:
-            return integer(32, false);
-          case DECIMAL:
-            return decimal(type.getDecimalMetadata());
-          case DATE:
-            return field(new ArrowType.Date(DateUnit.DAY));
-          case TIME_MILLIS:
-            return field(new ArrowType.Time(TimeUnit.MILLISECOND, 32));
-          default:
-          case INT_64:
-          case UINT_64:
-          case UTF8:
-          case ENUM:
-          case BSON:
-          case INTERVAL:
-          case JSON:
-          case LIST:
-          case MAP:
-          case MAP_KEY_VALUE:
-          case TIMESTAMP_MICROS:
-          case TIMESTAMP_MILLIS:
-          case TIME_MICROS:
-            throw new IllegalArgumentException("illegal type " + type);
-        }
+        return logicalTypeAnnotation.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<TypeMapping>() {
+          @Override
+          public Optional<TypeMapping> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) {
+            return of(decimal(decimalLogicalType.getPrecision(), decimalLogicalType.getScale()));
+          }
+
+          @Override
+          public Optional<TypeMapping> visit(LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
+            return of(field(new ArrowType.Date(DateUnit.DAY)));
+          }
+
+          @Override
+          public Optional<TypeMapping> visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
+            return timeLogicalType.getUnit() == MILLIS ? of(field(new ArrowType.Time(TimeUnit.MILLISECOND, 32))) : empty();
+          }
+
+          @Override
+          public Optional<TypeMapping> visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) {
+            if (intLogicalType.getBitWidth() == 64) {
+              return empty();
+            }
+            return of(integer(intLogicalType.getBitWidth(), intLogicalType.isSigned()));
+          }
+        }).orElseThrow(() -> new IllegalArgumentException("illegal type " + type));
       }
 
       @Override
       public TypeMapping convertINT64(PrimitiveTypeName primitiveTypeName) throws RuntimeException {
-        OriginalType ot = type.getOriginalType();
-        if (ot == null) {
+        LogicalTypeAnnotation logicalTypeAnnotation = type.getLogicalTypeAnnotation();
+        if (logicalTypeAnnotation == null) {
           return integer(64, true);
         }
-        switch (ot) {
-          case INT_8:
-            return integer(8, true);
-          case INT_16:
-            return integer(16, true);
-          case INT_32:
-            return integer(32, true);
-          case INT_64:
-            return integer(64, true);
-          case UINT_8:
-            return integer(8, false);
-          case UINT_16:
-            return integer(16, false);
-          case UINT_32:
-            return integer(32, false);
-          case UINT_64:
-            return integer(64, false);
-          case DECIMAL:
-            return decimal(type.getDecimalMetadata());
-          case DATE:
-            return field(new ArrowType.Date(DateUnit.DAY));
-          case TIMESTAMP_MICROS:
-            return field(new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC"));
-          case TIMESTAMP_MILLIS:
-            return field(new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC"));
-          case TIME_MICROS:
-            return field(new ArrowType.Time(TimeUnit.MICROSECOND, 64));
-          default:
-          case UTF8:
-          case ENUM:
-          case BSON:
-          case INTERVAL:
-          case JSON:
-          case LIST:
-          case MAP:
-          case MAP_KEY_VALUE:
-          case TIME_MILLIS:
-            throw new IllegalArgumentException("illegal type " + type);
-        }
+
+        return logicalTypeAnnotation.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<TypeMapping>() {
+          @Override
+          public Optional<TypeMapping> visit(LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
+            return of(field(new ArrowType.Date(DateUnit.DAY)));
+          }
+
+          @Override
+          public Optional<TypeMapping> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) {
+            return of(decimal(decimalLogicalType.getPrecision(), decimalLogicalType.getScale()));
+          }
+
+          @Override
+          public Optional<TypeMapping> visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) {
+            return of(integer(intLogicalType.getBitWidth(), intLogicalType.isSigned()));
+          }
+
+          @Override
+          public Optional<TypeMapping> visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
+            if (timeLogicalType.getUnit() == MICROS) {
+              return of(field(new ArrowType.Time(TimeUnit.MICROSECOND, 64)));
+            }
+            return empty();
+          }
+
+          @Override
+          public Optional<TypeMapping> visit(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
+            switch (timestampLogicalType.getUnit()) {
+              case MICROS:
+                return of(field(new ArrowType.Timestamp(TimeUnit.MICROSECOND, getTimeZone(timestampLogicalType))));
+              case MILLIS:
+                return of(field(new ArrowType.Timestamp(TimeUnit.MILLISECOND, getTimeZone(timestampLogicalType))));
+            }
+            return empty();
+          }
+
+          private String getTimeZone(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
+            return timestampLogicalType.isAdjustedToUTC() ? "UTC" : null;
+          }
+        }).orElseThrow(() -> new IllegalArgumentException("illegal type " + type));
       }
 
       @Override
@@ -512,22 +499,25 @@ public class SchemaConverter {
 
       @Override
       public TypeMapping convertBINARY(PrimitiveTypeName primitiveTypeName) throws RuntimeException {
-        OriginalType ot = type.getOriginalType();
-        if (ot == null) {
+        LogicalTypeAnnotation logicalTypeAnnotation = type.getLogicalTypeAnnotation();
+        if (logicalTypeAnnotation == null) {
           return field(new ArrowType.Binary());
         }
-        switch (ot) {
-          case UTF8:
-            return field(new ArrowType.Utf8());
-          case DECIMAL:
-            return decimal(type.getDecimalMetadata());
-          default:
-            throw new IllegalArgumentException("illegal type " + type);
-        }
+        return logicalTypeAnnotation.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<TypeMapping>() {
+          @Override
+          public Optional<TypeMapping> visit(LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) {
+            return of(field(new ArrowType.Utf8()));
+          }
+
+          @Override
+          public Optional<TypeMapping> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) {
+            return of(decimal(decimalLogicalType.getPrecision(), decimalLogicalType.getScale()));
+          }
+        }).orElseThrow(() -> new IllegalArgumentException("illegal type " + type));
       }
 
-      private TypeMapping decimal(DecimalMetadata decimalMetadata) {
-        return field(new ArrowType.Decimal(decimalMetadata.getPrecision(), decimalMetadata.getScale()));
+      private TypeMapping decimal(int precision, int scale) {
+        return field(new ArrowType.Decimal(precision, scale));
       }
 
       private TypeMapping integer(int width, boolean signed) {
diff --git a/parquet-arrow/src/test/java/org/apache/parquet/arrow/schema/TestSchemaConverter.java b/parquet-arrow/src/test/java/org/apache/parquet/arrow/schema/TestSchemaConverter.java
index 2d1f028..2817de2 100644
--- a/parquet-arrow/src/test/java/org/apache/parquet/arrow/schema/TestSchemaConverter.java
+++ b/parquet-arrow/src/test/java/org/apache/parquet/arrow/schema/TestSchemaConverter.java
@@ -19,6 +19,10 @@
 package org.apache.parquet.arrow.schema;
 
 import static java.util.Arrays.asList;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MICROS;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MILLIS;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.timeType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.timestampType;
 import static org.apache.parquet.schema.OriginalType.DATE;
 import static org.apache.parquet.schema.OriginalType.DECIMAL;
 import static org.apache.parquet.schema.OriginalType.INTERVAL;
@@ -64,10 +68,9 @@ import org.apache.parquet.arrow.schema.SchemaMapping.UnionTypeMapping;
 import org.apache.parquet.example.Paper;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Types;
+import org.junit.Assert;
 import org.junit.Test;
 
-import junit.framework.Assert;
-
 /**
  * @see SchemaConverter
  */
@@ -90,7 +93,10 @@ public class TestSchemaConverter {
     field("f", new ArrowType.FixedSizeList(1), field(null, new ArrowType.Date(DateUnit.DAY))),
     field("g", new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)),
     field("h", new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC")),
-    field("i", new ArrowType.Interval(IntervalUnit.DAY_TIME))
+    field("j", new ArrowType.Timestamp(TimeUnit.MILLISECOND, null)),
+    field("k", new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC")),
+    field("l", new ArrowType.Timestamp(TimeUnit.MICROSECOND, null)),
+    field("m", new ArrowType.Interval(IntervalUnit.DAY_TIME))
   ));
   private final MessageType complexParquetSchema = Types.buildMessage()
     .addField(Types.optional(INT32).as(INT_8).named("a"))
@@ -105,8 +111,11 @@ public class TestSchemaConverter {
       setElementType(Types.optional(INT32).as(DATE).named("element"))
       .named("f"))
     .addField(Types.optional(FLOAT).named("g"))
-    .addField(Types.optional(INT64).as(TIMESTAMP_MILLIS).named("h"))
-    .addField(Types.optional(FIXED_LEN_BYTE_ARRAY).length(12).as(INTERVAL).named("i"))
+    .addField(Types.optional(INT64).as(timestampType(true, MILLIS)).named("h"))
+    .addField(Types.optional(INT64).as(timestampType(false, MILLIS)).named("j"))
+    .addField(Types.optional(INT64).as(timestampType(true, MICROS)).named("k"))
+    .addField(Types.optional(INT64).as(timestampType(false, MICROS)).named("l"))
+    .addField(Types.optional(FIXED_LEN_BYTE_ARRAY).length(12).as(INTERVAL).named("m"))
     .named("root");
 
   private final Schema allTypesArrowSchema = new Schema(asList(
@@ -169,7 +178,7 @@ public class TestSchemaConverter {
     .addField(Types.optional(INT64).as(DECIMAL).precision(15).scale(5).named("k1"))
     .addField(Types.optional(BINARY).as(DECIMAL).precision(25).scale(5).named("k2"))
     .addField(Types.optional(INT32).as(DATE).named("l"))
-    .addField(Types.optional(INT32).as(TIME_MILLIS).named("m"))
+    .addField(Types.optional(INT32).as(timeType(false, MILLIS)).named("m"))
     .addField(Types.optional(INT64).as(TIMESTAMP_MILLIS).named("n"))
     .addField(Types.optional(FIXED_LEN_BYTE_ARRAY).length(12).as(INTERVAL).named("o"))
     .addField(Types.optional(FIXED_LEN_BYTE_ARRAY).length(12).as(INTERVAL).named("o1"))
@@ -365,7 +374,8 @@ public class TestSchemaConverter {
     MessageType expected = converter.fromArrow(new Schema(asList(
       field("a", new ArrowType.Time(TimeUnit.MILLISECOND, 32))
     ))).getParquetSchema();
-    Assert.assertEquals(expected, Types.buildMessage().addField(Types.optional(INT32).as(TIME_MILLIS).named("a")).named("root"));
+    Assert.assertEquals(expected,
+      Types.buildMessage().addField(Types.optional(INT32).as(timeType(false, MILLIS)).named("a")).named("root"));
   }
 
   @Test
@@ -373,7 +383,8 @@ public class TestSchemaConverter {
     MessageType expected = converter.fromArrow(new Schema(asList(
       field("a", new ArrowType.Time(TimeUnit.MICROSECOND, 64))
     ))).getParquetSchema();
-    Assert.assertEquals(expected, Types.buildMessage().addField(Types.optional(INT64).as(TIME_MICROS).named("a")).named("root"));
+    Assert.assertEquals(expected,
+      Types.buildMessage().addField(Types.optional(INT64).as(timeType(false, MICROS)).named("a")).named("root"));
   }
 
   @Test(expected = UnsupportedOperationException.class)
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
index 1bb12b9..558446e 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
@@ -24,10 +24,9 @@ import org.apache.avro.Schema;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.schema.ConversionPatterns;
-import org.apache.parquet.schema.DecimalMetadata;
 import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 import org.apache.parquet.schema.Type;
@@ -36,11 +35,21 @@ import org.apache.parquet.schema.Types;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
 
+import static java.util.Optional.empty;
+import static java.util.Optional.of;
 import static org.apache.avro.JsonProperties.NULL_VALUE;
 import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
 import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE_DEFAULT;
-import static org.apache.parquet.schema.OriginalType.*;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MICROS;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MILLIS;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.dateType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.decimalType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.enumType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.timeType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.timestampType;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*;
 import static org.apache.parquet.schema.Type.Repetition.REPEATED;
 
@@ -147,11 +156,11 @@ public class AvroSchemaConverter {
     } else if (type.equals(Schema.Type.BYTES)) {
       builder = Types.primitive(BINARY, repetition);
     } else if (type.equals(Schema.Type.STRING)) {
-      builder = Types.primitive(BINARY, repetition).as(UTF8);
+      builder = Types.primitive(BINARY, repetition).as(stringType());
     } else if (type.equals(Schema.Type.RECORD)) {
       return new GroupType(repetition, fieldName, convertFields(schema.getFields()));
     } else if (type.equals(Schema.Type.ENUM)) {
-      builder = Types.primitive(BINARY, repetition).as(ENUM);
+      builder = Types.primitive(BINARY, repetition).as(enumType());
     } else if (type.equals(Schema.Type.ARRAY)) {
       if (writeOldListStructure) {
         return ConversionPatterns.listType(repetition, fieldName,
@@ -178,12 +187,10 @@ public class AvroSchemaConverter {
     LogicalType logicalType = schema.getLogicalType();
     if (logicalType != null) {
       if (logicalType instanceof LogicalTypes.Decimal) {
-        builder = builder.as(DECIMAL)
-            .precision(((LogicalTypes.Decimal) logicalType).getPrecision())
-            .scale(((LogicalTypes.Decimal) logicalType).getScale());
-
+        LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType;
+        builder = builder.as(decimalType(decimal.getScale(), decimal.getPrecision()));
       } else {
-        OriginalType annotation = convertLogicalType(logicalType);
+        LogicalTypeAnnotation annotation = convertLogicalType(logicalType);
         if (annotation != null) {
           builder.as(annotation);
         }
@@ -267,7 +274,7 @@ public class AvroSchemaConverter {
       final PrimitiveType asPrimitive = parquetType.asPrimitiveType();
       final PrimitiveTypeName parquetPrimitiveTypeName =
           asPrimitive.getPrimitiveTypeName();
-      final OriginalType annotation = parquetType.getOriginalType();
+      final LogicalTypeAnnotation annotation = parquetType.getLogicalTypeAnnotation();
       Schema schema = parquetPrimitiveTypeName.convert(
           new PrimitiveType.PrimitiveTypeNameConverter<Schema, RuntimeException>() {
             @Override
@@ -301,7 +308,8 @@ public class AvroSchemaConverter {
             }
             @Override
             public Schema convertBINARY(PrimitiveTypeName primitiveTypeName) {
-              if (annotation == OriginalType.UTF8 || annotation == OriginalType.ENUM) {
+              if (annotation instanceof LogicalTypeAnnotation.StringLogicalTypeAnnotation ||
+                annotation instanceof  LogicalTypeAnnotation.EnumLogicalTypeAnnotation) {
                 return Schema.create(Schema.Type.STRING);
               } else {
                 return Schema.create(Schema.Type.BYTES);
@@ -309,9 +317,8 @@ public class AvroSchemaConverter {
             }
           });
 
-      LogicalType logicalType = convertOriginalType(
-          annotation, asPrimitive.getDecimalMetadata());
-      if (logicalType != null && (annotation != DECIMAL ||
+      LogicalType logicalType = convertLogicalType(annotation);
+      if (logicalType != null && (!(annotation instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) ||
           parquetPrimitiveTypeName == BINARY ||
           parquetPrimitiveTypeName == FIXED_LEN_BYTE_ARRAY)) {
         schema = logicalType.addToSchema(schema);
@@ -321,10 +328,11 @@ public class AvroSchemaConverter {
 
     } else {
       GroupType parquetGroupType = parquetType.asGroupType();
-      OriginalType originalType = parquetGroupType.getOriginalType();
-      if (originalType != null) {
-        switch(originalType) {
-          case LIST:
+      LogicalTypeAnnotation logicalTypeAnnotation = parquetGroupType.getLogicalTypeAnnotation();
+      if (logicalTypeAnnotation != null) {
+        return logicalTypeAnnotation.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<Schema>() {
+          @Override
+          public Optional<Schema> visit(LogicalTypeAnnotation.ListLogicalTypeAnnotation listLogicalType) {
             if (parquetGroupType.getFieldCount()!= 1) {
               throw new UnsupportedOperationException("Invalid list type " + parquetGroupType);
             }
@@ -334,17 +342,29 @@ public class AvroSchemaConverter {
             }
             if (isElementType(repeatedType, parquetGroupType.getName())) {
               // repeated element types are always required
-              return Schema.createArray(convertField(repeatedType));
+              return of(Schema.createArray(convertField(repeatedType)));
             } else {
               Type elementType = repeatedType.asGroupType().getType(0);
               if (elementType.isRepetition(Type.Repetition.OPTIONAL)) {
-                return Schema.createArray(optional(convertField(elementType)));
+                return of(Schema.createArray(optional(convertField(elementType))));
               } else {
-                return Schema.createArray(convertField(elementType));
+                return of(Schema.createArray(convertField(elementType)));
               }
             }
-          case MAP_KEY_VALUE: // for backward-compatibility
-          case MAP:
+          }
+
+          @Override
+          // MAP_KEY_VALUE is converted exactly like MAP, for backward-compatibility
+          public Optional<Schema> visit(LogicalTypeAnnotation.MapKeyValueTypeAnnotation mapKeyValueLogicalType) {
+            return visitMapOrMapKeyValue();
+          }
+
+          @Override
+          public Optional<Schema> visit(LogicalTypeAnnotation.MapLogicalTypeAnnotation mapLogicalType) {
+            return visitMapOrMapKeyValue();
+          }
+
+          private Optional<Schema> visitMapOrMapKeyValue() {
             if (parquetGroupType.getFieldCount() != 1 || parquetGroupType.getType(0).isPrimitive()) {
               throw new UnsupportedOperationException("Invalid map type " + parquetGroupType);
             }
@@ -356,24 +376,23 @@ public class AvroSchemaConverter {
             Type keyType = mapKeyValType.getType(0);
             if (!keyType.isPrimitive() ||
                 !keyType.asPrimitiveType().getPrimitiveTypeName().equals(PrimitiveTypeName.BINARY) ||
-                !keyType.getOriginalType().equals(OriginalType.UTF8)) {
+                !keyType.getLogicalTypeAnnotation().equals(stringType())) {
               throw new IllegalArgumentException("Map key type must be binary (UTF8): "
                   + keyType);
             }
             Type valueType = mapKeyValType.getType(1);
             if (valueType.isRepetition(Type.Repetition.OPTIONAL)) {
-              return Schema.createMap(optional(convertField(valueType)));
+              return of(Schema.createMap(optional(convertField(valueType))));
             } else {
-              return Schema.createMap(convertField(valueType));
+              return of(Schema.createMap(convertField(valueType)));
             }
-          case ENUM:
-            return Schema.create(Schema.Type.STRING);
-          case UTF8:
-          default:
-            throw new UnsupportedOperationException("Cannot convert Parquet type " +
-                parquetType);
+          }
 
-        }
+          @Override
+          public Optional<Schema> visit(LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) {
+            return of(Schema.create(Schema.Type.STRING));
+          }
+        }).orElseThrow(() -> new UnsupportedOperationException("Cannot convert Parquet type " + parquetType));
       } else {
         // if no original type then it's a record
         return convertFields(parquetGroupType.getName(), parquetGroupType.getFields());
@@ -381,44 +400,65 @@ public class AvroSchemaConverter {
     }
   }
 
-  private OriginalType convertLogicalType(LogicalType logicalType) {
+  private LogicalTypeAnnotation convertLogicalType(LogicalType logicalType) {
     if (logicalType == null) {
       return null;
     } else if (logicalType instanceof LogicalTypes.Decimal) {
-      return OriginalType.DECIMAL;
+      LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType;
+      return decimalType(decimal.getScale(), decimal.getPrecision());
     } else if (logicalType instanceof LogicalTypes.Date) {
-      return OriginalType.DATE;
+      return dateType();
     } else if (logicalType instanceof LogicalTypes.TimeMillis) {
-      return OriginalType.TIME_MILLIS;
+      return timeType(true, MILLIS);
     } else if (logicalType instanceof LogicalTypes.TimeMicros) {
-      return OriginalType.TIME_MICROS;
+      return timeType(true, MICROS);
     } else if (logicalType instanceof LogicalTypes.TimestampMillis) {
-      return OriginalType.TIMESTAMP_MILLIS;
+      return timestampType(true, MILLIS);
     } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
-      return OriginalType.TIMESTAMP_MICROS;
+      return timestampType(true, MICROS);
     }
     return null;
   }
 
-  private LogicalType convertOriginalType(OriginalType annotation, DecimalMetadata meta) {
+  private LogicalType convertLogicalType(LogicalTypeAnnotation annotation) {
     if (annotation == null) {
       return null;
     }
-    switch (annotation) {
-      case DECIMAL:
-        return LogicalTypes.decimal(meta.getPrecision(), meta.getScale());
-      case DATE:
-        return LogicalTypes.date();
-      case TIME_MILLIS:
-        return LogicalTypes.timeMillis();
-      case TIME_MICROS:
-        return LogicalTypes.timeMicros();
-      case TIMESTAMP_MILLIS:
-        return LogicalTypes.timestampMillis();
-      case TIMESTAMP_MICROS:
-        return LogicalTypes.timestampMicros();
-    }
-    return null;
+    return annotation.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<LogicalType>() {
+      @Override
+      public Optional<LogicalType> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) {
+        return of(LogicalTypes.decimal(decimalLogicalType.getPrecision(), decimalLogicalType.getScale()));
+      }
+
+      @Override
+      public Optional<LogicalType> visit(LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
+        return of(LogicalTypes.date());
+      }
+
+      @Override
+      public Optional<LogicalType> visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
+        LogicalTypeAnnotation.TimeUnit unit = timeLogicalType.getUnit();
+        switch (unit) {
+          case MILLIS:
+            return of(LogicalTypes.timeMillis());
+          case MICROS:
+            return of(LogicalTypes.timeMicros());
+        }
+        return empty();
+      }
+
+      @Override
+      public Optional<LogicalType> visit(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
+        LogicalTypeAnnotation.TimeUnit unit = timestampLogicalType.getUnit();
+        switch (unit) {
+          case MILLIS:
+            return of(LogicalTypes.timestampMillis());
+          case MICROS:
+            return of(LogicalTypes.timestampMicros());
+        }
+        return empty();
+      }
+    }).orElse(null);
   }
 
   /**
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
index 942e3b1..bfaeec3 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -616,7 +616,7 @@ public class TestAvroSchemaConverter {
 
     testRoundTripConversion(expected,
         "message myrecord {\n" +
-            "  required int32 time (TIME_MILLIS);\n" +
+            "  required int32 time (TIME(MILLIS,true));\n" +
             "}\n");
 
     for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
@@ -646,7 +646,7 @@ public class TestAvroSchemaConverter {
 
     testRoundTripConversion(expected,
         "message myrecord {\n" +
-            "  required int64 time (TIME_MICROS);\n" +
+            "  required int64 time (TIME(MICROS,true));\n" +
             "}\n");
 
     for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
@@ -676,7 +676,7 @@ public class TestAvroSchemaConverter {
 
     testRoundTripConversion(expected,
         "message myrecord {\n" +
-            "  required int64 timestamp (TIMESTAMP_MILLIS);\n" +
+            "  required int64 timestamp (TIMESTAMP(MILLIS,true));\n" +
             "}\n");
 
     for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
@@ -706,7 +706,7 @@ public class TestAvroSchemaConverter {
 
     testRoundTripConversion(expected,
         "message myrecord {\n" +
-            "  required int64 timestamp (TIMESTAMP_MICROS);\n" +
+            "  required int64 timestamp (TIMESTAMP(MICROS,true));\n" +
             "}\n");
 
     for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
diff --git a/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java
index 3741165..4c1240b 100644
--- a/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java
+++ b/parquet-cascading-common23/src/main/java/org/apache/parquet/cascading/convert/TupleConverter.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,10 +27,7 @@ import org.apache.parquet.io.api.GroupConverter;
 import org.apache.parquet.io.api.PrimitiveConverter;
 import org.apache.parquet.pig.TupleConversionException;
 import org.apache.parquet.schema.GroupType;
-import org.apache.parquet.schema.OriginalType;
-import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
-import org.apache.parquet.schema.Type.Repetition;
 
 public class TupleConverter extends GroupConverter {
 
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/Util.java b/parquet-cli/src/main/java/org/apache/parquet/cli/Util.java
index 98bc1e5..961c7f0 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/Util.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/Util.java
@@ -80,7 +80,12 @@ public class Util {
     }
   }
 
+  @Deprecated
   public static String minMaxAsString(Statistics stats, OriginalType annotation) {
+    return minMaxAsString(stats);
+  }
+
+  public static String minMaxAsString(Statistics stats) {
     if (stats == null) {
       return "no stats";
     }
@@ -90,7 +95,12 @@ public class Util {
     return String.format("%s / %s", humanReadable(stats.minAsString(), 30), humanReadable(stats.maxAsString(), 30));
   }
 
+  @Deprecated
   public static String toString(Statistics stats, long count, OriginalType annotation) {
+    return toString(stats, count);
+  }
+
+  public static String toString(Statistics stats, long count) {
     if (stats == null) {
       return "no stats";
     }
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ParquetMetadataCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ParquetMetadataCommand.java
index 54fe657..a452369 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ParquetMetadataCommand.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ParquetMetadataCommand.java
@@ -169,12 +169,12 @@ public class ParquetMetadataCommand extends BaseCommand {
       console.info(String.format("%-" + width + "s  FIXED[%d] %s %-7s %-9d %-8s %-7s %s",
           name, type.getTypeLength(), shortCodec(codec), encodingSummary, count,
           humanReadable(perValue), stats == null || !stats.isNumNullsSet() ? "" : String.valueOf(stats.getNumNulls()),
-          minMaxAsString(stats, type.getOriginalType())));
+          minMaxAsString(stats)));
     } else {
       console.info(String.format("%-" + width + "s  %-9s %s %-7s %-9d %-10s %-7s %s",
           name, typeName, shortCodec(codec), encodingSummary, count, humanReadable(perValue),
           stats == null || !stats.isNumNullsSet() ? "" : String.valueOf(stats.getNumNulls()),
-          minMaxAsString(stats, type.getOriginalType())));
+          minMaxAsString(stats)));
     }
   }
 }
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowDictionaryCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowDictionaryCommand.java
index db427c9..20a694f 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowDictionaryCommand.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowDictionaryCommand.java
@@ -30,8 +30,8 @@ import org.apache.parquet.column.Dictionary;
 import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.page.DictionaryPageReadStore;
 import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.slf4j.Logger;
 import java.io.IOException;
@@ -81,7 +81,7 @@ public class ShowDictionaryCommand extends BaseCommand {
       for (int i = 0; i <= dict.getMaxId(); i += 1) {
         switch(type.getPrimitiveTypeName()) {
           case BINARY:
-            if (type.getOriginalType() == OriginalType.UTF8) {
+            if (type.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.StringLogicalTypeAnnotation) {
               console.info("{}: {}", String.format("%6d", i),
                   Util.humanReadable(dict.decodeToBinary(i).toStringUsingUTF8(), 70));
             } else {
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java
index 4d0e2c9..1ac03aa 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java
@@ -193,7 +193,7 @@ public class ShowPagesCommand extends BaseCommand {
       int count = page.getValueCount();
       String numNulls = page.getStatistics().isNumNullsSet() ? Long.toString(page.getStatistics().getNumNulls()) : "";
       float perValue = ((float) totalSize) / count;
-      String minMax = minMaxAsString(page.getStatistics(), type.getOriginalType());
+      String minMax = minMaxAsString(page.getStatistics());
       return String.format("%3d-%-3d  %-5s %s %-2s %-7d %-10s %-10s %-8s %-7s %s",
           rowGroupNum, pageNum, "data", shortCodec, enc, count, humanReadable(perValue),
           humanReadable(totalSize), "", numNulls, minMax);
@@ -207,7 +207,7 @@ public class ShowPagesCommand extends BaseCommand {
       int numRows = page.getRowCount();
       int numNulls = page.getNullCount();
       float perValue = ((float) totalSize) / count;
-      String minMax = minMaxAsString(page.getStatistics(), type.getOriginalType());
+      String minMax = minMaxAsString(page.getStatistics());
       String compression = (page.isCompressed() ? shortCodec : "_");
       return String.format("%3d-%-3d  %-5s %s %-2s %-7d %-10s %-10s %-8d %-7s %s",
           rowGroupNum, pageNum, "data", compression, enc, count, humanReadable(perValue),
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/ValidTypeMap.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/ValidTypeMap.java
index b8f48bb..62c174e 100644
--- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/ValidTypeMap.java
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/ValidTypeMap.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -25,7 +25,6 @@ import java.util.Set;
 
 import org.apache.parquet.filter2.predicate.Operators.Column;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
-import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 
 /**
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/ConversionPatterns.java b/parquet-column/src/main/java/org/apache/parquet/schema/ConversionPatterns.java
index 6db1e58..a530db1 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/ConversionPatterns.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/ConversionPatterns.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,7 +22,7 @@ import org.apache.parquet.Preconditions;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 import org.apache.parquet.schema.Type.Repetition;
 
-import static org.apache.parquet.schema.OriginalType.*;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
 
 /**
  * Utility functions to convert from Java-like map and list types
@@ -37,15 +37,15 @@ public abstract class ConversionPatterns {
    *
    * @param repetition   repetition for the list or map
    * @param alias        name of the field
-   * @param originalType original type for the list or map
+   * @param logicalTypeAnnotation logical type for the list or map
    * @param nested       the nested repeated field
    * @return a group type
    */
-  private static GroupType listWrapper(Repetition repetition, String alias, OriginalType originalType, Type nested) {
+  private static GroupType listWrapper(Repetition repetition, String alias, LogicalTypeAnnotation logicalTypeAnnotation, Type nested) {
     if (!nested.isRepetition(Repetition.REPEATED)) {
       throw new IllegalArgumentException("Nested type should be repeated: " + nested);
     }
-    return new GroupType(repetition, alias, originalType, nested);
+    return new GroupType(repetition, alias, logicalTypeAnnotation, nested);
   }
 
   public static GroupType mapType(Repetition repetition, String alias, Type keyType, Type valueType) {
@@ -53,7 +53,7 @@ public abstract class ConversionPatterns {
   }
 
   public static GroupType stringKeyMapType(Repetition repetition, String alias, String mapAlias, Type valueType) {
-    return mapType(repetition, alias, mapAlias, new PrimitiveType(Repetition.REQUIRED, PrimitiveTypeName.BINARY, "key", OriginalType.UTF8), valueType);
+    return mapType(repetition, alias, mapAlias, new PrimitiveType(Repetition.REQUIRED, PrimitiveTypeName.BINARY, "key", stringType()), valueType);
   }
 
   public static GroupType stringKeyMapType(Repetition repetition, String alias, Type valueType) {
@@ -66,11 +66,11 @@ public abstract class ConversionPatterns {
       return listWrapper(
               repetition,
               alias,
-              MAP,
+              LogicalTypeAnnotation.mapType(),
               new GroupType(
                       Repetition.REPEATED,
                       mapAlias,
-                      MAP_KEY_VALUE,
+                      LogicalTypeAnnotation.MapKeyValueTypeAnnotation.getInstance(),
                       keyType)
       );
     } else {
@@ -80,11 +80,11 @@ public abstract class ConversionPatterns {
       return listWrapper(
               repetition,
               alias,
-              MAP,
+              LogicalTypeAnnotation.mapType(),
               new GroupType(
                       Repetition.REPEATED,
                       mapAlias,
-                      MAP_KEY_VALUE,
+                      LogicalTypeAnnotation.MapKeyValueTypeAnnotation.getInstance(),
                       keyType,
                       valueType)
       );
@@ -103,7 +103,7 @@ public abstract class ConversionPatterns {
     return listWrapper(
             repetition,
             alias,
-            LIST,
+            LogicalTypeAnnotation.listType(),
             nestedType
     );
   }
@@ -125,7 +125,7 @@ public abstract class ConversionPatterns {
     return listWrapper(
         listRepetition,
         name,
-        LIST,
+        LogicalTypeAnnotation.listType(),
         new GroupType(Repetition.REPEATED, "list", elementType)
     );
   }
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java b/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java
index 5cb40e5..64e7062 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java
@@ -70,6 +70,16 @@ public class GroupType extends Type {
   /**
    * @param repetition OPTIONAL, REPEATED, REQUIRED
    * @param name the name of the field
+   * @param logicalTypeAnnotation (optional) the logical type to help with cross schema conversion (LIST, MAP, ...)
+   * @param fields the contained fields
+   */
+  GroupType(Repetition repetition, String name, LogicalTypeAnnotation logicalTypeAnnotation, Type... fields) {
+    this(repetition, name, logicalTypeAnnotation, Arrays.asList(fields));
+  }
+
+  /**
+   * @param repetition OPTIONAL, REPEATED, REQUIRED
+   * @param name the name of the field
    * @param originalType (optional) the original type to help with cross schema conversion (LIST, MAP, ...)
    * @param fields the contained fields
    */
@@ -81,6 +91,16 @@ public class GroupType extends Type {
   /**
    * @param repetition OPTIONAL, REPEATED, REQUIRED
    * @param name the name of the field
+   * @param logicalTypeAnnotation (optional) the logical type to help with cross schema conversion (LIST, MAP, ...)
+   * @param fields the contained fields
+   */
+  GroupType(Repetition repetition, String name, LogicalTypeAnnotation logicalTypeAnnotation, List<Type> fields) {
+    this(repetition, name, logicalTypeAnnotation, fields, null);
+  }
+
+  /**
+   * @param repetition OPTIONAL, REPEATED, REQUIRED
+   * @param name the name of the field
    * @param originalType (optional) the original type to help with cross schema conversion (LIST, MAP, ...)
    * @param fields the contained fields
    * @param id the id of the field
@@ -109,7 +129,7 @@ public class GroupType extends Type {
    */
   @Override
   public GroupType withId(int id) {
-    return new GroupType(getRepetition(), getName(), getOriginalType(), fields, new ID(id));
+    return new GroupType(getRepetition(), getName(), getLogicalTypeAnnotation(), fields, new ID(id));
   }
 
   /**
@@ -117,7 +137,7 @@ public class GroupType extends Type {
    * @return a group with the same attributes and new fields.
    */
   public GroupType withNewFields(List<Type> newFields) {
-    return new GroupType(getRepetition(), getName(), getOriginalType(), newFields, getId());
+    return new GroupType(getRepetition(), getName(), getLogicalTypeAnnotation(), newFields, getId());
   }
 
   /**
@@ -219,7 +239,7 @@ public class GroupType extends Type {
         .append(getRepetition().name().toLowerCase(Locale.ENGLISH))
         .append(" group ")
         .append(getName())
-        .append(getOriginalType() == null ? "" : " (" + getOriginalType() +")")
+        .append(getLogicalTypeAnnotation() == null ? "" : " (" + getLogicalTypeAnnotation().toString() +")")
         .append(getId() == null ? "" : " = " + getId())
         .append(" {\n");
     membersDisplayString(sb, indent + "  ");
@@ -250,7 +270,7 @@ public class GroupType extends Type {
    */
   @Override
   public int hashCode() {
-    return Objects.hash(getOriginalType(), getFields());
+    return Objects.hash(getLogicalTypeAnnotation(), getFields());
   }
 
   /**
@@ -261,7 +281,7 @@ public class GroupType extends Type {
     return
         !otherType.isPrimitive()
         && super.equals(otherType)
-        && getOriginalType() == otherType.getOriginalType()
+        && Objects.equals(getLogicalTypeAnnotation(),otherType.getLogicalTypeAnnotation())
         && getFields().equals(otherType.asGroupType().getFields());
   }
 
@@ -355,7 +375,7 @@ public class GroupType extends Type {
     if (toMerge.isPrimitive()) {
       throw new IncompatibleSchemaModificationException("can not merge primitive type " + toMerge + " into group type " + this);
     }
-    return new GroupType(toMerge.getRepetition(), getName(), toMerge.getOriginalType(), mergeFields(toMerge.asGroupType()), getId());
+    return new GroupType(toMerge.getRepetition(), getName(), toMerge.getLogicalTypeAnnotation(), mergeFields(toMerge.asGroupType()), getId());
   }
 
   /**
@@ -383,8 +403,8 @@ public class GroupType extends Type {
         if (fieldToMerge.getRepetition().isMoreRestrictiveThan(type.getRepetition())) {
           throw new IncompatibleSchemaModificationException("repetition constraint is more restrictive: can not merge type " + fieldToMerge + " into " + type);
         }
-        if (type.getOriginalType() != null && fieldToMerge.getOriginalType() != type.getOriginalType()) {
-          throw new IncompatibleSchemaModificationException("cannot merge original type " + fieldToMerge.getOriginalType() + " into " + type.getOriginalType());
+        if (type.getLogicalTypeAnnotation() != null && !type.getLogicalTypeAnnotation().equals(fieldToMerge.getLogicalTypeAnnotation())) {
+          throw new IncompatibleSchemaModificationException("cannot merge logical type " + fieldToMerge.getLogicalTypeAnnotation() + " into " + type.getLogicalTypeAnnotation());
         }
         merged = type.union(fieldToMerge, strict);
       } else {
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/LogicalTypeAnnotation.java b/parquet-column/src/main/java/org/apache/parquet/schema/LogicalTypeAnnotation.java
index 340a24a..6046a39 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/LogicalTypeAnnotation.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/LogicalTypeAnnotation.java
@@ -20,12 +20,18 @@ package org.apache.parquet.schema;
 
 import org.apache.parquet.Preconditions;
 
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.function.Supplier;
 
+import static java.util.Arrays.asList;
 import static java.util.Optional.empty;
+import static org.apache.parquet.schema.ColumnOrder.ColumnOrderName.TYPE_DEFINED_ORDER;
+import static org.apache.parquet.schema.ColumnOrder.ColumnOrderName.UNDEFINED;
 
 public abstract class LogicalTypeAnnotation {
   enum LogicalTypeToken {
@@ -144,6 +150,10 @@ public abstract class LogicalTypeAnnotation {
     return "";
   }
 
+  boolean isValidColumnOrder(ColumnOrder columnOrder) {
+    return columnOrder.getColumnOrderName() == UNDEFINED || columnOrder.getColumnOrderName() == TYPE_DEFINED_ORDER;
+  }
+
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
@@ -152,6 +162,10 @@ public abstract class LogicalTypeAnnotation {
     return sb.toString();
   }
 
+  PrimitiveStringifier valueStringifier(PrimitiveType primitiveType) {
+    throw new UnsupportedOperationException("Stringifier is not supported for the logical type: " + this);
+  }
+
   /**
    * Helper method to convert the old representation of logical types (OriginalType) to new logical type.
    */
@@ -290,6 +304,11 @@ public abstract class LogicalTypeAnnotation {
       // This type doesn't have any parameters, thus using class hashcode
       return getClass().hashCode();
     }
+
+    @Override
+    PrimitiveStringifier valueStringifier(PrimitiveType primitiveType) {
+      return PrimitiveStringifier.UTF8_STRINGIFIER;
+    }
   }
 
   public static class MapLogicalTypeAnnotation extends LogicalTypeAnnotation {
@@ -389,15 +408,22 @@ public abstract class LogicalTypeAnnotation {
       // This type doesn't have any parameters, thus using class hashcode
       return getClass().hashCode();
     }
+
+    @Override
+    PrimitiveStringifier valueStringifier(PrimitiveType primitiveType) {
+      return PrimitiveStringifier.UTF8_STRINGIFIER;
+    }
   }
 
   public static class DecimalLogicalTypeAnnotation extends LogicalTypeAnnotation {
+    private final PrimitiveStringifier stringifier;
     private final int scale;
     private final int precision;
 
     private DecimalLogicalTypeAnnotation(int scale, int precision) {
       this.scale = scale;
       this.precision = precision;
+      stringifier = PrimitiveStringifier.createDecimalStringifier(scale);
     }
 
     public int getPrecision() {
@@ -447,6 +473,11 @@ public abstract class LogicalTypeAnnotation {
     public int hashCode() {
       return Objects.hash(scale, precision);
     }
+
+    @Override
+    PrimitiveStringifier valueStringifier(PrimitiveType primitiveType) {
+      return stringifier;
+    }
   }
 
   public static class DateLogicalTypeAnnotation extends LogicalTypeAnnotation {
@@ -480,6 +511,11 @@ public abstract class LogicalTypeAnnotation {
       // This type doesn't have any parameters, thus using class hashcode
       return getClass().hashCode();
     }
+
+    @Override
+    PrimitiveStringifier valueStringifier(PrimitiveType primitiveType) {
+      return PrimitiveStringifier.DATE_STRINGIFIER;
+    }
   }
 
   public enum TimeUnit {
@@ -550,6 +586,11 @@ public abstract class LogicalTypeAnnotation {
     public int hashCode() {
       return Objects.hash(isAdjustedToUTC, unit);
     }
+
+    @Override
+    PrimitiveStringifier valueStringifier(PrimitiveType primitiveType) {
+      return PrimitiveStringifier.TIME_STRINGIFIER;
+    }
   }
 
   public static class TimestampLogicalTypeAnnotation extends LogicalTypeAnnotation {
@@ -615,14 +656,31 @@ public abstract class LogicalTypeAnnotation {
     public int hashCode() {
       return Objects.hash(isAdjustedToUTC, unit);
     }
+
+    @Override
+    PrimitiveStringifier valueStringifier(PrimitiveType primitiveType) {
+      switch (unit) {
+        case MICROS:
+          return PrimitiveStringifier.TIMESTAMP_MICROS_STRINGIFIER;
+        case MILLIS:
+          return PrimitiveStringifier.TIMESTAMP_MILLIS_STRINGIFIER;
+        default:
+          return super.valueStringifier(primitiveType);
+      }
+    }
   }
 
   public static class IntLogicalTypeAnnotation extends LogicalTypeAnnotation {
+    private static final Set<Integer> VALID_BIT_WIDTH = Collections.unmodifiableSet(
+      new HashSet<>(asList(8, 16, 32, 64)));
+
     private final int bitWidth;
     private final boolean isSigned;
 
-
     private IntLogicalTypeAnnotation(int bitWidth, boolean isSigned) {
+      if (!VALID_BIT_WIDTH.contains(bitWidth)) {
+        throw new IllegalArgumentException("Invalid integer bit width: " + bitWidth);
+      }
       this.bitWidth = bitWidth;
       this.isSigned = isSigned;
     }
@@ -685,6 +743,11 @@ public abstract class LogicalTypeAnnotation {
     public int hashCode() {
       return Objects.hash(bitWidth, isSigned);
     }
+
+    @Override
+    PrimitiveStringifier valueStringifier(PrimitiveType primitiveType) {
+      return isSigned ? PrimitiveStringifier.DEFAULT_STRINGIFIER : PrimitiveStringifier.UNSIGNED_STRINGIFIER;
+    }
   }
 
   public static class JsonLogicalTypeAnnotation extends LogicalTypeAnnotation {
@@ -718,6 +781,11 @@ public abstract class LogicalTypeAnnotation {
       // This type doesn't have any parameters, thus using class hashcode
       return getClass().hashCode();
     }
+
+    @Override
+    PrimitiveStringifier valueStringifier(PrimitiveType primitiveType) {
+      return PrimitiveStringifier.UTF8_STRINGIFIER;
+    }
   }
 
   public static class BsonLogicalTypeAnnotation extends LogicalTypeAnnotation {
@@ -751,6 +819,11 @@ public abstract class LogicalTypeAnnotation {
       // This type doesn't have any parameters, thus using class hashcode
       return getClass().hashCode();
     }
+
+    @Override
+    PrimitiveStringifier valueStringifier(PrimitiveType primitiveType) {
+      return PrimitiveStringifier.DEFAULT_STRINGIFIER;
+    }
   }
 
   // This logical type annotation is implemented to support backward compatibility with ConvertedType.
@@ -791,6 +864,16 @@ public abstract class LogicalTypeAnnotation {
       // This type doesn't have any parameters, thus using class hashcode
       return getClass().hashCode();
     }
+
+    @Override
+    PrimitiveStringifier valueStringifier(PrimitiveType primitiveType) {
+      return PrimitiveStringifier.INTERVAL_STRINGIFIER;
+    }
+
+    @Override
+    boolean isValidColumnOrder(ColumnOrder columnOrder) {
+      return columnOrder.getColumnOrderName() == UNDEFINED;
+    }
   }
 
   // This logical type annotation is implemented to support backward compatibility with ConvertedType.
@@ -845,55 +928,55 @@ public abstract class LogicalTypeAnnotation {
    * or {@link Optional#orElseThrow(Supplier)} to throw exception if omitting a type is not allowed.
    */
   public interface LogicalTypeAnnotationVisitor<T> {
-    default Optional<T> visit(StringLogicalTypeAnnotation logicalTypeAnnotation) {
+    default Optional<T> visit(StringLogicalTypeAnnotation stringLogicalType) {
       return empty();
     }
 
-    default Optional<T> visit(MapLogicalTypeAnnotation logicalTypeAnnotation) {
+    default Optional<T> visit(MapLogicalTypeAnnotation mapLogicalType) {
       return empty();
     }
 
-    default Optional<T> visit(ListLogicalTypeAnnotation logicalTypeAnnotation) {
+    default Optional<T> visit(ListLogicalTypeAnnotation listLogicalType) {
       return empty();
     }
 
-    default Optional<T> visit(EnumLogicalTypeAnnotation logicalTypeAnnotation) {
+    default Optional<T> visit(EnumLogicalTypeAnnotation enumLogicalType) {
       return empty();
     }
 
-    default Optional<T> visit(DecimalLogicalTypeAnnotation logicalTypeAnnotation) {
+    default Optional<T> visit(DecimalLogicalTypeAnnotation decimalLogicalType) {
       return empty();
     }
 
-    default Optional<T> visit(DateLogicalTypeAnnotation logicalTypeAnnotation) {
+    default Optional<T> visit(DateLogicalTypeAnnotation dateLogicalType) {
       return empty();
     }
 
-    default Optional<T> visit(TimeLogicalTypeAnnotation logicalTypeAnnotation) {
+    default Optional<T> visit(TimeLogicalTypeAnnotation timeLogicalType) {
       return empty();
     }
 
-    default Optional<T> visit(TimestampLogicalTypeAnnotation logicalTypeAnnotation) {
+    default Optional<T> visit(TimestampLogicalTypeAnnotation timestampLogicalType) {
       return empty();
     }
 
-    default Optional<T> visit(IntLogicalTypeAnnotation logicalTypeAnnotation) {
+    default Optional<T> visit(IntLogicalTypeAnnotation intLogicalType) {
       return empty();
     }
 
-    default Optional<T> visit(JsonLogicalTypeAnnotation logicalTypeAnnotation) {
+    default Optional<T> visit(JsonLogicalTypeAnnotation jsonLogicalType) {
       return empty();
     }
 
-    default Optional<T> visit(BsonLogicalTypeAnnotation logicalTypeAnnotation) {
+    default Optional<T> visit(BsonLogicalTypeAnnotation bsonLogicalType) {
       return empty();
     }
 
-    default Optional<T> visit(IntervalLogicalTypeAnnotation logicalTypeAnnotation) {
+    default Optional<T> visit(IntervalLogicalTypeAnnotation intervalLogicalType) {
       return empty();
     }
 
-    default Optional<T> visit(MapKeyValueTypeAnnotation logicalTypeAnnotation) {
+    default Optional<T> visit(MapKeyValueTypeAnnotation mapKeyValueLogicalType) {
       return empty();
     }
   }
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java b/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java
index d305eb8..83f98d7 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/MessageType.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -62,7 +62,7 @@ public final class MessageType extends GroupType {
   public void writeToStringBuilder(StringBuilder sb, String indent) {
     sb.append("message ")
         .append(getName())
-        .append(getOriginalType() == null ? "" : " (" + getOriginalType() +")")
+        .append(getLogicalTypeAnnotation() == null ? "" : " (" + getLogicalTypeAnnotation().toString() +")")
         .append(" {\n");
     membersDisplayString(sb, "  ");
     sb.append("}\n");
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/OriginalType.java b/parquet-column/src/main/java/org/apache/parquet/schema/OriginalType.java
index b00ae7e..78421b3 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/OriginalType.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/OriginalType.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,46 +21,24 @@ package org.apache.parquet.schema;
 public enum OriginalType {
   MAP,
   LIST,
-  UTF8(PrimitiveStringifier.UTF8_STRINGIFIER),
+  UTF8,
   MAP_KEY_VALUE,
-  ENUM(PrimitiveStringifier.UTF8_STRINGIFIER),
-  DECIMAL {
-    @Override
-    PrimitiveStringifier stringifier(PrimitiveType type) {
-      return PrimitiveStringifier.createDecimalStringifier(type.getDecimalMetadata().getScale());
-    }
-  },
-  DATE(PrimitiveStringifier.DATE_STRINGIFIER),
-  TIME_MILLIS(PrimitiveStringifier.TIME_STRINGIFIER),
-  TIME_MICROS(PrimitiveStringifier.TIME_STRINGIFIER),
-  TIMESTAMP_MILLIS(PrimitiveStringifier.TIMESTAMP_MILLIS_STRINGIFIER),
-  TIMESTAMP_MICROS(PrimitiveStringifier.TIMESTAMP_MICROS_STRINGIFIER),
-  UINT_8(PrimitiveStringifier.UNSIGNED_STRINGIFIER),
-  UINT_16(PrimitiveStringifier.UNSIGNED_STRINGIFIER),
-  UINT_32(PrimitiveStringifier.UNSIGNED_STRINGIFIER),
-  UINT_64(PrimitiveStringifier.UNSIGNED_STRINGIFIER),
-  INT_8(PrimitiveStringifier.DEFAULT_STRINGIFIER),
-  INT_16(PrimitiveStringifier.DEFAULT_STRINGIFIER),
-  INT_32(PrimitiveStringifier.DEFAULT_STRINGIFIER),
-  INT_64(PrimitiveStringifier.DEFAULT_STRINGIFIER),
-  JSON(PrimitiveStringifier.UTF8_STRINGIFIER),
-  BSON(PrimitiveStringifier.DEFAULT_STRINGIFIER),
-  INTERVAL(PrimitiveStringifier.INTERVAL_STRINGIFIER);
-
-  private final PrimitiveStringifier stringifier;
-
-  PrimitiveStringifier stringifier(PrimitiveType type) {
-    if (stringifier == null) {
-      throw new UnsupportedOperationException("Stringifier is not supported for the original type: " + this);
-    }
-    return stringifier;
-  }
-
-  OriginalType() {
-    this(null);
-  }
-
-  OriginalType(PrimitiveStringifier stringifier) {
-    this.stringifier = stringifier;
-  }
+  ENUM,
+  DECIMAL,
+  DATE,
+  TIME_MILLIS,
+  TIME_MICROS,
+  TIMESTAMP_MILLIS,
+  TIMESTAMP_MICROS,
+  UINT_8,
+  UINT_16,
+  UINT_32,
+  UINT_64,
+  INT_8,
+  INT_16,
+  INT_32,
+  INT_64,
+  JSON,
+  BSON,
+  INTERVAL
 }
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
index 08adfbe..6a7382e 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
@@ -21,6 +21,8 @@ package org.apache.parquet.schema;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Locale;
+import java.util.Objects;
+import java.util.Optional;
 
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.ShouldNeverHappenException;
@@ -31,6 +33,11 @@ import org.apache.parquet.io.api.PrimitiveConverter;
 import org.apache.parquet.io.api.RecordConsumer;
 import org.apache.parquet.schema.ColumnOrder.ColumnOrderName;
 
+import static java.util.Optional.empty;
+import static java.util.Optional.of;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MICROS;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MILLIS;
+
 
 /**
  * Representation of a Primitive type
@@ -85,23 +92,32 @@ public final class PrimitiveType extends Type {
       }
 
       @Override
-      PrimitiveComparator<?> comparator(OriginalType logicalType) {
+      PrimitiveComparator<?> comparator(LogicalTypeAnnotation logicalType) {
         if (logicalType == null) {
           return PrimitiveComparator.SIGNED_INT64_COMPARATOR;
         }
-        switch (logicalType) {
-        case UINT_64:
-          return PrimitiveComparator.UNSIGNED_INT64_COMPARATOR;
-        case INT_64:
-        case DECIMAL:
-        case TIME_MICROS:
-        case TIMESTAMP_MILLIS:
-        case TIMESTAMP_MICROS:
-          return PrimitiveComparator.SIGNED_INT64_COMPARATOR;
-        default:
-          throw new ShouldNeverHappenException(
-              "No comparator logic implemented for INT64 logical type: " + logicalType);
-        }
+        return logicalType.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<PrimitiveComparator>() {
+          @Override
+          public Optional<PrimitiveComparator> visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) {
+            return intLogicalType.isSigned() ?
+              of(PrimitiveComparator.SIGNED_INT64_COMPARATOR) : of(PrimitiveComparator.UNSIGNED_INT64_COMPARATOR);
+          }
+
+          @Override
+          public Optional<PrimitiveComparator> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) {
+            return of(PrimitiveComparator.SIGNED_INT64_COMPARATOR);
+          }
+
+          @Override
+          public Optional<PrimitiveComparator> visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
+            return of(PrimitiveComparator.SIGNED_INT64_COMPARATOR);
+          }
+
+          @Override
+          public Optional<PrimitiveComparator> visit(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
+            return of(PrimitiveComparator.SIGNED_INT64_COMPARATOR);
+          }
+        }).orElseThrow(() -> new ShouldNeverHappenException("No comparator logic implemented for INT64 logical type: " + logicalType));
       }
     },
     INT32("getInteger", Integer.TYPE) {
@@ -128,26 +144,39 @@ public final class PrimitiveType extends Type {
       }
 
       @Override
-      PrimitiveComparator<?> comparator(OriginalType logicalType) {
+      PrimitiveComparator<?> comparator(LogicalTypeAnnotation logicalType) {
         if (logicalType == null) {
           return PrimitiveComparator.SIGNED_INT32_COMPARATOR;
         }
-        switch (logicalType) {
-        case UINT_8:
-        case UINT_16:
-        case UINT_32:
-          return PrimitiveComparator.UNSIGNED_INT32_COMPARATOR;
-        case INT_8:
-        case INT_16:
-        case INT_32:
-        case DECIMAL:
-        case DATE:
-        case TIME_MILLIS:
-          return PrimitiveComparator.SIGNED_INT32_COMPARATOR;
-        default:
-          throw new ShouldNeverHappenException(
-              "No comparator logic implemented for INT32 logical type: " + logicalType);
-        }
+        return logicalType.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<PrimitiveComparator>() {
+          @Override
+          public Optional<PrimitiveComparator> visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) {
+            if (intLogicalType.getBitWidth() == 64) {
+              return empty();
+            }
+            return intLogicalType.isSigned() ?
+              of(PrimitiveComparator.SIGNED_INT32_COMPARATOR) : of(PrimitiveComparator.UNSIGNED_INT32_COMPARATOR);
+          }
+
+          @Override
+          public Optional<PrimitiveComparator> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) {
+            return of(PrimitiveComparator.SIGNED_INT32_COMPARATOR);
+          }
+
+          @Override
+          public Optional<PrimitiveComparator> visit(LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
+            return of(PrimitiveComparator.SIGNED_INT32_COMPARATOR);
+          }
+
+          @Override
+          public Optional<PrimitiveComparator> visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
+            if (timeLogicalType.getUnit() == MILLIS) {
+              return of(PrimitiveComparator.SIGNED_INT32_COMPARATOR);
+            }
+            return empty();
+          }
+        }).orElseThrow(
+          () -> new ShouldNeverHappenException("No comparator logic implemented for INT32 logical type: " + logicalType));
       }
     },
     BOOLEAN("getBoolean", Boolean.TYPE) {
@@ -174,7 +203,7 @@ public final class PrimitiveType extends Type {
       }
 
       @Override
-      PrimitiveComparator<?> comparator(OriginalType logicalType) {
+      PrimitiveComparator<?> comparator(LogicalTypeAnnotation logicalType) {
         return PrimitiveComparator.BOOLEAN_COMPARATOR;
       }
     },
@@ -202,22 +231,36 @@ public final class PrimitiveType extends Type {
       }
 
       @Override
-      PrimitiveComparator<?> comparator(OriginalType logicalType) {
+      PrimitiveComparator<?> comparator(LogicalTypeAnnotation logicalType) {
         if (logicalType == null) {
           return PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR;
         }
-        switch (logicalType) {
-        case DECIMAL:
-          return PrimitiveComparator.BINARY_AS_SIGNED_INTEGER_COMPARATOR;
-        case UTF8:
-        case ENUM:
-        case JSON:
-        case BSON:
-          return PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR;
-        default:
-          throw new ShouldNeverHappenException(
-              "No comparator logic implemented for BINARY logical type: " + logicalType);
-        }
+        return logicalType.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<PrimitiveComparator>() {
+          @Override
+          public Optional<PrimitiveComparator> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) {
+            return of(PrimitiveComparator.BINARY_AS_SIGNED_INTEGER_COMPARATOR);
+          }
+
+          @Override
+          public Optional<PrimitiveComparator> visit(LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) {
+            return of(PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR);
+          }
+
+          @Override
+          public Optional<PrimitiveComparator> visit(LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) {
+            return of(PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR);
+          }
+
+          @Override
+          public Optional<PrimitiveComparator> visit(LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) {
+            return of(PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR);
+          }
+
+          @Override
+          public Optional<PrimitiveComparator> visit(LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) {
+            return of(PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR);
+          }
+        }).orElseThrow(() -> new ShouldNeverHappenException("No comparator logic implemented for BINARY logical type: " + logicalType));
       }
     },
     FLOAT("getFloat", Float.TYPE) {
@@ -244,7 +287,7 @@ public final class PrimitiveType extends Type {
       }
 
       @Override
-      PrimitiveComparator<?> comparator(OriginalType logicalType) {
+      PrimitiveComparator<?> comparator(LogicalTypeAnnotation logicalType) {
         return PrimitiveComparator.FLOAT_COMPARATOR;
       }
     },
@@ -272,7 +315,7 @@ public final class PrimitiveType extends Type {
       }
 
       @Override
-      PrimitiveComparator<?> comparator(OriginalType logicalType) {
+      PrimitiveComparator<?> comparator(LogicalTypeAnnotation logicalType) {
         return PrimitiveComparator.DOUBLE_COMPARATOR;
       }
     },
@@ -298,7 +341,7 @@ public final class PrimitiveType extends Type {
       }
 
       @Override
-      PrimitiveComparator<?> comparator(OriginalType logicalType) {
+      PrimitiveComparator<?> comparator(LogicalTypeAnnotation logicalType) {
         return PrimitiveComparator.BINARY_AS_SIGNED_INTEGER_COMPARATOR;
       }
     },
@@ -326,19 +369,23 @@ public final class PrimitiveType extends Type {
       }
 
       @Override
-      PrimitiveComparator<?> comparator(OriginalType logicalType) {
+      PrimitiveComparator<?> comparator(LogicalTypeAnnotation logicalType) {
         if (logicalType == null) {
           return PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR;
         }
-        switch (logicalType) {
-        case DECIMAL:
-          return PrimitiveComparator.BINARY_AS_SIGNED_INTEGER_COMPARATOR;
-        case INTERVAL:
-          return PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR;
-        default:
-          throw new ShouldNeverHappenException(
-              "No comparator logic implemented for FIXED_LEN_BYTE_ARRAY logical type: " + logicalType);
-        }
+
+        return logicalType.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<PrimitiveComparator>() {
+          @Override
+          public Optional<PrimitiveComparator> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) {
+            return of(PrimitiveComparator.BINARY_AS_SIGNED_INTEGER_COMPARATOR);
+          }
+
+          @Override
+          public Optional<PrimitiveComparator> visit(LogicalTypeAnnotation.IntervalLogicalTypeAnnotation intervalLogicalType) {
+            return of(PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR);
+          }
+        }).orElseThrow(() -> new ShouldNeverHappenException(
+          "No comparator logic implemented for FIXED_LEN_BYTE_ARRAY logical type: " + logicalType));
       }
     };
 
@@ -370,7 +417,7 @@ public final class PrimitiveType extends Type {
 
     abstract public <T, E extends Exception> T convert(PrimitiveTypeNameConverter<T, E> converter) throws E;
 
-    abstract PrimitiveComparator<?> comparator(OriginalType logicalType);
+    abstract PrimitiveComparator<?> comparator(LogicalTypeAnnotation logicalType);
   }
 
   private final PrimitiveTypeName primitive;
@@ -474,7 +521,7 @@ public final class PrimitiveType extends Type {
     super(name, repetition, logicalTypeAnnotation, id);
     this.primitive = primitive;
     this.length = length;
-    if (getOriginalType() == OriginalType.DECIMAL) {
+    if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
       LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimal = (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalTypeAnnotation;
       this.decimalMeta = new DecimalMetadata(decimal.getPrecision(), decimal.getScale());
     } else {
@@ -482,7 +529,7 @@ public final class PrimitiveType extends Type {
     }
 
     if (columnOrder == null) {
-      columnOrder = primitive == PrimitiveTypeName.INT96 || getOriginalType() == OriginalType.INTERVAL
+      columnOrder = primitive == PrimitiveTypeName.INT96 || logicalTypeAnnotation instanceof LogicalTypeAnnotation.IntervalLogicalTypeAnnotation
         ? ColumnOrder.undefined()
         : ColumnOrder.typeDefined();
     }
@@ -494,35 +541,9 @@ public final class PrimitiveType extends Type {
       Preconditions.checkArgument(columnOrder.getColumnOrderName() == ColumnOrderName.UNDEFINED,
           "The column order {} is not supported by INT96", columnOrder);
     }
-    if (getOriginalType() != null) {
-      // Explicitly listing all the logical types to avoid having unsupported column orders new types accidentally
-      switch (getOriginalType()) {
-        case INT_8:
-        case INT_16:
-        case INT_32:
-        case INT_64:
-        case UINT_8:
-        case UINT_16:
-        case UINT_32:
-        case UINT_64:
-        case UTF8:
-        case DECIMAL:
-        case DATE:
-        case TIME_MILLIS:
-        case TIME_MICROS:
-        case TIMESTAMP_MILLIS:
-        case TIMESTAMP_MICROS:
-        case ENUM:
-        case JSON:
-        case BSON:
-          // Currently any available column order is valid
-          break;
-        case INTERVAL:
-        default:
-          Preconditions.checkArgument(columnOrder.getColumnOrderName() == ColumnOrderName.UNDEFINED,
-              "The column order {} is not supported by {} ({})", columnOrder, primitive, getOriginalType());
-          break;
-      }
+    if (getLogicalTypeAnnotation() != null) {
+      Preconditions.checkArgument(getLogicalTypeAnnotation().isValidColumnOrder(columnOrder),
+        "The column order {} is not supported by {} ({})", columnOrder, primitive, getLogicalTypeAnnotation());
     }
     return columnOrder;
   }
@@ -533,7 +554,7 @@ public final class PrimitiveType extends Type {
    */
   @Override
   public PrimitiveType withId(int id) {
-    return new PrimitiveType(getRepetition(), primitive, length, getName(), getOriginalType(), decimalMeta, new ID(id),
+    return new PrimitiveType(getRepetition(), primitive, length, getName(), getLogicalTypeAnnotation(), new ID(id),
         columnOrder);
   }
 
@@ -712,7 +733,7 @@ public final class PrimitiveType extends Type {
     if (strict) {
       // Can't merge primitive fields of different type names or different original types
       if (!primitive.equals(toMerge.asPrimitiveType().getPrimitiveTypeName()) ||
-          getOriginalType() != toMerge.getOriginalType()) {
+        !Objects.equals(getLogicalTypeAnnotation(), toMerge.getLogicalTypeAnnotation())) {
         reportSchemaMergeError(toMerge);
       }
 
@@ -734,7 +755,7 @@ public final class PrimitiveType extends Type {
       builder.length(length);
     }
 
-    return builder.as(getOriginalType()).named(getName());
+    return builder.as(getLogicalTypeAnnotation()).named(getName());
   }
 
   /**
@@ -747,7 +768,7 @@ public final class PrimitiveType extends Type {
    */
   @SuppressWarnings("unchecked")
   public <T> PrimitiveComparator<T> comparator() {
-    return (PrimitiveComparator<T>) getPrimitiveTypeName().comparator(getOriginalType());
+    return (PrimitiveComparator<T>) getPrimitiveTypeName().comparator(getLogicalTypeAnnotation());
   }
 
   /**
@@ -762,7 +783,7 @@ public final class PrimitiveType extends Type {
    */
   @SuppressWarnings("unchecked")
   public PrimitiveStringifier stringifier() {
-    OriginalType originalType = getOriginalType();
-    return originalType == null ? PrimitiveStringifier.DEFAULT_STRINGIFIER : originalType.stringifier(this);
+    LogicalTypeAnnotation logicalTypeAnnotation = getLogicalTypeAnnotation();
+    return logicalTypeAnnotation == null ? PrimitiveStringifier.DEFAULT_STRINGIFIER : logicalTypeAnnotation.valueStringifier(this);
   }
 }
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/Types.java b/parquet-column/src/main/java/org/apache/parquet/schema/Types.java
index 165a5ac..378d665 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/Types.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/Types.java
@@ -21,6 +21,7 @@ package org.apache.parquet.schema;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.schema.ColumnOrder.ColumnOrderName;
@@ -441,16 +442,27 @@ public class Types {
 
       // validate type annotations and required metadata
       if (logicalTypeAnnotation != null) {
-        OriginalType originalType = logicalTypeAnnotation.toOriginalType();
-        switch (originalType) {
-          case UTF8:
-          case JSON:
-          case BSON:
-            Preconditions.checkState(
-                primitiveType == PrimitiveTypeName.BINARY,
-                originalType.toString() + " can only annotate binary fields");
-            break;
-          case DECIMAL:
+        logicalTypeAnnotation.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<Boolean>() {
+          @Override
+          public Optional<Boolean> visit(LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) {
+            checkBinaryPrimitiveType(stringLogicalType);
+            return Optional.of(true);
+          }
+
+          @Override
+          public Optional<Boolean> visit(LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) {
+            checkBinaryPrimitiveType(jsonLogicalType);
+            return Optional.of(true);
+          }
+
+          @Override
+          public Optional<Boolean> visit(LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) {
+            checkBinaryPrimitiveType(bsonLogicalType);
+            return Optional.of(true);
+          }
+
+          @Override
+          public Optional<Boolean> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) {
             Preconditions.checkState(
                 (primitiveType == PrimitiveTypeName.INT32) ||
                 (primitiveType == PrimitiveTypeName.INT64) ||
@@ -478,40 +490,88 @@ public class Types {
                   "FIXED(" + length + ") cannot store " + meta.getPrecision() +
                   " digits (max " + maxPrecision(length) + ")");
             }
-            break;
-          case DATE:
-          case TIME_MILLIS:
-          case UINT_8:
-          case UINT_16:
-          case UINT_32:
-          case INT_8:
-          case INT_16:
-          case INT_32:
-            Preconditions.checkState(primitiveType == PrimitiveTypeName.INT32,
-                originalType.toString() + " can only annotate INT32");
-            break;
-          case TIME_MICROS:
-          case TIMESTAMP_MILLIS:
-          case TIMESTAMP_MICROS:
-          case UINT_64:
-          case INT_64:
-            Preconditions.checkState(primitiveType == PrimitiveTypeName.INT64,
-                originalType.toString() + " can only annotate INT64");
-            break;
-          case INTERVAL:
+            return Optional.of(true);
+          }
+
+          @Override
+          public Optional<Boolean> visit(LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
+            checkInt32PrimitiveType(dateLogicalType);
+            return Optional.of(true);
+          }
+
+          @Override
+          public Optional<Boolean> visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
+            LogicalTypeAnnotation.TimeUnit unit = timeLogicalType.getUnit();
+            switch (unit) {
+              case MILLIS:
+                checkInt32PrimitiveType(timeLogicalType);
+                break;
+              case MICROS:
+                checkInt64PrimitiveType(timeLogicalType);
+                break;
+              default:
+                throw new RuntimeException("Invalid time unit: " + unit);
+            }
+            return Optional.of(true);
+          }
+
+          @Override
+          public Optional<Boolean> visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) {
+            int bitWidth = intLogicalType.getBitWidth();
+            switch (bitWidth) {
+              case 8:
+              case 16:
+              case 32:
+                checkInt32PrimitiveType(intLogicalType);
+                break;
+              case 64:
+                checkInt64PrimitiveType(intLogicalType);
+                break;
+              default:
+                throw new RuntimeException("Invalid bit width: " + bitWidth);
+            }
+            return Optional.of(true);
+          }
+
+          @Override
+          public Optional<Boolean> visit(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
+            checkInt64PrimitiveType(timestampLogicalType);
+            return Optional.of(true);
+          }
+
+          @Override
+          public Optional<Boolean> visit(LogicalTypeAnnotation.IntervalLogicalTypeAnnotation intervalLogicalType) {
             Preconditions.checkState(
                 (primitiveType == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) &&
                 (length == 12),
                 "INTERVAL can only annotate FIXED_LEN_BYTE_ARRAY(12)");
-            break;
-          case ENUM:
+            return Optional.of(true);
+          }
+
+          @Override
+          public Optional<Boolean> visit(LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) {
             Preconditions.checkState(
                 primitiveType == PrimitiveTypeName.BINARY,
                 "ENUM can only annotate binary fields");
-            break;
-          default:
-            throw new IllegalStateException(originalType + " can not be applied to a primitive type");
-        }
+            return Optional.of(true);
+          }
+
+          private void checkBinaryPrimitiveType(LogicalTypeAnnotation logicalTypeAnnotation) {
+            Preconditions.checkState(
+                primitiveType == PrimitiveTypeName.BINARY,
+              logicalTypeAnnotation.toString() + " can only annotate binary fields");
+          }
+
+          private void checkInt32PrimitiveType(LogicalTypeAnnotation logicalTypeAnnotation) {
+            Preconditions.checkState(primitiveType == PrimitiveTypeName.INT32,
+              logicalTypeAnnotation.toString() + " can only annotate INT32");
+          }
+
+          private void checkInt64PrimitiveType(LogicalTypeAnnotation logicalTypeAnnotation) {
+            Preconditions.checkState(primitiveType == PrimitiveTypeName.INT64,
+              logicalTypeAnnotation.toString() + " can only annotate INT64");
+          }
+        }).orElseThrow(() -> new IllegalStateException(logicalTypeAnnotation + " can not be applied to a primitive type"));
       }
 
       if (newLogicalTypeSet) {
@@ -531,7 +591,7 @@ public class Types {
 
     protected DecimalMetadata decimalMetadata() {
       DecimalMetadata meta = null;
-      if (OriginalType.DECIMAL == getOriginalType()) {
+      if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
         LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType = (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalTypeAnnotation;
         if (newLogicalTypeSet) {
           if (scaleAlreadySet) {
diff --git a/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestValidTypeMap.java b/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestValidTypeMap.java
index d441369..6e19dca 100644
--- a/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestValidTypeMap.java
+++ b/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestValidTypeMap.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -28,7 +28,6 @@ import org.apache.parquet.filter2.predicate.Operators.DoubleColumn;
 import org.apache.parquet.filter2.predicate.Operators.FloatColumn;
 import org.apache.parquet.filter2.predicate.Operators.IntColumn;
 import org.apache.parquet.filter2.predicate.Operators.LongColumn;
-import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 
 import static org.junit.Assert.assertEquals;
diff --git a/parquet-column/src/test/java/org/apache/parquet/schema/TestMessageType.java b/parquet-column/src/test/java/org/apache/parquet/schema/TestMessageType.java
index 0561938..e511d42 100644
--- a/parquet-column/src/test/java/org/apache/parquet/schema/TestMessageType.java
+++ b/parquet-column/src/test/java/org/apache/parquet/schema/TestMessageType.java
@@ -148,7 +148,7 @@ public class TestMessageType {
       t9.union(t10);
       fail("moving from BINARY (UTF8) to BINARY");
     } catch (IncompatibleSchemaModificationException e) {
-      assertEquals("cannot merge original type null into UTF8", e.getMessage());
+      assertEquals("cannot merge logical type null into STRING", e.getMessage());
     }
 
     MessageType t11 = Types.buildMessage()
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index 1442910..9478e94 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -18,6 +18,7 @@
  */
 package org.apache.parquet.format.converter;
 
+import static java.util.Optional.of;
 import static org.apache.parquet.format.Util.readFileMetaData;
 import static org.apache.parquet.format.Util.writePageHeader;
 
@@ -264,161 +265,161 @@ public class ParquetMetadataConverter {
 
   private static class ConvertedTypeConverterVisitor implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<ConvertedType> {
     @Override
-    public Optional<ConvertedType> visit(LogicalTypeAnnotation.StringLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(ConvertedType.UTF8);
+    public Optional<ConvertedType> visit(LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) {
+      return of(ConvertedType.UTF8);
     }
 
     @Override
-    public Optional<ConvertedType> visit(LogicalTypeAnnotation.MapLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(ConvertedType.MAP);
+    public Optional<ConvertedType> visit(LogicalTypeAnnotation.MapLogicalTypeAnnotation mapLogicalType) {
+      return of(ConvertedType.MAP);
     }
 
     @Override
-    public Optional<ConvertedType> visit(LogicalTypeAnnotation.ListLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(ConvertedType.LIST);
+    public Optional<ConvertedType> visit(LogicalTypeAnnotation.ListLogicalTypeAnnotation listLogicalType) {
+      return of(ConvertedType.LIST);
     }
 
     @Override
-    public Optional<ConvertedType> visit(LogicalTypeAnnotation.EnumLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(ConvertedType.ENUM);
+    public Optional<ConvertedType> visit(LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) {
+      return of(ConvertedType.ENUM);
     }
 
     @Override
-    public Optional<ConvertedType> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(ConvertedType.DECIMAL);
+    public Optional<ConvertedType> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) {
+      return of(ConvertedType.DECIMAL);
     }
 
     @Override
-    public Optional<ConvertedType> visit(LogicalTypeAnnotation.DateLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(ConvertedType.DATE);
+    public Optional<ConvertedType> visit(LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
+      return of(ConvertedType.DATE);
     }
 
     @Override
-    public Optional<ConvertedType> visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation logicalTypeAnnotation) {
-      switch (logicalTypeAnnotation.getUnit()) {
+    public Optional<ConvertedType> visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
+      switch (timeLogicalType.getUnit()) {
         case MILLIS:
-          return Optional.of(ConvertedType.TIME_MILLIS);
+          return of(ConvertedType.TIME_MILLIS);
         case MICROS:
-          return Optional.of(ConvertedType.TIME_MICROS);
+          return of(ConvertedType.TIME_MICROS);
         default:
-          throw new RuntimeException("Unknown converted type for " + logicalTypeAnnotation.toOriginalType());
+          throw new RuntimeException("Unknown converted type for " + timeLogicalType.toOriginalType());
       }
     }
 
     @Override
-    public Optional<ConvertedType> visit(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation logicalTypeAnnotation) {
-      switch (logicalTypeAnnotation.getUnit()) {
+    public Optional<ConvertedType> visit(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
+      switch (timestampLogicalType.getUnit()) {
         case MICROS:
-          return Optional.of(ConvertedType.TIMESTAMP_MICROS);
+          return of(ConvertedType.TIMESTAMP_MICROS);
         case MILLIS:
-          return Optional.of(ConvertedType.TIMESTAMP_MILLIS);
+          return of(ConvertedType.TIMESTAMP_MILLIS);
         default:
-          throw new RuntimeException("Unknown converted type for " + logicalTypeAnnotation.toOriginalType());
+          throw new RuntimeException("Unknown converted type for " + timestampLogicalType.toOriginalType());
       }
     }
 
     @Override
-    public Optional<ConvertedType> visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation logicalTypeAnnotation) {
-      boolean signed = logicalTypeAnnotation.isSigned();
-      switch (logicalTypeAnnotation.getBitWidth()) {
+    public Optional<ConvertedType> visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) {
+      boolean signed = intLogicalType.isSigned();
+      switch (intLogicalType.getBitWidth()) {
         case 8:
-          return Optional.of(signed ? ConvertedType.INT_8 : ConvertedType.UINT_8);
+          return of(signed ? ConvertedType.INT_8 : ConvertedType.UINT_8);
         case 16:
-          return Optional.of(signed ? ConvertedType.INT_16 : ConvertedType.UINT_16);
+          return of(signed ? ConvertedType.INT_16 : ConvertedType.UINT_16);
         case 32:
-          return Optional.of(signed ? ConvertedType.INT_32 : ConvertedType.UINT_32);
+          return of(signed ? ConvertedType.INT_32 : ConvertedType.UINT_32);
         case 64:
-          return Optional.of(signed ? ConvertedType.INT_64 : ConvertedType.UINT_64);
+          return of(signed ? ConvertedType.INT_64 : ConvertedType.UINT_64);
         default:
-          throw new RuntimeException("Unknown original type " + logicalTypeAnnotation.toOriginalType());
+          throw new RuntimeException("Unknown original type " + intLogicalType.toOriginalType());
       }
     }
 
     @Override
-    public Optional<ConvertedType> visit(LogicalTypeAnnotation.JsonLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(ConvertedType.JSON);
+    public Optional<ConvertedType> visit(LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) {
+      return of(ConvertedType.JSON);
     }
 
     @Override
-    public Optional<ConvertedType> visit(LogicalTypeAnnotation.BsonLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(ConvertedType.BSON);
+    public Optional<ConvertedType> visit(LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) {
+      return of(ConvertedType.BSON);
     }
 
     @Override
-    public Optional<ConvertedType> visit(LogicalTypeAnnotation.IntervalLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(ConvertedType.INTERVAL);
+    public Optional<ConvertedType> visit(LogicalTypeAnnotation.IntervalLogicalTypeAnnotation intervalLogicalType) {
+      return of(ConvertedType.INTERVAL);
     }
 
     @Override
-    public Optional<ConvertedType> visit(LogicalTypeAnnotation.MapKeyValueTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(ConvertedType.MAP_KEY_VALUE);
+    public Optional<ConvertedType> visit(LogicalTypeAnnotation.MapKeyValueTypeAnnotation mapKeyValueLogicalType) {
+      return of(ConvertedType.MAP_KEY_VALUE);
     }
   }
 
   private static class LogicalTypeConverterVisitor implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<LogicalType> {
     @Override
-    public Optional<LogicalType> visit(LogicalTypeAnnotation.StringLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(LogicalType.STRING(new StringType()));
+    public Optional<LogicalType> visit(LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) {
+      return of(LogicalType.STRING(new StringType()));
     }
 
     @Override
-    public Optional<LogicalType> visit(LogicalTypeAnnotation.MapLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(LogicalType.MAP(new MapType()));
+    public Optional<LogicalType> visit(LogicalTypeAnnotation.MapLogicalTypeAnnotation mapLogicalType) {
+      return of(LogicalType.MAP(new MapType()));
     }
 
     @Override
-    public Optional<LogicalType> visit(LogicalTypeAnnotation.ListLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(LogicalType.LIST(new ListType()));
+    public Optional<LogicalType> visit(LogicalTypeAnnotation.ListLogicalTypeAnnotation listLogicalType) {
+      return of(LogicalType.LIST(new ListType()));
     }
 
     @Override
-    public Optional<LogicalType> visit(LogicalTypeAnnotation.EnumLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(LogicalType.ENUM(new EnumType()));
+    public Optional<LogicalType> visit(LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) {
+      return of(LogicalType.ENUM(new EnumType()));
     }
 
     @Override
-    public Optional<LogicalType> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(LogicalType.DECIMAL(new DecimalType(logicalTypeAnnotation.getScale(), logicalTypeAnnotation.getPrecision())));
+    public Optional<LogicalType> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) {
+      return of(LogicalType.DECIMAL(new DecimalType(decimalLogicalType.getScale(), decimalLogicalType.getPrecision())));
     }
 
     @Override
-    public Optional<LogicalType> visit(LogicalTypeAnnotation.DateLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(LogicalType.DATE(new DateType()));
+    public Optional<LogicalType> visit(LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
+      return of(LogicalType.DATE(new DateType()));
     }
 
     @Override
-    public Optional<LogicalType> visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(LogicalType.TIME(new TimeType(logicalTypeAnnotation.isAdjustedToUTC(), convertUnit(logicalTypeAnnotation.getUnit()))));
+    public Optional<LogicalType> visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
+      return of(LogicalType.TIME(new TimeType(timeLogicalType.isAdjustedToUTC(), convertUnit(timeLogicalType.getUnit()))));
     }
 
     @Override
-    public Optional<LogicalType> visit(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(LogicalType.TIMESTAMP(new TimestampType(logicalTypeAnnotation.isAdjustedToUTC(), convertUnit(logicalTypeAnnotation.getUnit()))));
+    public Optional<LogicalType> visit(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
+      return of(LogicalType.TIMESTAMP(new TimestampType(timestampLogicalType.isAdjustedToUTC(), convertUnit(timestampLogicalType.getUnit()))));
     }
 
     @Override
-    public Optional<LogicalType> visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(LogicalType.INTEGER(new IntType((byte) logicalTypeAnnotation.getBitWidth(), logicalTypeAnnotation.isSigned())));
+    public Optional<LogicalType> visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) {
+      return of(LogicalType.INTEGER(new IntType((byte) intLogicalType.getBitWidth(), intLogicalType.isSigned())));
     }
 
     @Override
-    public Optional<LogicalType> visit(LogicalTypeAnnotation.JsonLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(LogicalType.JSON(new JsonType()));
+    public Optional<LogicalType> visit(LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) {
+      return of(LogicalType.JSON(new JsonType()));
     }
 
     @Override
-    public Optional<LogicalType> visit(LogicalTypeAnnotation.BsonLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(LogicalType.BSON(new BsonType()));
+    public Optional<LogicalType> visit(LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) {
+      return of(LogicalType.BSON(new BsonType()));
     }
 
     @Override
-    public Optional<LogicalType> visit(LogicalTypeAnnotation.IntervalLogicalTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(LogicalType.UNKNOWN(new NullType()));
+    public Optional<LogicalType> visit(LogicalTypeAnnotation.IntervalLogicalTypeAnnotation intervalLogicalType) {
+      return of(LogicalType.UNKNOWN(new NullType()));
     }
 
     @Override
-    public Optional<LogicalType> visit(LogicalTypeAnnotation.MapKeyValueTypeAnnotation logicalTypeAnnotation) {
-      return Optional.of(LogicalType.UNKNOWN(new NullType()));
+    public Optional<LogicalType> visit(LogicalTypeAnnotation.MapKeyValueTypeAnnotation mapKeyValueLogicalType) {
+      return of(LogicalType.UNKNOWN(new NullType()));
     }
   }
 
@@ -669,9 +670,11 @@ public class ParquetMetadataConverter {
     UNKNOWN
   }
 
-  private static final Set<OriginalType> STRING_TYPES = Collections
+  private static final Set<Class<?>> STRING_TYPES = Collections
       .unmodifiableSet(new HashSet<>(Arrays.asList(
-          OriginalType.UTF8, OriginalType.ENUM, OriginalType.JSON
+        LogicalTypeAnnotation.StringLogicalTypeAnnotation.class,
+        LogicalTypeAnnotation.EnumLogicalTypeAnnotation.class,
+        LogicalTypeAnnotation.JsonLogicalTypeAnnotation.class
       )));
 
   /**
@@ -688,10 +691,10 @@ public class ParquetMetadataConverter {
     // even if the override is set, only return stats for string-ish types
     // a null type annotation is considered string-ish because some writers
     // failed to use the UTF8 annotation.
-    OriginalType annotation = type.getOriginalType();
+    LogicalTypeAnnotation annotation = type.getLogicalTypeAnnotation();
     return useSignedStringMinMax &&
         PrimitiveTypeName.BINARY == type.getPrimitiveTypeName() &&
-        (annotation == null || STRING_TYPES.contains(annotation));
+        (annotation == null || STRING_TYPES.contains(annotation.getClass()));
   }
 
   /**
@@ -718,36 +721,76 @@ public class ParquetMetadataConverter {
    * @return the "correct" sort order of the type that applications assume
    */
   private static SortOrder sortOrder(PrimitiveType primitive) {
-    OriginalType annotation = primitive.getOriginalType();
+    LogicalTypeAnnotation annotation = primitive.getLogicalTypeAnnotation();
     if (annotation != null) {
-      switch (annotation) {
-        case INT_8:
-        case INT_16:
-        case INT_32:
-        case INT_64:
-        case DATE:
-        case TIME_MICROS:
-        case TIME_MILLIS:
-        case TIMESTAMP_MICROS:
-        case TIMESTAMP_MILLIS:
-          return SortOrder.SIGNED;
-        case UINT_8:
-        case UINT_16:
-        case UINT_32:
-        case UINT_64:
-        case ENUM:
-        case UTF8:
-        case BSON:
-        case JSON:
-          return SortOrder.UNSIGNED;
-        case DECIMAL:
-        case LIST:
-        case MAP:
-        case MAP_KEY_VALUE:
-        case INTERVAL:
-          return SortOrder.UNKNOWN;
-      }
+      return annotation.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<SortOrder>() {
+        @Override
+        public Optional<SortOrder> visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) {
+          return intLogicalType.isSigned() ? of(SortOrder.SIGNED) : of(SortOrder.UNSIGNED);
+        }
+
+        @Override
+        public Optional<SortOrder> visit(LogicalTypeAnnotation.IntervalLogicalTypeAnnotation intervalLogicalType) {
+          return of(SortOrder.UNKNOWN);
+        }
+
+        @Override
+        public Optional<SortOrder> visit(LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
+          return of(SortOrder.SIGNED);
+        }
+
+        @Override
+        public Optional<SortOrder> visit(LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) {
+          return of(SortOrder.UNSIGNED);
+        }
+
+        @Override
+        public Optional<SortOrder> visit(LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) {
+          return of(SortOrder.UNSIGNED);
+        }
+
+        @Override
+        public Optional<SortOrder> visit(LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) {
+          return of(SortOrder.UNSIGNED);
+        }
+
+        @Override
+        public Optional<SortOrder> visit(LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) {
+          return of(SortOrder.UNSIGNED);
+        }
+
+        @Override
+        public Optional<SortOrder> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) {
+          return of(SortOrder.UNKNOWN);
+        }
+
+        @Override
+        public Optional<SortOrder> visit(LogicalTypeAnnotation.MapKeyValueTypeAnnotation mapKeyValueLogicalType) {
+          return of(SortOrder.UNKNOWN);
+        }
+
+        @Override
+        public Optional<SortOrder> visit(LogicalTypeAnnotation.MapLogicalTypeAnnotation mapLogicalType) {
+          return of(SortOrder.UNKNOWN);
+        }
+
+        @Override
+        public Optional<SortOrder> visit(LogicalTypeAnnotation.ListLogicalTypeAnnotation listLogicalType) {
+          return of(SortOrder.UNKNOWN);
+        }
+
+        @Override
+        public Optional<SortOrder> visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
+          return of(SortOrder.SIGNED);
+        }
+
+        @Override
+        public Optional<SortOrder> visit(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
+          return of(SortOrder.SIGNED);
+        }
+      }).orElse(defaultSortOrder(primitive.getPrimitiveTypeName()));
     }
+
     return defaultSortOrder(primitive.getPrimitiveTypeName());
   }
 
diff --git a/parquet-hive/parquet-hive-storage-handler/src/main/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveSchemaConverter.java b/parquet-hive/parquet-hive-storage-handler/src/main/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveSchemaConverter.java
index 5d3ab48..6d229a6 100644
--- a/parquet-hive/parquet-hive-storage-handler/src/main/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveSchemaConverter.java
+++ b/parquet-hive/parquet-hive-storage-handler/src/main/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveSchemaConverter.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,12 +30,15 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 
 import org.apache.parquet.schema.ConversionPatterns;
 import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 import org.apache.parquet.schema.Type;
 import org.apache.parquet.schema.Type.Repetition;
+import org.apache.parquet.schema.Types;
+
+import static org.apache.parquet.schema.LogicalTypeAnnotation.listType;
 
 public class HiveSchemaConverter {
 
@@ -105,7 +108,7 @@ public class HiveSchemaConverter {
   // 1 anonymous element "array_element"
   private static GroupType convertArrayType(final String name, final ListTypeInfo typeInfo) {
     final TypeInfo subType = typeInfo.getListElementTypeInfo();
-    return listWrapper(name, OriginalType.LIST, new GroupType(Repetition.REPEATED,
+    return listWrapper(name, listType(), new GroupType(Repetition.REPEATED,
         ParquetHiveSerDe.ARRAY.toString(), convertType("array_element", subType)));
   }
 
@@ -127,8 +130,8 @@ public class HiveSchemaConverter {
     return ConversionPatterns.mapType(Repetition.OPTIONAL, name, keyType, valueType);
   }
 
-  private static GroupType listWrapper(final String name, final OriginalType originalType,
+  private static GroupType listWrapper(final String name, final LogicalTypeAnnotation logicalTypeAnnotation,
       final GroupType groupType) {
-    return new GroupType(Repetition.OPTIONAL, name, originalType, groupType);
+    return Types.optionalGroup().addField(groupType).as(logicalTypeAnnotation).named(name);
   }
 }
diff --git a/parquet-pig/src/main/java/org/apache/parquet/pig/PigSchemaConverter.java b/parquet-pig/src/main/java/org/apache/parquet/pig/PigSchemaConverter.java
index 24f7ee8..1935661 100644
--- a/parquet-pig/src/main/java/org/apache/parquet/pig/PigSchemaConverter.java
+++ b/parquet-pig/src/main/java/org/apache/parquet/pig/PigSchemaConverter.java
@@ -23,7 +23,10 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
 
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.Types;
 import org.apache.pig.LoadPushDown.RequiredField;
 import org.apache.pig.LoadPushDown.RequiredFieldList;
 import org.apache.pig.data.DataType;
@@ -38,7 +41,6 @@ import org.apache.pig.parser.ParserException;
 import org.apache.parquet.schema.ConversionPatterns;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeNameConverter;
@@ -47,6 +49,9 @@ import org.apache.parquet.schema.Type.Repetition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.util.Optional.of;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
+
 
 /**
  * Converts a Pig Schema into a Parquet schema
@@ -205,7 +210,7 @@ public class PigSchemaConverter {
       throws FrontendException {
     final PrimitiveTypeName parquetPrimitiveTypeName =
         parquetType.asPrimitiveType().getPrimitiveTypeName();
-    final OriginalType originalType = parquetType.getOriginalType();
+    final LogicalTypeAnnotation logicalTypeAnnotation = parquetType.getLogicalTypeAnnotation();
     return parquetPrimitiveTypeName.convert(
         new PrimitiveTypeNameConverter<Schema.FieldSchema, FrontendException>() {
       @Override
@@ -242,7 +247,7 @@ public class PigSchemaConverter {
       @Override
       public FieldSchema convertFIXED_LEN_BYTE_ARRAY(
         PrimitiveTypeName primitiveTypeName) throws FrontendException {
-        if (originalType == OriginalType.DECIMAL) {
+        if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
           return new FieldSchema(fieldName, null, DataType.BIGDECIMAL);
         } else {
           return new FieldSchema(fieldName, null, DataType.BYTEARRAY);
@@ -258,7 +263,7 @@ public class PigSchemaConverter {
       @Override
       public FieldSchema convertBINARY(PrimitiveTypeName primitiveTypeName)
           throws FrontendException {
-        if (originalType != null && originalType == OriginalType.UTF8) {
+        if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.StringLogicalTypeAnnotation) {
           return new FieldSchema(fieldName, null, DataType.CHARARRAY);
         } else {
           return new FieldSchema(fieldName, null, DataType.BYTEARRAY);
@@ -267,47 +272,71 @@ public class PigSchemaConverter {
     });
   }
 
+  /* Unchecked carrier for a checked FrontendException raised inside a logical type visitor.
+   * The code calling the visitor catches it and rethrows the wrapped exception, which is also kept as cause.
+   */
+  private static final class FrontendExceptionWrapper extends RuntimeException {
+    final FrontendException frontendException;
+
+    FrontendExceptionWrapper(FrontendException frontendException) {
+      super(frontendException);
+      this.frontendException = frontendException;
+    }
+  }
+
   private FieldSchema getComplexFieldSchema(String fieldName, Type parquetType)
       throws FrontendException {
     GroupType parquetGroupType = parquetType.asGroupType();
-    OriginalType originalType = parquetGroupType.getOriginalType();
-    if (originalType !=  null) {
-      switch(originalType) {
-      case MAP:
-        // verify that its a map
-        if (parquetGroupType.getFieldCount() != 1 || parquetGroupType.getType(0).isPrimitive()) {
-          throw new SchemaConversionException("Invalid map type " + parquetGroupType);
-        }
-        GroupType mapKeyValType = parquetGroupType.getType(0).asGroupType();
-        if (!mapKeyValType.isRepetition(Repetition.REPEATED) ||
-            (mapKeyValType.getOriginalType() != null && !mapKeyValType.getOriginalType().equals(OriginalType.MAP_KEY_VALUE)) ||
-            mapKeyValType.getFieldCount()!=2) {
-          throw new SchemaConversionException("Invalid map type " + parquetGroupType);
-        }
-        // if value is not primitive wrap it in a tuple
-        Type valueType = mapKeyValType.getType(1);
-        Schema s = convertField(valueType);
-        s.getField(0).alias = null;
-        return new FieldSchema(fieldName, s, DataType.MAP);
-      case LIST:
-        Type type = parquetGroupType.getType(0);
-        if (parquetGroupType.getFieldCount()!= 1 || type.isPrimitive()) {
-          // an array is effectively a bag
-          Schema primitiveSchema = new Schema(getSimpleFieldSchema(parquetGroupType.getFieldName(0), type));
-          Schema tupleSchema = new Schema(new FieldSchema(ARRAY_VALUE_NAME, primitiveSchema, DataType.TUPLE));
-          return new FieldSchema(fieldName, tupleSchema, DataType.BAG);
-        }
-        GroupType tupleType = parquetGroupType.getType(0).asGroupType();
-        if (!tupleType.isRepetition(Repetition.REPEATED)) {
-          throw new SchemaConversionException("Invalid list type " + parquetGroupType);
-        }
-        Schema tupleSchema = new Schema(new FieldSchema(tupleType.getName(), convertFields(tupleType.getFields()), DataType.TUPLE));
-        return new FieldSchema(fieldName, tupleSchema, DataType.BAG);
-      case MAP_KEY_VALUE:
-      case ENUM:
-      case UTF8:
-      default:
-        throw new SchemaConversionException("Unexpected original type for " + parquetType + ": " + originalType);
+    LogicalTypeAnnotation logicalTypeAnnotation = parquetGroupType.getLogicalTypeAnnotation();
+    if (logicalTypeAnnotation !=  null) {
+      try {
+        return logicalTypeAnnotation.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<FieldSchema>() {
+          @Override
+          public Optional<FieldSchema> visit(LogicalTypeAnnotation.MapLogicalTypeAnnotation mapLogicalType) {
+            try {
+              // verify that its a map
+              if (parquetGroupType.getFieldCount() != 1 || parquetGroupType.getType(0).isPrimitive()) {
+                throw new SchemaConversionException("Invalid map type " + parquetGroupType);
+              }
+              GroupType mapKeyValType = parquetGroupType.getType(0).asGroupType();
+              if (!mapKeyValType.isRepetition(Repetition.REPEATED) ||
+                (mapKeyValType.getLogicalTypeAnnotation() != null && !mapKeyValType.getLogicalTypeAnnotation().equals(LogicalTypeAnnotation.MapKeyValueTypeAnnotation.getInstance())) ||
+                mapKeyValType.getFieldCount() != 2) {
+                throw new SchemaConversionException("Invalid map type " + parquetGroupType);
+              }
+              // if value is not primitive wrap it in a tuple
+              Type valueType = mapKeyValType.getType(1);
+              Schema s = convertField(valueType);
+              s.getField(0).alias = null;
+              return of(new FieldSchema(fieldName, s, DataType.MAP));
+            } catch (FrontendException e) {
+              throw new FrontendExceptionWrapper(e);
+            }
+          }
+
+          @Override
+          public Optional<FieldSchema> visit(LogicalTypeAnnotation.ListLogicalTypeAnnotation listLogicalType) {
+            try {
+              Type type = parquetGroupType.getType(0);
+              if (parquetGroupType.getFieldCount() != 1 || type.isPrimitive()) {
+                // an array is effectively a bag
+                Schema primitiveSchema = new Schema(getSimpleFieldSchema(parquetGroupType.getFieldName(0), type));
+                Schema tupleSchema = new Schema(new FieldSchema(ARRAY_VALUE_NAME, primitiveSchema, DataType.TUPLE));
+                return of(new FieldSchema(fieldName, tupleSchema, DataType.BAG));
+              }
+              GroupType tupleType = parquetGroupType.getType(0).asGroupType();
+              if (!tupleType.isRepetition(Repetition.REPEATED)) {
+                throw new SchemaConversionException("Invalid list type " + parquetGroupType);
+              }
+              Schema tupleSchema = new Schema(new FieldSchema(tupleType.getName(), convertFields(tupleType.getFields()), DataType.TUPLE));
+              return of(new FieldSchema(fieldName, tupleSchema, DataType.BAG));
+            } catch (FrontendException e) {
+              throw new FrontendExceptionWrapper(e);
+            }
+          }
+        }).orElseThrow(() -> new SchemaConversionException("Unexpected original type for " + parquetType + ": " + logicalTypeAnnotation));
+      } catch (FrontendExceptionWrapper e) {
+        throw e.frontendException;
       }
     } else {
       // if original type is not set, we assume it to be tuple
@@ -359,7 +388,7 @@ public class PigSchemaConverter {
       case DataType.BOOLEAN:
         return primitive(name, PrimitiveTypeName.BOOLEAN);
       case DataType.CHARARRAY:
-        return primitive(name, PrimitiveTypeName.BINARY, OriginalType.UTF8);
+        return primitive(name, PrimitiveTypeName.BINARY, stringType());
       case DataType.INTEGER:
         return primitive(name, PrimitiveTypeName.INT32);
       case DataType.LONG:
@@ -403,12 +432,12 @@ public class PigSchemaConverter {
     return fieldAlias == null ? defaultName : fieldAlias;
   }
 
-  private Type primitive(String name, PrimitiveTypeName primitive, OriginalType originalType) {
-    return new PrimitiveType(Repetition.OPTIONAL, primitive, name, originalType);
+  private Type primitive(String name, PrimitiveTypeName primitive, LogicalTypeAnnotation logicalTypeAnnotation) {
+    return Types.primitive(primitive, Repetition.OPTIONAL).as(logicalTypeAnnotation).named(name);
   }
 
   private PrimitiveType primitive(String name, PrimitiveTypeName primitive) {
-    return new PrimitiveType(Repetition.OPTIONAL, primitive, name, null);
+    return Types.primitive(primitive, Repetition.OPTIONAL).named(name);
   }
 
   /**
@@ -511,7 +540,8 @@ public class PigSchemaConverter {
     }
     Type nested = bagType.getType(0);
     FieldSchema innerField = bagFieldSchema.schema.getField(0);
-    if (nested.isPrimitive() || nested.getOriginalType() == OriginalType.MAP || nested.getOriginalType() == OriginalType.LIST) {
+    if (nested.isPrimitive() || nested.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation
+      || nested.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.ListLogicalTypeAnnotation) {
       // Bags always contain tuples => we skip the extra tuple that was inserted in that case.
       innerField = innerField.schema.getField(0);
     }
diff --git a/parquet-pig/src/main/java/org/apache/parquet/pig/convert/TupleConverter.java b/parquet-pig/src/main/java/org/apache/parquet/pig/convert/TupleConverter.java
index 18ea9e4..48bb753 100644
--- a/parquet-pig/src/main/java/org/apache/parquet/pig/convert/TupleConverter.java
+++ b/parquet-pig/src/main/java/org/apache/parquet/pig/convert/TupleConverter.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.math.BigDecimal;
 
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
@@ -40,11 +41,8 @@ import org.apache.parquet.io.api.Converter;
 import org.apache.parquet.io.api.GroupConverter;
 import org.apache.parquet.io.api.PrimitiveConverter;
 import org.apache.parquet.pig.TupleConversionException;
-import org.apache.parquet.pig.convert.DecimalUtils;
 import org.apache.parquet.schema.GroupType;
-import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
-import org.apache.parquet.schema.DecimalMetadata;
 import org.apache.parquet.schema.Type;
 import org.apache.parquet.schema.Type.Repetition;
 
@@ -74,7 +72,7 @@ public class TupleConverter extends GroupConverter {
         FieldSchema field = pigSchema.getField(i);
         if(parquetSchema.containsField(field.alias) || columnIndexAccess) {
           Type type = getType(columnIndexAccess, field.alias, i);
-          
+
           if(type != null) {
             final int index = i;
             converters[c++] = newConverter(field, type, new ParentValueContainer() {
@@ -85,7 +83,7 @@ public class TupleConverter extends GroupConverter {
             }, elephantBirdCompatible, columnIndexAccess);
           }
         }
-        
+
       }
     } catch (FrontendException e) {
       throw new ParquetDecodingException("can not initialize pig converter from:\n" + parquetSchema + "\n" + pigSchema, e);
@@ -100,10 +98,10 @@ public class TupleConverter extends GroupConverter {
     } else {
       return parquetSchema.getType(parquetSchema.getFieldIndex(alias));
     }
-    
+
     return null;
   }
-  
+
   static Converter newConverter(FieldSchema pigField, Type type, final ParentValueContainer parent, boolean elephantBirdCompatible, boolean columnIndexAccess) {
     try {
       switch (pigField.type) {
@@ -122,7 +120,7 @@ public class TupleConverter extends GroupConverter {
       case DataType.CHARARRAY:
           //If the orignal type isn't a string, we don't want to use the dictionary because
           //a custom implementation will be needed for each type.  Just default to no dictionary.
-        return new FieldStringConverter(parent, type.getOriginalType() == OriginalType.UTF8);
+        return new FieldStringConverter(parent, type.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.StringLogicalTypeAnnotation);
       case DataType.BYTEARRAY:
         return new FieldByteArrayConverter(parent);
       case DataType.INTEGER:
@@ -277,8 +275,6 @@ public class TupleConverter extends GroupConverter {
     public void addBoolean(boolean value) {
       parent.add(Boolean.toString(value));
     }
-    
-    
   }
 
   /**
@@ -403,7 +399,7 @@ public class TupleConverter extends GroupConverter {
 
     @Override
     public void addInt(int value) {
-      parent.add((long)value); 
+      parent.add((long)value);
     }
 
     @Override
@@ -425,7 +421,7 @@ public class TupleConverter extends GroupConverter {
     public void addBinary(Binary value) {
       parent.add(Long.parseLong(value.toStringUsingUTF8()));
     }
-    
+
   }
 
   /**
@@ -511,8 +507,6 @@ public class TupleConverter extends GroupConverter {
     public void addBinary(Binary value) {
       parent.add(Boolean.parseBoolean(value.toStringUsingUTF8()));
     }
-
-    
   }
 
   /**
@@ -554,7 +548,8 @@ public class TupleConverter extends GroupConverter {
 
       ParentValueContainer childsParent;
       FieldSchema pigField;
-      if (nestedType.isPrimitive() || nestedType.getOriginalType() == OriginalType.MAP || nestedType.getOriginalType() == OriginalType.LIST) {
+      if (nestedType.isPrimitive() || nestedType.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation
+        || nestedType.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.ListLogicalTypeAnnotation) {
         // Pig bags always contain tuples
         // In that case we need to wrap the value in an extra tuple
         childsParent = new ParentValueContainer() {
diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java
index 979d78e..92d8b62 100644
--- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java
+++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoMessageConverter.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -31,15 +31,17 @@ import org.apache.parquet.io.api.GroupConverter;
 import org.apache.parquet.io.api.PrimitiveConverter;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.IncompatibleSchemaModificationException;
-import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.Type;
 
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 import static com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
+import static java.util.Optional.of;
 
 /**
  * Converts Protocol Buffer message (both top level and inner) to parquet.
@@ -128,13 +130,22 @@ class ProtoMessageConverter extends GroupConverter {
       };
     }
 
-    if (OriginalType.LIST == parquetType.getOriginalType()) {
-      return new ListConverter(parentBuilder, fieldDescriptor, parquetType);
+    LogicalTypeAnnotation logicalTypeAnnotation = parquetType.getLogicalTypeAnnotation();
+    if (logicalTypeAnnotation == null) {
+      return newScalarConverter(parent, parentBuilder, fieldDescriptor, parquetType);
     }
-    if (OriginalType.MAP == parquetType.getOriginalType()) {
-      return new MapConverter(parentBuilder, fieldDescriptor, parquetType);
-    }
-    return newScalarConverter(parent, parentBuilder, fieldDescriptor, parquetType);
+    // LIST and MAP get dedicated converters; any other annotation falls back, lazily, to a scalar converter.
+    return logicalTypeAnnotation.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<Converter>() {
+      @Override
+      public Optional<Converter> visit(LogicalTypeAnnotation.ListLogicalTypeAnnotation listLogicalType) {
+        return of(new ListConverter(parentBuilder, fieldDescriptor, parquetType));
+      }
+
+      @Override
+      public Optional<Converter> visit(LogicalTypeAnnotation.MapLogicalTypeAnnotation mapLogicalType) {
+        return of(new MapConverter(parentBuilder, fieldDescriptor, parquetType));
+      }
+    }).orElseGet(() -> newScalarConverter(parent, parentBuilder, fieldDescriptor, parquetType));
   }
 
   private Converter newScalarConverter(ParentValueContainer pvc, Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) {
@@ -376,9 +387,9 @@ class ProtoMessageConverter extends GroupConverter {
     private final Converter converter;
 
     public ListConverter(Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) {
-      OriginalType originalType = parquetType.getOriginalType();
-      if (originalType != OriginalType.LIST || parquetType.isPrimitive()) {
-        throw new ParquetDecodingException("Expected LIST wrapper. Found: " + originalType + " instead.");
+      LogicalTypeAnnotation logicalTypeAnnotation = parquetType.getLogicalTypeAnnotation();
+      if (!(logicalTypeAnnotation instanceof LogicalTypeAnnotation.ListLogicalTypeAnnotation) || parquetType.isPrimitive()) {
+        throw new ParquetDecodingException("Expected LIST wrapper. Found: " + logicalTypeAnnotation + " instead.");
       }
 
       GroupType rootWrapperType = parquetType.asGroupType();
@@ -435,9 +446,9 @@ class ProtoMessageConverter extends GroupConverter {
     private final Converter converter;
 
     public MapConverter(Message.Builder parentBuilder, Descriptors.FieldDescriptor fieldDescriptor, Type parquetType) {
-      OriginalType originalType = parquetType.getOriginalType();
-      if (originalType != OriginalType.MAP) {
-        throw new ParquetDecodingException("Expected MAP wrapper. Found: " + originalType + " instead.");
+      LogicalTypeAnnotation logicalTypeAnnotation = parquetType.getLogicalTypeAnnotation();
+      if (!(logicalTypeAnnotation instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation)) {
+        throw new ParquetDecodingException("Expected MAP wrapper. Found: " + logicalTypeAnnotation + " instead.");
       }
 
       Type parquetSchema;
diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java
index 0e1aa20..db5be14 100644
--- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java
+++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoSchemaConverter.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,8 +23,8 @@ import com.google.protobuf.Descriptors.FieldDescriptor;
 import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
 import com.google.protobuf.Message;
 import com.twitter.elephantbird.util.Protobufs;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 import org.apache.parquet.schema.Type;
 import org.apache.parquet.schema.Types;
@@ -35,8 +35,10 @@ import org.slf4j.LoggerFactory;
 
 import java.util.List;
 
-import static org.apache.parquet.schema.OriginalType.ENUM;
-import static org.apache.parquet.schema.OriginalType.UTF8;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.enumType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.listType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.mapType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*;
 
 /**
@@ -101,20 +103,19 @@ public class ProtoSchemaConverter {
     ParquetType parquetType = getParquetType(descriptor);
     if (descriptor.isRepeated() && parquetSpecsCompliant) {
       // the old schema style did not include the LIST wrapper around repeated fields
-      return addRepeatedPrimitive(descriptor, parquetType.primitiveType, parquetType.originalType, builder);
+      return addRepeatedPrimitive(parquetType.primitiveType, parquetType.logicalTypeAnnotation, builder);
     }
 
-    return builder.primitive(parquetType.primitiveType, getRepetition(descriptor)).as(parquetType.originalType);
+    return builder.primitive(parquetType.primitiveType, getRepetition(descriptor)).as(parquetType.logicalTypeAnnotation);
   }
 
-  private <T> Builder<? extends Builder<?, GroupBuilder<T>>, GroupBuilder<T>> addRepeatedPrimitive(FieldDescriptor descriptor,
-                                                                                                   PrimitiveTypeName primitiveType,
-                                                                                                   OriginalType originalType,
+  private <T> Builder<? extends Builder<?, GroupBuilder<T>>, GroupBuilder<T>> addRepeatedPrimitive(PrimitiveTypeName primitiveType,
+                                                                                                   LogicalTypeAnnotation logicalTypeAnnotation,
                                                                                                    final GroupBuilder<T> builder) {
     return builder
-        .group(Type.Repetition.OPTIONAL).as(OriginalType.LIST)
+        .group(Type.Repetition.OPTIONAL).as(listType())
           .group(Type.Repetition.REPEATED)
-            .primitive(primitiveType, Type.Repetition.REQUIRED).as(originalType)
+            .primitive(primitiveType, Type.Repetition.REQUIRED).as(logicalTypeAnnotation)
           .named("element")
         .named("list");
   }
@@ -122,7 +123,7 @@ public class ProtoSchemaConverter {
   private <T> GroupBuilder<GroupBuilder<T>> addRepeatedMessage(FieldDescriptor descriptor, GroupBuilder<T> builder) {
     GroupBuilder<GroupBuilder<GroupBuilder<GroupBuilder<T>>>> result =
       builder
-        .group(Type.Repetition.OPTIONAL).as(OriginalType.LIST)
+        .group(Type.Repetition.OPTIONAL).as(listType())
         .group(Type.Repetition.REPEATED)
         .group(Type.Repetition.OPTIONAL);
 
@@ -156,9 +157,9 @@ public class ProtoSchemaConverter {
     ParquetType mapKeyParquetType = getParquetType(fields.get(0));
 
     GroupBuilder<GroupBuilder<GroupBuilder<T>>> group = builder
-      .group(Type.Repetition.OPTIONAL).as(OriginalType.MAP) // only optional maps are allowed in Proto3
+      .group(Type.Repetition.OPTIONAL).as(mapType()) // only optional maps are allowed in Proto3
       .group(Type.Repetition.REPEATED) // key_value wrapper
-      .primitive(mapKeyParquetType.primitiveType, Type.Repetition.REQUIRED).as(mapKeyParquetType.originalType).named("key");
+      .primitive(mapKeyParquetType.primitiveType, Type.Repetition.REQUIRED).as(mapKeyParquetType.logicalTypeAnnotation).named("key");
 
     return addField(fields.get(1), group).named("value")
       .named("key_value");
@@ -173,8 +174,8 @@ public class ProtoSchemaConverter {
       case DOUBLE: return ParquetType.of(DOUBLE);
       case BOOLEAN: return ParquetType.of(BOOLEAN);
       case FLOAT: return ParquetType.of(FLOAT);
-      case STRING: return ParquetType.of(BINARY, UTF8);
-      case ENUM: return ParquetType.of(BINARY, ENUM);
+      case STRING: return ParquetType.of(BINARY, stringType());
+      case ENUM: return ParquetType.of(BINARY, enumType());
       case BYTE_STRING: return ParquetType.of(BINARY);
       default:
         throw new UnsupportedOperationException("Cannot convert Protocol Buffer: unknown type " + javaType);
@@ -183,15 +184,15 @@ public class ProtoSchemaConverter {
 
   private static class ParquetType {
     PrimitiveTypeName primitiveType;
-    OriginalType originalType;
+    LogicalTypeAnnotation logicalTypeAnnotation;
 
-    private ParquetType(PrimitiveTypeName primitiveType, OriginalType originalType) {
+    private ParquetType(PrimitiveTypeName primitiveType, LogicalTypeAnnotation logicalTypeAnnotation) {
       this.primitiveType = primitiveType;
-      this.originalType = originalType;
+      this.logicalTypeAnnotation = logicalTypeAnnotation;
     }
 
-    public static ParquetType of(PrimitiveTypeName primitiveType, OriginalType originalType) {
-      return new ParquetType(primitiveType, originalType);
+    public static ParquetType of(PrimitiveTypeName primitiveType, LogicalTypeAnnotation logicalTypeAnnotation) {
+      return new ParquetType(primitiveType, logicalTypeAnnotation);
     }
 
     public static ParquetType of(PrimitiveTypeName primitiveType) {
diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java
index 59c236f..7436b04 100644
--- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java
+++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -38,6 +38,9 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+
+import static java.util.Optional.ofNullable;
 
 /**
  * Implementation of {@link WriteSupport} for writing Protocol Buffers.
@@ -216,15 +219,21 @@ public class ProtoWriteSupport<T extends MessageOrBuilder> extends WriteSupport<
     }
 
     private GroupType getGroupType(Type type) {
-      if (type.getOriginalType() == OriginalType.LIST) {
-        return type.asGroupType().getType("list").asGroupType().getType("element").asGroupType();
-      }
-
-      if (type.getOriginalType() == OriginalType.MAP) {
-        return type.asGroupType().getType("key_value").asGroupType().getType("value").asGroupType();
+      LogicalTypeAnnotation annotation = type.getLogicalTypeAnnotation();
+      if (annotation == null) { // no annotation: use the type itself as the group
+        return type.asGroupType();
       }
+      return annotation.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<GroupType>() {
+        @Override
+        public Optional<GroupType> visit(LogicalTypeAnnotation.ListLogicalTypeAnnotation listAnnotation) {
+          return ofNullable(type.asGroupType().getType("list").asGroupType().getType("element").asGroupType());
+        }
 
-      return type.asGroupType();
+        @Override
+        public Optional<GroupType> visit(LogicalTypeAnnotation.MapLogicalTypeAnnotation mapAnnotation) {
+          return ofNullable(type.asGroupType().getType("key_value").asGroupType().getType("value").asGroupType());
+        }
+      }).orElseGet(type::asGroupType);
     }
 
     private MapWriter createMapWriter(FieldDescriptor fieldDescriptor, Type type) {
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java
index 1185382..7bfcdb1 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,8 +23,8 @@ import java.util.List;
 
 import org.apache.parquet.ShouldNeverHappenException;
 import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 import org.apache.parquet.schema.Type;
@@ -55,8 +55,8 @@ import org.apache.parquet.thrift.struct.ThriftType.StructType.StructOrUnionType;
 import static org.apache.parquet.Preconditions.checkNotNull;
 import static org.apache.parquet.schema.ConversionPatterns.listType;
 import static org.apache.parquet.schema.ConversionPatterns.mapType;
-import static org.apache.parquet.schema.OriginalType.ENUM;
-import static org.apache.parquet.schema.OriginalType.UTF8;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.enumType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
@@ -278,7 +278,7 @@ class ThriftSchemaConvertVisitor implements ThriftType.StateVisitor<ConvertedFie
     return visitPrimitiveType(type, null, state);
   }
 
-  private ConvertedField visitPrimitiveType(PrimitiveTypeName type, OriginalType orig, State state) {
+  private ConvertedField visitPrimitiveType(PrimitiveTypeName type, LogicalTypeAnnotation orig, State state) {
     PrimitiveBuilder<PrimitiveType> b = primitive(type, state.repetition);
 
     if (orig != null) {
@@ -294,7 +294,7 @@ class ThriftSchemaConvertVisitor implements ThriftType.StateVisitor<ConvertedFie
 
   @Override
   public ConvertedField visit(EnumType enumType, State state) {
-    return visitPrimitiveType(BINARY, ENUM, state);
+    return visitPrimitiveType(BINARY, enumType(), state);
   }
 
   @Override
@@ -329,7 +329,7 @@ class ThriftSchemaConvertVisitor implements ThriftType.StateVisitor<ConvertedFie
 
   @Override
   public ConvertedField visit(StringType stringType, State state) {
-    return stringType.isBinary() ? visitPrimitiveType(BINARY, state) : visitPrimitiveType(BINARY, UTF8, state);
+    return stringType.isBinary() ? visitPrimitiveType(BINARY, state) : visitPrimitiveType(BINARY, stringType(), state);
   }
 
   private static boolean isUnion(StructOrUnionType s) {
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/DumpCommand.java b/parquet-tools/src/main/java/org/apache/parquet/tools/command/DumpCommand.java
index 26b5562..27043b9 100644
--- a/parquet-tools/src/main/java/org/apache/parquet/tools/command/DumpCommand.java
+++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/DumpCommand.java
@@ -58,7 +58,6 @@ import org.apache.parquet.io.api.GroupConverter;
 import org.apache.parquet.io.api.PrimitiveConverter;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveStringifier;
-import org.apache.parquet.tools.util.MetadataUtils;
 import org.apache.parquet.tools.util.PrettyPrintWriter;
 import org.apache.parquet.tools.util.PrettyPrintWriter.WhiteSpaceHandler;
 
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/util/MetadataUtils.java b/parquet-tools/src/main/java/org/apache/parquet/tools/command/MetadataUtils.java
similarity index 76%
copy from parquet-tools/src/main/java/org/apache/parquet/tools/util/MetadataUtils.java
copy to parquet-tools/src/main/java/org/apache/parquet/tools/command/MetadataUtils.java
index 870b8c1..0bade37 100644
--- a/parquet-tools/src/main/java/org/apache/parquet/tools/util/MetadataUtils.java
+++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/MetadataUtils.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -16,16 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.parquet.tools.util;
-
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
+package org.apache.parquet.tools.command;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Strings;
-
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
@@ -33,19 +27,23 @@ import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 import org.apache.parquet.schema.Type;
 import org.apache.parquet.schema.Type.Repetition;
+import org.apache.parquet.tools.util.PrettyPrintWriter;
 
-public class MetadataUtils {
-  public static final double BAD_COMPRESSION_RATIO_CUTOFF = 0.97;
-  public static final double GOOD_COMPRESSION_RATIO_CUTOFF = 1.2;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 
-  public static void showDetails(PrettyPrintWriter out, ParquetMetadata meta) {
-    showDetails(out, meta.getFileMetaData());
+class MetadataUtils {
+  static void showDetails(PrettyPrintWriter out, ParquetMetadata meta, boolean showOriginalTypes) {
+    showDetails(out, meta.getFileMetaData(), showOriginalTypes);
 
     long i = 1;
     for (BlockMetaData bmeta : meta.getBlocks()) {
@@ -54,7 +52,7 @@ public class MetadataUtils {
     }
   }
 
-  public static void showDetails(PrettyPrintWriter out, FileMetaData meta) {
+  static void showDetails(PrettyPrintWriter out, FileMetaData meta, boolean showOriginalTypes) {
     out.format("creator: %s%n", meta.getCreatedBy());
 
     Map<String,String> extra = meta.getKeyValueMetaData();
@@ -70,11 +68,7 @@ public class MetadataUtils {
     out.println();
     out.format("file schema: %s%n", meta.getSchema().getName());
     out.rule('-');
-    showDetails(out, meta.getSchema());
-  }
-
-  public static void showDetails(PrettyPrintWriter out, BlockMetaData meta) {
-    showDetails(out, meta, null);
+    showDetails(out, meta.getSchema(), showOriginalTypes);
   }
 
   private static void showDetails(PrettyPrintWriter out, BlockMetaData meta, Long num) {
@@ -87,7 +81,7 @@ public class MetadataUtils {
     showDetails(out, meta.getColumns());
   }
 
-  public static void showDetails(PrettyPrintWriter out, List<ColumnChunkMetaData> ccmeta) {
+  static void showDetails(PrettyPrintWriter out, List<ColumnChunkMetaData> ccmeta) {
     Map<String,Object> chunks = new LinkedHashMap<String,Object>();
     for (ColumnChunkMetaData cmeta : ccmeta) {
       String[] path = cmeta.getPath().toArray();
@@ -123,10 +117,6 @@ public class MetadataUtils {
     }
   }
 
-  public static void showDetails(PrettyPrintWriter out, ColumnChunkMetaData meta) {
-    showDetails(out, meta, true);
-  }
-
   private static void showDetails(PrettyPrintWriter out, ColumnChunkMetaData meta, boolean name) {
     long doff = meta.getDictionaryPageOffset();
     long foff = meta.getFirstDataPageOffset();
@@ -157,35 +147,14 @@ public class MetadataUtils {
     out.println();
   }
 
-  public static void showDetails(PrettyPrintWriter out, ColumnDescriptor desc) {
-    String path = Joiner.on(".").skipNulls().join(desc.getPath());
-    PrimitiveTypeName type = desc.getType();
-    int defl = desc.getMaxDefinitionLevel();
-    int repl = desc.getMaxRepetitionLevel();
-
-    out.format("column desc: %s T:%s R:%d D:%d%n", path, type, repl, defl); 
-  }
-
-  public static void showDetails(PrettyPrintWriter out, MessageType type) {
+  static void showDetails(PrettyPrintWriter out, MessageType type, boolean showOriginalTypes) {
     List<String> cpath = new ArrayList<String>();
     for (Type ftype : type.getFields()) {
-      showDetails(out, ftype, 0, type, cpath);
+      showDetails(out, ftype, 0, type, cpath, showOriginalTypes);
     }
   }
 
-  public static void showDetails(PrettyPrintWriter out, GroupType type) {
-    showDetails(out, type, 0, null, null);
-  }
-
-  public static void showDetails(PrettyPrintWriter out, PrimitiveType type) {
-    showDetails(out, type, 0, null, null);
-  }
-
-  public static void showDetails(PrettyPrintWriter out, Type type) {
-    showDetails(out, type, 0, null, null);
-  }
-
-  private static void showDetails(PrettyPrintWriter out, GroupType type, int depth, MessageType container, List<String> cpath) {
+  private static void showDetails(PrettyPrintWriter out, GroupType type, int depth, MessageType container, List<String> cpath, boolean showOriginalTypes) {
     String name = Strings.repeat(".", depth) + type.getName();
     Repetition rep = type.getRepetition();
     int fcount = type.getFieldCount();
@@ -193,19 +162,29 @@ public class MetadataUtils {
 
     cpath.add(type.getName());
     for (Type ftype : type.getFields()) {
-      showDetails(out, ftype, depth + 1, container, cpath);
+      showDetails(out, ftype, depth + 1, container, cpath, showOriginalTypes);
     }
     cpath.remove(cpath.size() - 1);
   }
 
-  private static void showDetails(PrettyPrintWriter out, PrimitiveType type, int depth, MessageType container, List<String> cpath) {
+  private static void showDetails(PrettyPrintWriter out, PrimitiveType type, int depth, MessageType container, List<String> cpath, boolean showOriginalTypes) {
     String name = Strings.repeat(".", depth) + type.getName();
-    OriginalType otype = type.getOriginalType();
     Repetition rep = type.getRepetition();
     PrimitiveTypeName ptype = type.getPrimitiveTypeName();
 
     out.format("%s: %s %s", name, rep, ptype);
-    if (otype != null) out.format(" O:%s", otype);
+    if (showOriginalTypes) {
+      OriginalType otype;
+      try {
+        otype = type.getOriginalType();
+      } catch (Exception e) {
+        otype = null;
+      }
+      if (otype != null) out.format(" O:%s", otype);
+    } else {
+      LogicalTypeAnnotation ltype = type.getLogicalTypeAnnotation();
+      if (ltype != null) out.format(" L:%s", ltype);
+    }
 
     if (container != null) {
       cpath.add(type.getName());
@@ -221,12 +200,12 @@ public class MetadataUtils {
     out.println();
   }
 
-  private static void showDetails(PrettyPrintWriter out, Type type, int depth, MessageType container, List<String> cpath) {
+  private static void showDetails(PrettyPrintWriter out, Type type, int depth, MessageType container, List<String> cpath, boolean showOriginalTypes) {
     if (type instanceof GroupType) {
-      showDetails(out, type.asGroupType(), depth, container, cpath);
+      showDetails(out, type.asGroupType(), depth, container, cpath, showOriginalTypes);
       return;
     } else if (type instanceof PrimitiveType) {
-      showDetails(out, type.asPrimitiveType(), depth, container, cpath);
+      showDetails(out, type.asPrimitiveType(), depth, container, cpath, showOriginalTypes);
       return;
     }
   }
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/ShowMetaCommand.java b/parquet-tools/src/main/java/org/apache/parquet/tools/command/ShowMetaCommand.java
index 8d35551..b07fa7a 100644
--- a/parquet-tools/src/main/java/org/apache/parquet/tools/command/ShowMetaCommand.java
+++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/ShowMetaCommand.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,13 +19,15 @@
 package org.apache.parquet.tools.command;
 
 import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 
 import org.apache.parquet.hadoop.Footer;
 import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.tools.util.MetadataUtils;
 import org.apache.parquet.tools.util.PrettyPrintWriter;
 import org.apache.parquet.tools.util.PrettyPrintWriter.WhiteSpaceHandler;
 
@@ -37,6 +39,15 @@ public class ShowMetaCommand extends ArgsOnlyCommand {
     "where <input> is the parquet file to print to stdout"
   };
 
+  public static final Options OPTIONS;
+  static {
+    OPTIONS = new Options();
+    Option originalType = OptionBuilder.withLongOpt("originalType")
+      .withDescription("Print logical types in OriginalType representation.")
+      .create('o');
+    OPTIONS.addOption(originalType);
+  }
+
   public ShowMetaCommand() {
     super(1, 1);
   }
@@ -52,12 +63,18 @@ public class ShowMetaCommand extends ArgsOnlyCommand {
   }
 
   @Override
+  public Options getOptions() {
+    return OPTIONS;
+  }
+
+  @Override
   public void execute(CommandLine options) throws Exception {
     super.execute(options);
 
     String[] args = options.getArgs();
     String input = args[0];
-    
+    boolean showOriginalTypes = options.hasOption('o');
+
     Configuration conf = new Configuration();
     Path inputPath = new Path(input);
     FileStatus inputFileStatus = inputPath.getFileSystem(conf).getFileStatus(inputPath);
@@ -71,7 +88,7 @@ public class ShowMetaCommand extends ArgsOnlyCommand {
 
     for(Footer f: footers) {
       out.format("file: %s%n" , f.getFile());
-      MetadataUtils.showDetails(out, f.getParquetMetadata());
+      MetadataUtils.showDetails(out, f.getParquetMetadata(), showOriginalTypes);
       out.flushColumns();
     }
   }
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/ShowSchemaCommand.java b/parquet-tools/src/main/java/org/apache/parquet/tools/command/ShowSchemaCommand.java
index d83e564..6f83857 100644
--- a/parquet-tools/src/main/java/org/apache/parquet/tools/command/ShowSchemaCommand.java
+++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/ShowSchemaCommand.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -32,7 +32,6 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.HiddenFileFilter;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.tools.Main;
-import org.apache.parquet.tools.util.MetadataUtils;
 import org.apache.parquet.tools.util.PrettyPrintWriter;
 
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
@@ -49,7 +48,11 @@ public class ShowSchemaCommand extends ArgsOnlyCommand {
     Option help = OptionBuilder.withLongOpt("detailed")
                                .withDescription("Show detailed information about the schema.")
                                .create('d');
+    Option originalType = OptionBuilder.withLongOpt("originalType")
+      .withDescription("Print logical types in OriginalType representation.")
+      .create('o');
     OPTIONS.addOption(help);
+    OPTIONS.addOption(originalType);
   }
 
   public ShowSchemaCommand() {
@@ -98,8 +101,9 @@ public class ShowSchemaCommand extends ArgsOnlyCommand {
 
     Main.out.println(schema);
     if (options.hasOption('d')) {
+      boolean showOriginalTypes = options.hasOption('o');
       PrettyPrintWriter out = PrettyPrintWriter.stdoutPrettyPrinter().build();
-      MetadataUtils.showDetails(out, metaData);
+      MetadataUtils.showDetails(out, metaData, showOriginalTypes);
     }
   }
 }
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/read/SimpleRecordConverter.java b/parquet-tools/src/main/java/org/apache/parquet/tools/read/SimpleRecordConverter.java
index a119a34..c07875a 100644
--- a/parquet-tools/src/main/java/org/apache/parquet/tools/read/SimpleRecordConverter.java
+++ b/parquet-tools/src/main/java/org/apache/parquet/tools/read/SimpleRecordConverter.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,15 +20,18 @@ package org.apache.parquet.tools.read;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.util.Optional;
 
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.io.api.Converter;
 import org.apache.parquet.io.api.GroupConverter;
 import org.apache.parquet.io.api.PrimitiveConverter;
 import org.apache.parquet.schema.GroupType;
-import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.Type;
 
+import static java.util.Optional.of;
+
 public class SimpleRecordConverter extends GroupConverter {
   private final Converter converters[];
   private final String name;
@@ -51,31 +54,42 @@ public class SimpleRecordConverter extends GroupConverter {
   }
 
   private Converter createConverter(Type field) {
-    OriginalType otype = field.getOriginalType();
+    LogicalTypeAnnotation ltype = field.getLogicalTypeAnnotation();
 
     if (field.isPrimitive()) {
-      if (otype != null) {
-        switch (otype) {
-          case MAP: break;
-          case LIST: break;
-          case UTF8: return new StringConverter(field.getName());
-          case MAP_KEY_VALUE: break;
-          case ENUM: break;
-          case DECIMAL:
-            int scale = field.asPrimitiveType().getDecimalMetadata().getScale();
-            return new DecimalConverter(field.getName(), scale);
-        }
+      if (ltype != null) {
+        return ltype.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<Converter>() {
+          @Override
+          public Optional<Converter> visit(LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) {
+            return of(new StringConverter(field.getName()));
+          }
+
+          @Override
+          public Optional<Converter> visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) {
+            int scale = decimalLogicalType.getScale();
+            return of(new DecimalConverter(field.getName(), scale));
+          }
+        }).orElse(new SimplePrimitiveConverter(field.getName()));
       }
 
+      // Primitive field without a logical type annotation: this return must stay, otherwise
+      // control falls through to field.asGroupType() below, which is only valid for group fields.
       return new SimplePrimitiveConverter(field.getName());
     }
 
     GroupType groupType = field.asGroupType();
-    if (otype != null) {
-      switch (otype) {
-        case MAP: return new SimpleMapRecordConverter(groupType, field.getName(), this);
-        case LIST: return new SimpleListRecordConverter(groupType, field.getName(), this);
-      }
+    if (ltype != null) {
+      return ltype.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<Converter>() {
+        @Override
+        public Optional<Converter> visit(LogicalTypeAnnotation.MapLogicalTypeAnnotation mapLogicalType) {
+          return of(new SimpleMapRecordConverter(groupType, field.getName(), SimpleRecordConverter.this));
+        }
+
+        @Override
+        public Optional<Converter> visit(LogicalTypeAnnotation.ListLogicalTypeAnnotation listLogicalType) {
+          return of(new SimpleListRecordConverter(groupType, field.getName(), SimpleRecordConverter.this));
+        }
+      }).orElse(new SimpleRecordConverter(groupType, field.getName(), this));
     }
     return new SimpleRecordConverter(groupType, field.getName(), this);
   }
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/util/MetadataUtils.java b/parquet-tools/src/main/java/org/apache/parquet/tools/util/MetadataUtils.java
index 870b8c1..206028a 100644
--- a/parquet-tools/src/main/java/org/apache/parquet/tools/util/MetadataUtils.java
+++ b/parquet-tools/src/main/java/org/apache/parquet/tools/util/MetadataUtils.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -40,6 +40,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 import org.apache.parquet.schema.Type;
 import org.apache.parquet.schema.Type.Repetition;
 
+@Deprecated
 public class MetadataUtils {
   public static final double BAD_COMPRESSION_RATIO_CUTOFF = 0.97;
   public static final double GOOD_COMPRESSION_RATIO_CUTOFF = 1.2;
@@ -163,7 +164,7 @@ public class MetadataUtils {
     int defl = desc.getMaxDefinitionLevel();
     int repl = desc.getMaxRepetitionLevel();
 
-    out.format("column desc: %s T:%s R:%d D:%d%n", path, type, repl, defl); 
+    out.format("column desc: %s T:%s R:%d D:%d%n", path, type, repl, defl);
   }
 
   public static void showDetails(PrettyPrintWriter out, MessageType type) {