You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by ga...@apache.org on 2018/05/24 11:46:14 UTC

[parquet-mr] branch master updated: PARQUET-1253: Support for new logical type representation (#463)

This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 94a8bf6  PARQUET-1253: Support for new logical type representation (#463)
94a8bf6 is described below

commit 94a8bf6d304d08e8a1fc181e7a06a545103e8ddb
Author: nandorKollar <na...@users.noreply.github.com>
AuthorDate: Thu May 24 13:46:11 2018 +0200

    PARQUET-1253: Support for new logical type representation (#463)
---
 .../parquet/cascading/TestParquetTBaseScheme.java  |   7 +-
 .../java/org/apache/parquet/schema/GroupType.java  |  17 +-
 .../parquet/schema/LogicalTypeAnnotation.java      | 878 +++++++++++++++++++++
 .../apache/parquet/schema/MessageTypeParser.java   |  55 +-
 .../org/apache/parquet/schema/PrimitiveType.java   |  60 +-
 .../main/java/org/apache/parquet/schema/Type.java  |  40 +-
 .../main/java/org/apache/parquet/schema/Types.java |  73 +-
 .../apache/parquet/parser/TestParquetParser.java   |  46 +-
 .../apache/parquet/schema/TestTypeBuilders.java    |  47 +-
 .../format/converter/ParquetMetadataConverter.java | 332 ++++++--
 .../parquet/hadoop/metadata/ParquetMetadata.java   |  15 +-
 .../converter/TestParquetMetadataConverter.java    |  18 +-
 .../org/apache/parquet/pig/PigSchemaConverter.java |   6 +-
 13 files changed, 1434 insertions(+), 160 deletions(-)

diff --git a/parquet-cascading3/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java b/parquet-cascading3/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java
index 7b9f817..97b2ccf 100644
--- a/parquet-cascading3/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java
+++ b/parquet-cascading3/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java
@@ -40,14 +40,12 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.transport.TIOStreamTransport;
 import org.junit.Test;
-import static org.junit.Assert.*;
 
 import org.apache.parquet.hadoop.thrift.ThriftToParquetFileWriter;
 import org.apache.parquet.hadoop.util.ContextUtil;
@@ -55,8 +53,9 @@ import org.apache.parquet.thrift.test.Name;
 
 import java.io.File;
 import java.io.ByteArrayOutputStream;
-import java.util.HashMap;
-import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class TestParquetTBaseScheme {
   final String txtInputPath = "target/test-classes/names.txt";
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java b/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java
index dafe7cc..5cb40e5 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -44,7 +44,7 @@ public class GroupType extends Type {
    * @param fields the contained fields
    */
   public GroupType(Repetition repetition, String name, List<Type> fields) {
-    this(repetition, name, null, fields, null);
+    this(repetition, name, (LogicalTypeAnnotation) null, fields, null);
   }
 
   /**
@@ -94,6 +94,15 @@ public class GroupType extends Type {
     }
   }
 
+  GroupType(Repetition repetition, String name, LogicalTypeAnnotation logicalTypeAnnotation, List<Type> fields, ID id) { // package-private: declared without an access modifier
+    super(name, repetition, logicalTypeAnnotation, id);
+    this.fields = fields; // keeps the caller's list reference; no copy is made here
+    this.indexByName = new HashMap<String, Integer>(); // field name -> position of that field in 'fields'
+    for (int i = 0; i < fields.size(); i++) { // dereferences 'fields', so a null list fails here
+      indexByName.put(fields.get(i).getName(), i); // if two fields share a name, the later position replaces the earlier one
+    }
+  }
+
   /**
    * @param id the field id
    * @return a new GroupType with the same fields and a new id
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/LogicalTypeAnnotation.java b/parquet-column/src/main/java/org/apache/parquet/schema/LogicalTypeAnnotation.java
new file mode 100644
index 0000000..e22867a
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/LogicalTypeAnnotation.java
@@ -0,0 +1,878 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.schema;
+
+import org.apache.parquet.Preconditions;
+
+import java.util.List;
+import java.util.Objects;
+
+public abstract class LogicalTypeAnnotation {
+  enum LogicalTypeToken { // package-private; each constant turns a list of textual parameters into an annotation
+    MAP {
+      @Override
+      protected LogicalTypeAnnotation fromString(List<String> params) { // params is not read by the parameterless types
+        return mapType();
+      }
+    },
+    LIST {
+      @Override
+      protected LogicalTypeAnnotation fromString(List<String> params) {
+        return listType();
+      }
+    },
+    UTF8 {
+      @Override
+      protected LogicalTypeAnnotation fromString(List<String> params) {
+        return stringType();
+      }
+    },
+    MAP_KEY_VALUE {
+      @Override
+      protected LogicalTypeAnnotation fromString(List<String> params) {
+        return MapKeyValueTypeAnnotation.getInstance();
+      }
+    },
+    ENUM {
+      @Override
+      protected LogicalTypeAnnotation fromString(List<String> params) {
+        return enumType();
+      }
+    },
+    DECIMAL {
+      @Override
+      protected LogicalTypeAnnotation fromString(List<String> params) {
+        if (params.size() != 2) { // exactly two parameters are required
+          throw new RuntimeException("Expecting 2 parameters for decimal logical type, got " + params.size());
+        }
+        return decimalType(Integer.valueOf(params.get(1)), Integer.valueOf(params.get(0))); // params = [precision, scale], but decimalType takes (scale, precision); Integer.valueOf throws NumberFormatException on non-numeric text
+      }
+    },
+    DATE {
+      @Override
+      protected LogicalTypeAnnotation fromString(List<String> params) {
+        return dateType();
+      }
+    },
+    TIME {
+      @Override
+      protected LogicalTypeAnnotation fromString(List<String> params) {
+        if (params.size() != 2) {
+          throw new RuntimeException("Expecting 2 parameters for time logical type, got " + params.size());
+        }
+        return timeType(Boolean.parseBoolean(params.get(1)), TimeUnit.valueOf(params.get(0))); // params = [unit, isAdjustedToUTC]; parseBoolean yields false for anything other than "true" (case-insensitive)
+      }
+    },
+    TIMESTAMP {
+      @Override
+      protected LogicalTypeAnnotation fromString(List<String> params) {
+        if (params.size() != 2) {
+          throw new RuntimeException("Expecting 2 parameters for timestamp logical type, got " + params.size());
+        }
+        return timestampType(Boolean.parseBoolean(params.get(1)), TimeUnit.valueOf(params.get(0))); // params = [unit, isAdjustedToUTC]; TimeUnit.valueOf throws IllegalArgumentException for an unknown unit name
+      }
+    },
+    INT {
+      @Override
+      protected LogicalTypeAnnotation fromString(List<String> params) {
+        if (params.size() != 2) {
+          throw new RuntimeException("Expecting 2 parameters for integer logical type, got " + params.size());
+        }
+        return intType(Integer.valueOf(params.get(0)), Boolean.parseBoolean(params.get(1))); // params = [bitWidth, isSigned]; intType rejects widths other than 8, 16, 32, 64
+      }
+    },
+    JSON {
+      @Override
+      protected LogicalTypeAnnotation fromString(List<String> params) {
+        return jsonType();
+      }
+    },
+    BSON {
+      @Override
+      protected LogicalTypeAnnotation fromString(List<String> params) {
+        return bsonType();
+      }
+    },
+    INTERVAL {
+      @Override
+      protected LogicalTypeAnnotation fromString(List<String> params) {
+        return IntervalLogicalTypeAnnotation.getInstance();
+      }
+    };
+
+    protected abstract LogicalTypeAnnotation fromString(List<String> params); // builds the annotation for this token from its textual parameters
+  }
+
+  /**
+   * Convert this logical type to old logical type representation in parquet-mr (if there's any).
+   * Those logical type implementations, which don't have a corresponding mapping should return null.
+   *
+   * @return the OriginalType representation of the new logical type, or null if there's none
+   */
+  public abstract OriginalType toOriginalType();
+
+  /**
+   * Visits this logical type with the given visitor
+   *
+   * @param logicalTypeAnnotationVisitor the visitor to visit this type
+   */
+  public abstract void accept(LogicalTypeAnnotationVisitor logicalTypeAnnotationVisitor);
+
+  abstract LogicalTypeToken getType();
+
+  String typeParametersAsString() { // default for parameterless types; parameterized subclasses override it to return "(a,b)"
+    return "";
+  }
+
+  @Override
+  public String toString() { // token name immediately followed by the parameter text, with no separator
+    StringBuilder sb = new StringBuilder();
+    sb.append(getType());
+    sb.append(typeParametersAsString()); // empty string unless the subclass overrides typeParametersAsString()
+    return sb.toString();
+  }
+
+  /**
+   * Helper method to convert the old representation of logical types (OriginalType) to new logical type.
+   */
+  public static LogicalTypeAnnotation fromOriginalType(OriginalType originalType, DecimalMetadata decimalMetadata) { // decimalMetadata is read only in the DECIMAL case
+    if (originalType == null) { // no original type -> no annotation
+      return null;
+    }
+    switch (originalType) {
+      case UTF8:
+        return stringType();
+      case MAP:
+        return mapType();
+      case DECIMAL:
+        int scale = (decimalMetadata == null ? 0 : decimalMetadata.getScale()); // missing metadata falls back to scale 0
+        int precision = (decimalMetadata == null ? 0 : decimalMetadata.getPrecision()); // missing metadata falls back to precision 0
+        return decimalType(scale, precision);
+      case LIST:
+        return listType();
+      case DATE:
+        return dateType();
+      case INTERVAL:
+        return IntervalLogicalTypeAnnotation.getInstance();
+      case TIMESTAMP_MILLIS:
+        return timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS); // all four legacy time/timestamp types are created with isAdjustedToUTC = true
+      case TIMESTAMP_MICROS:
+        return timestampType(true, LogicalTypeAnnotation.TimeUnit.MICROS);
+      case TIME_MILLIS:
+        return timeType(true, LogicalTypeAnnotation.TimeUnit.MILLIS);
+      case TIME_MICROS:
+        return timeType(true, LogicalTypeAnnotation.TimeUnit.MICROS);
+      case UINT_8:
+        return intType(8, false); // second argument is isSigned: false for the UINT_* types
+      case UINT_16:
+        return intType(16, false);
+      case UINT_32:
+        return intType(32, false);
+      case UINT_64:
+        return intType(64, false);
+      case INT_8:
+        return intType(8, true); // second argument is isSigned: true for the INT_* types
+      case INT_16:
+        return intType(16, true);
+      case INT_32:
+        return intType(32, true);
+      case INT_64:
+        return intType(64, true);
+      case ENUM:
+        return enumType();
+      case JSON:
+        return jsonType();
+      case BSON:
+        return bsonType();
+      case MAP_KEY_VALUE:
+        return MapKeyValueTypeAnnotation.getInstance();
+      default:
+        throw new RuntimeException("Can't convert original type to logical type, unknown original type " + originalType); // any constant not listed above
+    }
+  }
+
+  public static StringLogicalTypeAnnotation stringType() { // returns the shared instance
+    return StringLogicalTypeAnnotation.INSTANCE;
+  }
+
+  public static MapLogicalTypeAnnotation mapType() { // returns the shared instance
+    return MapLogicalTypeAnnotation.INSTANCE;
+  }
+
+  public static ListLogicalTypeAnnotation listType() { // returns the shared instance
+    return ListLogicalTypeAnnotation.INSTANCE;
+  }
+
+  public static EnumLogicalTypeAnnotation enumType() { // returns the shared instance
+    return EnumLogicalTypeAnnotation.INSTANCE;
+  }
+
+  public static DecimalLogicalTypeAnnotation decimalType(final int scale, final int precision) { // argument order is (scale, precision); a new object per call, no range validation here
+    return new DecimalLogicalTypeAnnotation(scale, precision);
+  }
+
+  public static DateLogicalTypeAnnotation dateType() { // returns the shared instance
+    return DateLogicalTypeAnnotation.INSTANCE;
+  }
+
+  public static TimeLogicalTypeAnnotation timeType(final boolean isAdjustedToUTC, final TimeUnit unit) { // a new object per call; unit is not null-checked here
+    return new TimeLogicalTypeAnnotation(isAdjustedToUTC, unit);
+  }
+
+  public static TimestampLogicalTypeAnnotation timestampType(final boolean isAdjustedToUTC, final TimeUnit unit) { // a new object per call; unit is not null-checked here
+    return new TimestampLogicalTypeAnnotation(isAdjustedToUTC, unit);
+  }
+
+  public static IntLogicalTypeAnnotation intType(final int bitWidth, final boolean isSigned) { // validates bitWidth before constructing; a new object per call
+    Preconditions.checkArgument(
+      bitWidth == 8 || bitWidth == 16 || bitWidth == 32 || bitWidth == 64,
+      "Invalid bit width for integer logical type, " + bitWidth + " is not allowed, " +
+        "valid bit width values: 8, 16, 32, 64");
+    return new IntLogicalTypeAnnotation(bitWidth, isSigned);
+  }
+
+  public static JsonLogicalTypeAnnotation jsonType() { // returns the shared instance
+    return JsonLogicalTypeAnnotation.INSTANCE;
+  }
+
+  public static BsonLogicalTypeAnnotation bsonType() { // returns the shared instance
+    return BsonLogicalTypeAnnotation.INSTANCE;
+  }
+
+  public static class StringLogicalTypeAnnotation extends LogicalTypeAnnotation { // parameterless annotation whose legacy counterpart is OriginalType.UTF8
+    private static final StringLogicalTypeAnnotation INSTANCE = new StringLogicalTypeAnnotation(); // shared instance returned by stringType()
+
+    private StringLogicalTypeAnnotation() { // private constructor; INSTANCE is the only instantiation in this file
+    }
+
+    @Override
+    public OriginalType toOriginalType() {
+      return OriginalType.UTF8;
+    }
+
+    @Override
+    public void accept(LogicalTypeAnnotationVisitor logicalTypeAnnotationVisitor) {
+      logicalTypeAnnotationVisitor.visit(this); // resolves to the visit(StringLogicalTypeAnnotation) overload
+    }
+
+    @Override
+    LogicalTypeToken getType() {
+      return LogicalTypeToken.UTF8;
+    }
+
+    @Override
+    public boolean equals(Object obj) { // type-only comparison: the class has no fields to compare
+      return obj instanceof StringLogicalTypeAnnotation;
+    }
+
+    @Override
+    public int hashCode() {
+      // This type doesn't have any parameters, thus using class hashcode
+      return getClass().hashCode();
+    }
+  }
+
+  public static class MapLogicalTypeAnnotation extends LogicalTypeAnnotation { // parameterless annotation whose legacy counterpart is OriginalType.MAP
+    private static final MapLogicalTypeAnnotation INSTANCE = new MapLogicalTypeAnnotation(); // shared instance returned by mapType()
+
+    private MapLogicalTypeAnnotation() { // private constructor; INSTANCE is the only instantiation in this file
+    }
+
+    @Override
+    public OriginalType toOriginalType() {
+      return OriginalType.MAP;
+    }
+
+    @Override
+    public void accept(LogicalTypeAnnotationVisitor logicalTypeAnnotationVisitor) {
+      logicalTypeAnnotationVisitor.visit(this); // resolves to the visit(MapLogicalTypeAnnotation) overload
+    }
+
+    @Override
+    LogicalTypeToken getType() {
+      return LogicalTypeToken.MAP;
+    }
+
+    @Override
+    public boolean equals(Object obj) { // type-only comparison: the class has no fields to compare
+      return obj instanceof MapLogicalTypeAnnotation;
+    }
+
+    @Override
+    public int hashCode() {
+      // This type doesn't have any parameters, thus using class hashcode
+      return getClass().hashCode();
+    }
+  }
+
+  public static class ListLogicalTypeAnnotation extends LogicalTypeAnnotation { // parameterless annotation whose legacy counterpart is OriginalType.LIST
+    private static final ListLogicalTypeAnnotation INSTANCE = new ListLogicalTypeAnnotation(); // shared instance returned by listType()
+
+    private ListLogicalTypeAnnotation() { // private constructor; INSTANCE is the only instantiation in this file
+    }
+
+    @Override
+    public OriginalType toOriginalType() {
+      return OriginalType.LIST;
+    }
+
+    @Override
+    public void accept(LogicalTypeAnnotationVisitor logicalTypeAnnotationVisitor) {
+      logicalTypeAnnotationVisitor.visit(this); // resolves to the visit(ListLogicalTypeAnnotation) overload
+    }
+
+    @Override
+    LogicalTypeToken getType() {
+      return LogicalTypeToken.LIST;
+    }
+
+    @Override
+    public boolean equals(Object obj) { // type-only comparison: the class has no fields to compare
+      return obj instanceof ListLogicalTypeAnnotation;
+    }
+
+    @Override
+    public int hashCode() {
+      // This type doesn't have any parameters, thus using class hashcode
+      return getClass().hashCode();
+    }
+  }
+
+  public static class EnumLogicalTypeAnnotation extends LogicalTypeAnnotation { // parameterless annotation whose legacy counterpart is OriginalType.ENUM
+    private static final EnumLogicalTypeAnnotation INSTANCE = new EnumLogicalTypeAnnotation(); // shared instance returned by enumType()
+
+    private EnumLogicalTypeAnnotation() { // private constructor; INSTANCE is the only instantiation in this file
+    }
+
+    @Override
+    public OriginalType toOriginalType() {
+      return OriginalType.ENUM;
+    }
+
+    @Override
+    public void accept(LogicalTypeAnnotationVisitor logicalTypeAnnotationVisitor) {
+      logicalTypeAnnotationVisitor.visit(this); // resolves to the visit(EnumLogicalTypeAnnotation) overload
+    }
+
+    @Override
+    LogicalTypeToken getType() {
+      return LogicalTypeToken.ENUM;
+    }
+
+    @Override
+    public boolean equals(Object obj) { // type-only comparison: the class has no fields to compare
+      return obj instanceof EnumLogicalTypeAnnotation;
+    }
+
+    @Override
+    public int hashCode() {
+      // This type doesn't have any parameters, thus using class hashcode
+      return getClass().hashCode();
+    }
+  }
+
+  public static class DecimalLogicalTypeAnnotation extends LogicalTypeAnnotation { // decimal annotation carrying scale and precision
+    private final int scale;
+    private final int precision;
+
+    private DecimalLogicalTypeAnnotation(int scale, int precision) { // private: created through decimalType(scale, precision); values are stored unchecked
+      this.scale = scale;
+      this.precision = precision;
+    }
+
+    public int getPrecision() {
+      return precision;
+    }
+
+    public int getScale() {
+      return scale;
+    }
+
+    @Override
+    public OriginalType toOriginalType() { // scale and precision are not part of the returned OriginalType
+      return OriginalType.DECIMAL;
+    }
+
+    @Override
+    public void accept(LogicalTypeAnnotationVisitor logicalTypeAnnotationVisitor) {
+      logicalTypeAnnotationVisitor.visit(this); // resolves to the visit(DecimalLogicalTypeAnnotation) overload
+    }
+
+    @Override
+    LogicalTypeToken getType() {
+      return LogicalTypeToken.DECIMAL;
+    }
+
+    @Override
+    protected String typeParametersAsString() { // widens the package-private base method to protected; renders "(precision,scale)"
+      StringBuilder sb = new StringBuilder();
+      sb.append("(");
+      sb.append(precision); // precision first, then scale: the same order LogicalTypeToken.DECIMAL.fromString reads
+      sb.append(",");
+      sb.append(scale);
+      sb.append(")");
+      return sb.toString();
+    }
+
+    @Override
+    public boolean equals(Object obj) { // equal when both scale and precision match
+      if (!(obj instanceof DecimalLogicalTypeAnnotation)) {
+        return false;
+      }
+      DecimalLogicalTypeAnnotation other = (DecimalLogicalTypeAnnotation) obj;
+      return scale == other.scale && precision == other.precision;
+    }
+
+    @Override
+    public int hashCode() { // hashes the same two fields that equals compares
+      return Objects.hash(scale, precision);
+    }
+  }
+
+  public static class DateLogicalTypeAnnotation extends LogicalTypeAnnotation { // parameterless annotation whose legacy counterpart is OriginalType.DATE
+    private static final DateLogicalTypeAnnotation INSTANCE = new DateLogicalTypeAnnotation(); // shared instance returned by dateType()
+
+    private DateLogicalTypeAnnotation() { // private constructor; INSTANCE is the only instantiation in this file
+    }
+
+    @Override
+    public OriginalType toOriginalType() {
+      return OriginalType.DATE;
+    }
+
+    @Override
+    public void accept(LogicalTypeAnnotationVisitor logicalTypeAnnotationVisitor) {
+      logicalTypeAnnotationVisitor.visit(this); // resolves to the visit(DateLogicalTypeAnnotation) overload
+    }
+
+    @Override
+    LogicalTypeToken getType() {
+      return LogicalTypeToken.DATE;
+    }
+
+    @Override
+    public boolean equals(Object obj) { // type-only comparison: the class has no fields to compare
+      return obj instanceof DateLogicalTypeAnnotation;
+    }
+
+    @Override
+    public int hashCode() {
+      // This type doesn't have any parameters, thus using class hashcode
+      return getClass().hashCode();
+    }
+  }
+
+  public enum TimeUnit { // unit parameter shared by the time and timestamp annotations
+    MILLIS,
+    MICROS
+  }
+
+  public static class TimeLogicalTypeAnnotation extends LogicalTypeAnnotation { // time annotation carrying a unit and a UTC-adjustment flag
+    private final boolean isAdjustedToUTC;
+    private final TimeUnit unit;
+
+    private TimeLogicalTypeAnnotation(boolean isAdjustedToUTC, TimeUnit unit) { // private: created through timeType(isAdjustedToUTC, unit)
+      this.isAdjustedToUTC = isAdjustedToUTC;
+      this.unit = unit;
+    }
+
+    @Override
+    public OriginalType toOriginalType() { // chosen by unit alone; isAdjustedToUTC does not influence the result
+      switch (unit) {
+        case MILLIS:
+          return OriginalType.TIME_MILLIS;
+        case MICROS:
+          return OriginalType.TIME_MICROS;
+        default:
+          throw new RuntimeException("Unknown original type for " + unit);
+      }
+    }
+
+    @Override
+    public void accept(LogicalTypeAnnotationVisitor logicalTypeAnnotationVisitor) {
+      logicalTypeAnnotationVisitor.visit(this); // resolves to the visit(TimeLogicalTypeAnnotation) overload
+    }
+
+    @Override
+    LogicalTypeToken getType() {
+      return LogicalTypeToken.TIME;
+    }
+
+    @Override
+    protected String typeParametersAsString() { // renders "(unit,isAdjustedToUTC)"
+      StringBuilder sb = new StringBuilder();
+      sb.append("(");
+      sb.append(unit); // unit first, then the flag: the same order LogicalTypeToken.TIME.fromString reads
+      sb.append(",");
+      sb.append(isAdjustedToUTC);
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public TimeUnit getUnit() {
+      return unit;
+    }
+
+    public boolean isAdjustedToUTC() {
+      return isAdjustedToUTC;
+    }
+
+    @Override
+    public boolean equals(Object obj) { // equal when both the flag and the unit match
+      if (!(obj instanceof TimeLogicalTypeAnnotation)) {
+        return false;
+      }
+      TimeLogicalTypeAnnotation other = (TimeLogicalTypeAnnotation) obj;
+      return isAdjustedToUTC == other.isAdjustedToUTC && unit == other.unit;
+    }
+
+    @Override
+    public int hashCode() { // hashes the same two fields that equals compares
+      return Objects.hash(isAdjustedToUTC, unit);
+    }
+  }
+
+  public static class TimestampLogicalTypeAnnotation extends LogicalTypeAnnotation { // timestamp annotation carrying a unit and a UTC-adjustment flag
+    private final boolean isAdjustedToUTC;
+    private final TimeUnit unit;
+
+    private TimestampLogicalTypeAnnotation(boolean isAdjustedToUTC, TimeUnit unit) { // private: created through timestampType(isAdjustedToUTC, unit)
+      this.isAdjustedToUTC = isAdjustedToUTC;
+      this.unit = unit;
+    }
+
+    @Override
+    public OriginalType toOriginalType() { // chosen by unit alone; isAdjustedToUTC does not influence the result
+      switch (unit) {
+        case MILLIS:
+          return OriginalType.TIMESTAMP_MILLIS;
+        case MICROS:
+          return OriginalType.TIMESTAMP_MICROS;
+        default:
+          throw new RuntimeException("Unknown original type for " + unit);
+      }
+    }
+
+    @Override
+    public void accept(LogicalTypeAnnotationVisitor logicalTypeAnnotationVisitor) {
+      logicalTypeAnnotationVisitor.visit(this); // resolves to the visit(TimestampLogicalTypeAnnotation) overload
+    }
+
+    @Override
+    LogicalTypeToken getType() {
+      return LogicalTypeToken.TIMESTAMP;
+    }
+
+    @Override
+    protected String typeParametersAsString() { // renders "(unit,isAdjustedToUTC)"
+      StringBuilder sb = new StringBuilder();
+      sb.append("(");
+      sb.append(unit); // unit first, then the flag: the same order LogicalTypeToken.TIMESTAMP.fromString reads
+      sb.append(",");
+      sb.append(isAdjustedToUTC);
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public TimeUnit getUnit() {
+      return unit;
+    }
+
+    public boolean isAdjustedToUTC() {
+      return isAdjustedToUTC;
+    }
+
+    @Override
+    public boolean equals(Object obj) { // equal when both the flag and the unit match
+      if (!(obj instanceof TimestampLogicalTypeAnnotation)) {
+        return false;
+      }
+      TimestampLogicalTypeAnnotation other = (TimestampLogicalTypeAnnotation) obj;
+      return (isAdjustedToUTC == other.isAdjustedToUTC) && (unit == other.unit);
+    }
+
+    @Override
+    public int hashCode() { // hashes the same two fields that equals compares
+      return Objects.hash(isAdjustedToUTC, unit);
+    }
+  }
+
+  public static class IntLogicalTypeAnnotation extends LogicalTypeAnnotation { // integer annotation carrying a bit width and a signedness flag
+    private final int bitWidth;
+    private final boolean isSigned;
+
+
+    private IntLogicalTypeAnnotation(int bitWidth, boolean isSigned) { // private: created through intType(), which validates bitWidth
+      this.bitWidth = bitWidth;
+      this.isSigned = isSigned;
+    }
+
+    @Override
+    public OriginalType toOriginalType() { // maps (bitWidth, isSigned) to INT_n or UINT_n
+      switch (bitWidth) {
+        case 8:
+          return isSigned ? OriginalType.INT_8 : OriginalType.UINT_8;
+        case 16:
+          return isSigned ? OriginalType.INT_16 : OriginalType.UINT_16;
+        case 32:
+          return isSigned ? OriginalType.INT_32 : OriginalType.UINT_32;
+        case 64:
+          return isSigned ? OriginalType.INT_64 : OriginalType.UINT_64;
+        default:
+          throw new RuntimeException("Unknown original type for bit width " + bitWidth); // report the width itself; calling toOriginalType() here would recurse until the stack overflows
+      }
+    }
+
+    @Override
+    public void accept(LogicalTypeAnnotationVisitor logicalTypeAnnotationVisitor) {
+      logicalTypeAnnotationVisitor.visit(this); // resolves to the visit(IntLogicalTypeAnnotation) overload
+    }
+
+    @Override
+    LogicalTypeToken getType() {
+      return LogicalTypeToken.INT;
+    }
+
+    @Override
+    protected String typeParametersAsString() { // renders "(bitWidth,isSigned)"
+      StringBuilder sb = new StringBuilder();
+      sb.append("(");
+      sb.append(bitWidth); // width first, then the flag: the same order LogicalTypeToken.INT.fromString reads
+      sb.append(",");
+      sb.append(isSigned);
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public int getBitWidth() {
+      return bitWidth;
+    }
+
+    public boolean isSigned() {
+      return isSigned;
+    }
+
+    @Override
+    public boolean equals(Object obj) { // equal when both the width and the signedness match
+      if (!(obj instanceof IntLogicalTypeAnnotation)) {
+        return false;
+      }
+      IntLogicalTypeAnnotation other = (IntLogicalTypeAnnotation) obj;
+      return (bitWidth == other.bitWidth) && (isSigned == other.isSigned);
+    }
+
+    @Override
+    public int hashCode() { // hashes the same two fields that equals compares
+      return Objects.hash(bitWidth, isSigned);
+    }
+  }
+
+  public static class JsonLogicalTypeAnnotation extends LogicalTypeAnnotation { // parameterless annotation whose legacy counterpart is OriginalType.JSON
+    private static final JsonLogicalTypeAnnotation INSTANCE = new JsonLogicalTypeAnnotation(); // shared instance returned by jsonType()
+
+    private JsonLogicalTypeAnnotation() { // private constructor; INSTANCE is the only instantiation in this file
+    }
+
+    @Override
+    public OriginalType toOriginalType() {
+      return OriginalType.JSON;
+    }
+
+    @Override
+    public void accept(LogicalTypeAnnotationVisitor logicalTypeAnnotationVisitor) {
+      logicalTypeAnnotationVisitor.visit(this); // resolves to the visit(JsonLogicalTypeAnnotation) overload
+    }
+
+    @Override
+    LogicalTypeToken getType() {
+      return LogicalTypeToken.JSON;
+    }
+
+    @Override
+    public boolean equals(Object obj) { // type-only comparison: the class has no fields to compare
+      return obj instanceof JsonLogicalTypeAnnotation;
+    }
+
+    @Override
+    public int hashCode() {
+      // This type doesn't have any parameters, thus using class hashcode
+      return getClass().hashCode();
+    }
+  }
+
+  public static class BsonLogicalTypeAnnotation extends LogicalTypeAnnotation { // parameterless annotation whose legacy counterpart is OriginalType.BSON
+    private static final BsonLogicalTypeAnnotation INSTANCE = new BsonLogicalTypeAnnotation(); // shared instance returned by bsonType()
+
+    private BsonLogicalTypeAnnotation() { // private constructor; INSTANCE is the only instantiation in this file
+    }
+
+    @Override
+    public OriginalType toOriginalType() {
+      return OriginalType.BSON;
+    }
+
+    @Override
+    public void accept(LogicalTypeAnnotationVisitor logicalTypeAnnotationVisitor) {
+      logicalTypeAnnotationVisitor.visit(this); // resolves to the visit(BsonLogicalTypeAnnotation) overload
+    }
+
+    @Override
+    LogicalTypeToken getType() {
+      return LogicalTypeToken.BSON;
+    }
+
+    @Override
+    public boolean equals(Object obj) { // type-only comparison: the class has no fields to compare
+      return obj instanceof BsonLogicalTypeAnnotation;
+    }
+
+    @Override
+    public int hashCode() {
+      // This type doesn't have any parameters, thus using class hashcode
+      return getClass().hashCode();
+    }
+  }
+
+  // This logical type annotation is implemented to support backward compatibility with ConvertedType.
+  // The new logical type representation in parquet-format doesn't have any interval type,
+  // thus this annotation is mapped to UNKNOWN.
+  public static class IntervalLogicalTypeAnnotation extends LogicalTypeAnnotation { // parameterless annotation whose legacy counterpart is OriginalType.INTERVAL
+    private static final IntervalLogicalTypeAnnotation INSTANCE = new IntervalLogicalTypeAnnotation(); // final like the other singletons here: the shared instance is never reassigned
+
+    public static LogicalTypeAnnotation getInstance() { // returns the shared instance, typed as the base class
+      return INSTANCE;
+    }
+
+    private IntervalLogicalTypeAnnotation() { // private constructor; INSTANCE is the only instantiation in this file
+    }
+
+    @Override
+    public OriginalType toOriginalType() {
+      return OriginalType.INTERVAL;
+    }
+
+    @Override
+    public void accept(LogicalTypeAnnotationVisitor logicalTypeAnnotationVisitor) {
+      logicalTypeAnnotationVisitor.visit(this); // resolves to the visit(IntervalLogicalTypeAnnotation) overload
+    }
+
+    @Override
+    LogicalTypeToken getType() {
+      return LogicalTypeToken.INTERVAL;
+    }
+
+    @Override
+    public boolean equals(Object obj) { // type-only comparison: the class has no fields to compare
+      return obj instanceof IntervalLogicalTypeAnnotation;
+    }
+
+    @Override
+    public int hashCode() {
+      // This type doesn't have any parameters, thus using class hashcode
+      return getClass().hashCode();
+    }
+  }
+
+  // This logical type annotation is implemented to support backward compatibility with ConvertedType.
+  // The new logical type representation in parquet-format doesn't have any key-value type,
+  // thus this annotation is mapped to UNKNOWN. This type shouldn't be used.
+  public static class MapKeyValueTypeAnnotation extends LogicalTypeAnnotation { // parameterless annotation whose legacy counterpart is OriginalType.MAP_KEY_VALUE
+    private static final MapKeyValueTypeAnnotation INSTANCE = new MapKeyValueTypeAnnotation(); // final like the other singletons here: the shared instance is never reassigned
+
+    public static MapKeyValueTypeAnnotation getInstance() { // returns the shared instance
+      return INSTANCE;
+    }
+
+    private MapKeyValueTypeAnnotation() { // private constructor; INSTANCE is the only instantiation in this file
+    }
+
+    @Override
+    public OriginalType toOriginalType() {
+      return OriginalType.MAP_KEY_VALUE;
+    }
+
+    @Override
+    public void accept(LogicalTypeAnnotationVisitor logicalTypeAnnotationVisitor) {
+      logicalTypeAnnotationVisitor.visit(this); // resolves to the visit(MapKeyValueTypeAnnotation) overload
+    }
+
+    @Override
+    LogicalTypeToken getType() {
+      return LogicalTypeToken.MAP_KEY_VALUE;
+    }
+
+    @Override
+    public boolean equals(Object obj) { // type-only comparison: the class has no fields to compare
+      return obj instanceof MapKeyValueTypeAnnotation;
+    }
+
+    @Override
+    public int hashCode() {
+      // This type doesn't have any parameters, thus using class hashcode
+      return getClass().hashCode();
+    }
+  }
+
+  /**
+   * Implement this interface to visit a logical type annotation in the schema.
+   * The default implementation for each logical type specific visitor method is empty.
+   * <p>
+   * Example usage: logicalTypeAnnotation.accept(new LogicalTypeAnnotationVisitor() { ... });
+   */
+  public interface LogicalTypeAnnotationVisitor { // one visit overload per annotation class; each annotation's accept() calls the overload for its own type
+    default void visit(StringLogicalTypeAnnotation logicalTypeAnnotation) { // empty default body, as for every method below: implementors override only what they need
+    }
+
+    default void visit(MapLogicalTypeAnnotation logicalTypeAnnotation) {
+    }
+
+    default void visit(ListLogicalTypeAnnotation logicalTypeAnnotation) {
+    }
+
+    default void visit(EnumLogicalTypeAnnotation logicalTypeAnnotation) {
+    }
+
+    default void visit(DecimalLogicalTypeAnnotation logicalTypeAnnotation) {
+    }
+
+    default void visit(DateLogicalTypeAnnotation logicalTypeAnnotation) {
+    }
+
+    default void visit(TimeLogicalTypeAnnotation logicalTypeAnnotation) {
+    }
+
+    default void visit(TimestampLogicalTypeAnnotation logicalTypeAnnotation) {
+    }
+
+    default void visit(IntLogicalTypeAnnotation logicalTypeAnnotation) {
+    }
+
+    default void visit(JsonLogicalTypeAnnotation logicalTypeAnnotation) {
+    }
+
+    default void visit(BsonLogicalTypeAnnotation logicalTypeAnnotation) {
+    }
+
+    default void visit(IntervalLogicalTypeAnnotation logicalTypeAnnotation) {
+    }
+
+    default void visit(MapKeyValueTypeAnnotation logicalTypeAnnotation) {
+    }
+  }
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/MessageTypeParser.java b/parquet-column/src/main/java/org/apache/parquet/schema/MessageTypeParser.java
index 4e1d0fd..204b756 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/MessageTypeParser.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/MessageTypeParser.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,7 +18,9 @@
  */
 package org.apache.parquet.schema;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Locale;
 import java.util.StringTokenizer;
 
@@ -159,25 +161,44 @@ public class MessageTypeParser {
     t = st.nextToken();
     OriginalType originalType = null;
     if (t.equalsIgnoreCase("(")) {
-      originalType = OriginalType.valueOf(st.nextToken());
-      childBuilder.as(originalType);
-      if (OriginalType.DECIMAL == originalType) {
+      t = st.nextToken();
+      if (isLogicalType(t)) {
+        LogicalTypeAnnotation.LogicalTypeToken logicalType = LogicalTypeAnnotation.LogicalTypeToken.valueOf(t);
         t = st.nextToken();
-        // parse precision and scale
-        if (t.equalsIgnoreCase("(")) {
-          childBuilder.precision(Integer.parseInt(st.nextToken()));
-          t = st.nextToken();
-          if (t.equalsIgnoreCase(",")) {
-            childBuilder.scale(Integer.parseInt(st.nextToken()));
+        List<String> tokens = new ArrayList<>();
+        if ("(".equals(t)) {
+          while (!")".equals(t)) {
+            if (!(",".equals(t) || "(".equals(t) || ")".equals(t))) {
+              tokens.add(t);
+            }
             t = st.nextToken();
           }
-          check(t, ")", "decimal type ended by )", st);
           t = st.nextToken();
         }
+        LogicalTypeAnnotation logicalTypeAnnotation = logicalType.fromString(tokens);
+        childBuilder.as(logicalTypeAnnotation);
       } else {
-        t = st.nextToken();
+        // Try to parse as old logical type, called OriginalType
+        originalType = OriginalType.valueOf(t);
+        childBuilder.as(originalType);
+        if (OriginalType.DECIMAL == originalType) {
+          t = st.nextToken();
+          // parse precision and scale
+          if (t.equalsIgnoreCase("(")) {
+            childBuilder.precision(Integer.parseInt(st.nextToken()));
+            t = st.nextToken();
+            if (t.equalsIgnoreCase(",")) {
+              childBuilder.scale(Integer.parseInt(st.nextToken()));
+              t = st.nextToken();
+            }
+            check(t, ")", "decimal type ended by )", st);
+            t = st.nextToken();
+          }
+        } else {
+          t = st.nextToken();
+        }
       }
-      check(t, ")", "original type ended by )", st);
+      check(t, ")", "logical type ended by )", st);
       t = st.nextToken();
     }
     if (t.equals("=")) {
@@ -193,6 +214,10 @@ public class MessageTypeParser {
     }
   }
 
+  private static boolean isLogicalType(String t) {
+    return Arrays.stream(LogicalTypeAnnotation.LogicalTypeToken.values()).anyMatch((type) -> type.name().equals(t));
+  }
+
   private static PrimitiveTypeName asPrimitive(String t, Tokenizer st) {
     try {
       return PrimitiveTypeName.valueOf(t.toUpperCase(Locale.ENGLISH));
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
index 2a5e250..08adfbe 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
@@ -383,9 +383,8 @@ public final class PrimitiveType extends Type {
    * @param primitive STRING, INT64, ...
    * @param name the name of the type
    */
-  public PrimitiveType(Repetition repetition, PrimitiveTypeName primitive,
-                       String name) {
-    this(repetition, primitive, 0, name, null, null, null);
+  public PrimitiveType(Repetition repetition, PrimitiveTypeName primitive, String name) {
+    this(repetition, primitive, 0, name, (LogicalTypeAnnotation) null, null, null);
   }
 
   /**
@@ -395,7 +394,7 @@ public final class PrimitiveType extends Type {
    * @param name the name of the type
    */
   public PrimitiveType(Repetition repetition, PrimitiveTypeName primitive, int length, String name) {
-    this(repetition, primitive, length, name, null, null, null);
+    this(repetition, primitive, length, name, (LogicalTypeAnnotation) null, null, null);
   }
 
   /**
@@ -403,7 +402,10 @@ public final class PrimitiveType extends Type {
    * @param primitive STRING, INT64, ...
    * @param name the name of the type
    * @param originalType (optional) the original type to help with cross schema convertion (LIST, MAP, ...)
+   *
+   * @deprecated will be removed in 2.0.0; use builders in {@link Types} instead
    */
+  @Deprecated
   public PrimitiveType(Repetition repetition, PrimitiveTypeName primitive,
                        String name, OriginalType originalType) {
     this(repetition, primitive, 0, name, originalType, null, null);
@@ -430,7 +432,10 @@ public final class PrimitiveType extends Type {
    * @param originalType (optional) the original type (MAP, DECIMAL, UTF8, ...)
    * @param decimalMeta (optional) metadata about the decimal type
    * @param id the id of the field
+   *
+   * @deprecated will be removed in 2.0.0; use builders in {@link Types} instead
    */
+  @Deprecated
   public PrimitiveType(Repetition repetition, PrimitiveTypeName primitive,
                        int length, String name, OriginalType originalType,
                        DecimalMetadata decimalMeta, ID id) {
@@ -440,7 +445,7 @@ public final class PrimitiveType extends Type {
   PrimitiveType(Repetition repetition, PrimitiveTypeName primitive,
       int length, String name, OriginalType originalType,
       DecimalMetadata decimalMeta, ID id, ColumnOrder columnOrder) {
-    super(name, repetition, originalType, id);
+    super(name, repetition, originalType, decimalMeta, id);
     this.primitive = primitive;
     this.length = length;
     this.decimalMeta = decimalMeta;
@@ -453,6 +458,37 @@ public final class PrimitiveType extends Type {
     this.columnOrder = requireValidColumnOrder(columnOrder);
   }
 
+  PrimitiveType(Repetition repetition, PrimitiveTypeName primitive,
+                       String name, LogicalTypeAnnotation logicalTypeAnnotation) {
+    this(repetition, primitive, 0, name, logicalTypeAnnotation, null, null);
+  }
+
+  PrimitiveType(Repetition repetition, PrimitiveTypeName primitive,
+                       int length, String name, LogicalTypeAnnotation logicalTypeAnnotation, ID id) {
+    this(repetition, primitive, length, name, logicalTypeAnnotation, id, null);
+  }
+
+  PrimitiveType(Repetition repetition, PrimitiveTypeName primitive,
+                int length, String name, LogicalTypeAnnotation logicalTypeAnnotation,
+                ID id, ColumnOrder columnOrder) {
+    super(name, repetition, logicalTypeAnnotation, id);
+    this.primitive = primitive;
+    this.length = length;
+    if (getOriginalType() == OriginalType.DECIMAL) {
+      LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimal = (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalTypeAnnotation;
+      this.decimalMeta = new DecimalMetadata(decimal.getPrecision(), decimal.getScale());
+    } else {
+      this.decimalMeta = null;
+    }
+
+    if (columnOrder == null) {
+      columnOrder = primitive == PrimitiveTypeName.INT96 || getOriginalType() == OriginalType.INTERVAL
+        ? ColumnOrder.undefined()
+        : ColumnOrder.typeDefined();
+    }
+    this.columnOrder = requireValidColumnOrder(columnOrder);
+  }
+
   private ColumnOrder requireValidColumnOrder(ColumnOrder columnOrder) {
     if (primitive == PrimitiveTypeName.INT96) {
       Preconditions.checkArgument(columnOrder.getColumnOrderName() == ColumnOrderName.UNDEFINED,
@@ -551,17 +587,9 @@ public final class PrimitiveType extends Type {
       sb.append("(" + length + ")");
     }
     sb.append(" ").append(getName());
-    if (getOriginalType() != null) {
-      sb.append(" (").append(getOriginalType());
-      DecimalMetadata meta = getDecimalMetadata();
-      if (meta != null) {
-        sb.append("(")
-            .append(meta.getPrecision())
-            .append(",")
-            .append(meta.getScale())
-            .append(")");
-      }
-      sb.append(")");
+    if (getLogicalTypeAnnotation() != null) {
+      // TODO: should we print decimal metadata too?
+      sb.append(" (").append(getLogicalTypeAnnotation().toString()).append(")");
     }
     if (getId() != null) {
       sb.append(" = ").append(getId());
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/Type.java b/parquet-column/src/main/java/org/apache/parquet/schema/Type.java
index bca8121..d046957 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/Type.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/Type.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -115,7 +115,7 @@ abstract public class Type {
 
   private final String name;
   private final Repetition repetition;
-  private final OriginalType originalType;
+  private final LogicalTypeAnnotation logicalTypeAnnotation;
   private final ID id;
 
   /**
@@ -124,7 +124,7 @@ abstract public class Type {
    */
   @Deprecated
   public Type(String name, Repetition repetition) {
-    this(name, repetition, null, null);
+    this(name, repetition, (LogicalTypeAnnotation) null, null);
   }
 
   /**
@@ -144,10 +144,26 @@ abstract public class Type {
    * @param id (optional) the id of the fields.
    */
   Type(String name, Repetition repetition, OriginalType originalType, ID id) {
+    this(name, repetition, originalType, null, id);
+  }
+
+  Type(String name, Repetition repetition, OriginalType originalType, DecimalMetadata decimalMetadata, ID id) {
+    super();
+    this.name = checkNotNull(name, "name");
+    this.repetition = checkNotNull(repetition, "repetition");
+    this.logicalTypeAnnotation = originalType == null ? null : LogicalTypeAnnotation.fromOriginalType(originalType, decimalMetadata);
+    this.id = id;
+  }
+
+  Type(String name, Repetition repetition, LogicalTypeAnnotation logicalTypeAnnotation) {
+    this(name, repetition, logicalTypeAnnotation, null);
+  }
+
+  Type(String name, Repetition repetition, LogicalTypeAnnotation logicalTypeAnnotation, ID id) {
     super();
     this.name = checkNotNull(name, "name");
     this.repetition = checkNotNull(repetition, "repetition");
-    this.originalType = originalType;
+    this.logicalTypeAnnotation = logicalTypeAnnotation;
     this.id = id;
   }
 
@@ -186,11 +202,15 @@ abstract public class Type {
     return id;
   }
 
+  public LogicalTypeAnnotation getLogicalTypeAnnotation() {
+    return logicalTypeAnnotation;
+  }
+
   /**
    * @return the original type (LIST, MAP, ...)
    */
   public OriginalType getOriginalType() {
-    return originalType;
+    return logicalTypeAnnotation == null ? null : logicalTypeAnnotation.toOriginalType();
   }
 
   /**
@@ -243,8 +263,8 @@ abstract public class Type {
   public int hashCode() {
     int c = repetition.hashCode();
     c = 31 * c + name.hashCode();
-    if (originalType != null) {
-      c = 31 * c +  originalType.hashCode();
+    if (logicalTypeAnnotation != null) {
+      c = 31 * c +  logicalTypeAnnotation.hashCode();
     }
     if (id != null) {
       c = 31 * c + id.hashCode();
@@ -258,7 +278,7 @@ abstract public class Type {
         && repetition == other.repetition
         && eqOrBothNull(repetition, other.repetition)
         && eqOrBothNull(id, other.id)
-        && eqOrBothNull(originalType, other.originalType);
+        && eqOrBothNull(logicalTypeAnnotation, other.logicalTypeAnnotation);
   };
 
   @Override
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/Types.java b/parquet-column/src/main/java/org/apache/parquet/schema/Types.java
index 0a9b91f..54fb1d8 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/Types.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/Types.java
@@ -199,7 +199,7 @@ public class Types {
     protected final Class<? extends P> returnClass;
 
     protected Type.Repetition repetition = null;
-    protected OriginalType originalType = null;
+    protected LogicalTypeAnnotation logicalTypeAnnotation = null;
     protected Type.ID id = null;
     private boolean repetitionAlreadySet = false;
 
@@ -251,9 +251,32 @@ public class Types {
      *
      * @param type an {@code OriginalType}
      * @return this builder for method chaining
+     *
+     * @deprecated use {@link #as(LogicalTypeAnnotation)} with the corresponding logical type instead
      */
+    @Deprecated
     public THIS as(OriginalType type) {
-      this.originalType = type;
+      this.logicalTypeAnnotation = LogicalTypeAnnotation.fromOriginalType(type, null);
+      return self();
+    }
+
+    protected boolean newLogicalTypeSet;
+
+    /**
+     * Adds a type annotation ({@link LogicalTypeAnnotation}) to the type being built.
+     * <p>
+     * Type annotations are used to extend the types that parquet can store, by
+     * specifying how the primitive types should be interpreted. This keeps the
+     * set of primitive types to a minimum and reuses parquet's efficient
+     * encodings. For example, strings are stored as byte arrays (binary) with
+     * a UTF8 annotation.
+     *
+     * @param type a {@link LogicalTypeAnnotation}
+     * @return this builder for method chaining
+     */
+    public THIS as(LogicalTypeAnnotation type) {
+      this.logicalTypeAnnotation = type;
+      this.newLogicalTypeSet = true;
       return self();
     }
 
@@ -304,6 +327,9 @@ public class Types {
       }
     }
 
+    protected OriginalType getOriginalType () {
+      return logicalTypeAnnotation == null ? null : logicalTypeAnnotation.toOriginalType();
+    }
   }
 
   public abstract static class
@@ -344,6 +370,9 @@ public class Types {
       return self();
     }
 
+    private boolean precisionAlreadySet;
+    private boolean scaleAlreadySet;
+
     /**
      * Adds the precision for a DECIMAL.
      * <p>
@@ -353,9 +382,13 @@ public class Types {
      *
      * @param precision an int precision value for the DECIMAL
      * @return this builder for method chaining
+     *
+     * @deprecated use {@link #as(LogicalTypeAnnotation)} with the corresponding decimal type instead
      */
+    @Deprecated
     public THIS precision(int precision) {
       this.precision = precision;
+      precisionAlreadySet = true;
       return self();
     }
 
@@ -371,9 +404,13 @@ public class Types {
      *
      * @param scale an int scale value for the DECIMAL
      * @return this builder for method chaining
+     *
+     * @deprecated use {@link #as(LogicalTypeAnnotation)} with the corresponding decimal type instead
      */
+    @Deprecated
     public THIS scale(int scale) {
       this.scale = scale;
+      scaleAlreadySet = true;
       return self();
     }
 
@@ -403,7 +440,8 @@ public class Types {
       DecimalMetadata meta = decimalMetadata();
 
       // validate type annotations and required metadata
-      if (originalType != null) {
+      if (logicalTypeAnnotation != null) {
+        OriginalType originalType = logicalTypeAnnotation.toOriginalType();
         switch (originalType) {
           case UTF8:
           case JSON:
@@ -476,7 +514,7 @@ public class Types {
         }
       }
 
-      return new PrimitiveType(repetition, primitiveType, length, name, originalType, meta, id, columnOrder);
+      return new PrimitiveType(repetition, primitiveType, length, name, getOriginalType(), meta, id, columnOrder);
     }
 
     private static long maxPrecision(int numBytes) {
@@ -489,12 +527,25 @@ public class Types {
 
     protected DecimalMetadata decimalMetadata() {
       DecimalMetadata meta = null;
-      if (OriginalType.DECIMAL == originalType) {
+      if (OriginalType.DECIMAL == getOriginalType()) {
+        LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType = (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalTypeAnnotation;
+        if (newLogicalTypeSet) {
+          if (scaleAlreadySet) {
+            Preconditions.checkArgument(this.scale == decimalType.getScale(),
+              "Decimal scale should match with the scale of the logical type");
+          }
+          if (precisionAlreadySet) {
+            Preconditions.checkArgument(this.precision == decimalType.getPrecision(),
+              "Decimal precision should match with the precision of the logical type");
+          }
+          scale = decimalType.getScale();
+          precision = decimalType.getPrecision();
+        }
         Preconditions.checkArgument(precision > 0,
             "Invalid DECIMAL precision: " + precision);
-        Preconditions.checkArgument(scale >= 0,
-            "Invalid DECIMAL scale: " + scale);
-        Preconditions.checkArgument(scale <= precision,
+        Preconditions.checkArgument(this.scale >= 0,
+            "Invalid DECIMAL scale: " + this.scale);
+        Preconditions.checkArgument(this.scale <= precision,
             "Invalid DECIMAL scale: cannot be greater than precision");
         meta = new DecimalMetadata(precision, scale);
       }
@@ -651,7 +702,7 @@ public class Types {
 
     @Override
     protected GroupType build(String name) {
-      return new GroupType(repetition, name, originalType, fields, id);
+      return new GroupType(repetition, name, getOriginalType(), fields, id);
     }
 
     public MapBuilder<THIS> map(
@@ -1046,7 +1097,7 @@ public class Types {
 
     @Override
     protected Type build(String name) {
-      Preconditions.checkState(originalType == null,
+      Preconditions.checkState(logicalTypeAnnotation == null,
           "MAP is already a logical type and can't be changed.");
       if (keyType == null) {
         keyType = STRING_KEY;
@@ -1194,7 +1245,7 @@ public class Types {
 
     @Override
     protected Type build(String name) {
-      Preconditions.checkState(originalType == null,
+      Preconditions.checkState(logicalTypeAnnotation == null,
           "LIST is already the logical type and can't be changed");
       Preconditions.checkNotNull(elementType, "List element type");
 
diff --git a/parquet-column/src/test/java/org/apache/parquet/parser/TestParquetParser.java b/parquet-column/src/test/java/org/apache/parquet/parser/TestParquetParser.java
index e2f737a..5082501 100644
--- a/parquet-column/src/test/java/org/apache/parquet/parser/TestParquetParser.java
+++ b/parquet-column/src/test/java/org/apache/parquet/parser/TestParquetParser.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,6 +18,10 @@
  */
 package org.apache.parquet.parser;
 
+import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MILLIS;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.intType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.timeType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.timestampType;
 import static org.junit.Assert.assertEquals;
 import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
@@ -248,6 +252,8 @@ public class TestParquetParser {
         "  required int32 time (TIME_MILLIS);" +
         "  required int64 timestamp (TIMESTAMP_MILLIS);" +
         "  required FIXED_LEN_BYTE_ARRAY(12) interval (INTERVAL);" +
+        "  required int32 newTime (TIME(MILLIS,true));" +
+        "  required int64 newTimestamp (TIMESTAMP(MILLIS,false));" +
         "}\n";
 
     MessageType parsed = MessageTypeParser.parseMessageType(message);
@@ -256,7 +262,9 @@ public class TestParquetParser {
         .required(INT32).as(TIME_MILLIS).named("time")
         .required(INT64).as(TIMESTAMP_MILLIS).named("timestamp")
         .required(FIXED_LEN_BYTE_ARRAY).length(12).as(INTERVAL).named("interval")
-        .named("TimeMessage");
+        .required(INT32).as(timeType(true, MILLIS)).named("newTime")
+        .required(INT64).as(timestampType(false, MILLIS)).named("newTimestamp")
+      .named("TimeMessage");
 
     assertEquals(expected, parsed);
     MessageType reparsed = MessageTypeParser.parseMessageType(parsed.toString());
@@ -294,6 +302,36 @@ public class TestParquetParser {
   }
 
   @Test
+  public void testIntegerAnnotations() {
+    String message = "message IntMessage {" +
+      "  required int32 i8 (INT(8,true));" +
+      "  required int32 i16 (INT(16,true));" +
+      "  required int32 i32 (INT(32,true));" +
+      "  required int64 i64 (INT(64,true));" +
+      "  required int32 u8 (INT(8,false));" +
+      "  required int32 u16 (INT(16,false));" +
+      "  required int32 u32 (INT(32,false));" +
+      "  required int64 u64 (INT(64,false));" +
+      "}\n";
+
+    MessageType parsed = MessageTypeParser.parseMessageType(message);
+    MessageType expected = Types.buildMessage()
+      .required(INT32).as(intType(8, true)).named("i8")
+      .required(INT32).as(intType(16, true)).named("i16")
+      .required(INT32).as(intType(32, true)).named("i32")
+      .required(INT64).as(intType(64, true)).named("i64")
+      .required(INT32).as(intType(8, false)).named("u8")
+      .required(INT32).as(intType(16, false)).named("u16")
+      .required(INT32).as(intType(32, false)).named("u32")
+      .required(INT64).as(intType(64, false)).named("u64")
+      .named("IntMessage");
+
+    assertEquals(expected, parsed);
+    MessageType reparsed = MessageTypeParser.parseMessageType(parsed.toString());
+    assertEquals(expected, reparsed);
+  }
+
+  @Test
   public void testEmbeddedAnnotations() {
     String message = "message EmbeddedMessage {" +
         "  required binary json (JSON);" +
diff --git a/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuilders.java b/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuilders.java
index 0b1f41a..a42e9e3 100644
--- a/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuilders.java
+++ b/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuilders.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -1396,6 +1396,47 @@ public class TestTypeBuilders {
     });
   }
 
+  @Test
+  public void testDecimalLogicalType() {
+    PrimitiveType expected = new PrimitiveType(REQUIRED, BINARY, "aDecimal",
+      LogicalTypeAnnotation.decimalType(3, 4));
+    PrimitiveType actual = Types.required(BINARY)
+      .as(LogicalTypeAnnotation.decimalType(3, 4)).named("aDecimal");
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testDecimalLogicalTypeWithDeprecatedScale() {
+    PrimitiveType expected = new PrimitiveType(REQUIRED, BINARY, "aDecimal",
+      LogicalTypeAnnotation.decimalType(3, 4));
+    PrimitiveType actual = Types.required(BINARY)
+      .as(LogicalTypeAnnotation.decimalType(3, 4)).scale(3).named("aDecimal");
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testDecimalLogicalTypeWithDeprecatedPrecision() {
+    PrimitiveType expected = new PrimitiveType(REQUIRED, BINARY, "aDecimal",
+      LogicalTypeAnnotation.decimalType(3, 4));
+    PrimitiveType actual = Types.required(BINARY)
+      .as(LogicalTypeAnnotation.decimalType(3, 4)).precision(4).named("aDecimal");
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testDecimalLogicalTypeWithDeprecatedScaleMismatch() {
+    Types.required(BINARY)
+      .as(LogicalTypeAnnotation.decimalType(3, 4))
+      .scale(4).named("aDecimal");
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testDecimalLogicalTypeWithDeprecatedPrecisionMismatch() {
+    Types.required(BINARY)
+      .as(LogicalTypeAnnotation.decimalType(3, 4))
+      .precision(5).named("aDecimal");
+  }
+
   /**
    * A convenience method to avoid a large number of @Test(expected=...) tests
    * @param message A String message to describe this assertion
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index 555b856..40c0b84 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -39,8 +39,24 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.CorruptStatistics;
 import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.format.BsonType;
 import org.apache.parquet.format.CompressionCodec;
+import org.apache.parquet.format.DateType;
+import org.apache.parquet.format.DecimalType;
+import org.apache.parquet.format.EnumType;
+import org.apache.parquet.format.IntType;
+import org.apache.parquet.format.JsonType;
+import org.apache.parquet.format.ListType;
+import org.apache.parquet.format.LogicalType;
+import org.apache.parquet.format.MapType;
+import org.apache.parquet.format.MicroSeconds;
+import org.apache.parquet.format.MilliSeconds;
+import org.apache.parquet.format.NullType;
 import org.apache.parquet.format.PageEncodingStats;
+import org.apache.parquet.format.StringType;
+import org.apache.parquet.format.TimeType;
+import org.apache.parquet.format.TimeUnit;
+import org.apache.parquet.format.TimestampType;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.format.ColumnChunk;
 import org.apache.parquet.format.ColumnMetaData;
@@ -75,6 +91,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 import org.apache.parquet.schema.Type.Repetition;
 import org.apache.parquet.schema.TypeVisitor;
 import org.apache.parquet.schema.Types;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -172,8 +189,9 @@ public class ParquetMetadataConverter {
         SchemaElement element = new SchemaElement(primitiveType.getName());
         element.setRepetition_type(toParquetRepetition(primitiveType.getRepetition()));
         element.setType(getType(primitiveType.getPrimitiveTypeName()));
-        if (primitiveType.getOriginalType() != null) {
-          element.setConverted_type(getConvertedType(primitiveType.getOriginalType()));
+        if (primitiveType.getLogicalTypeAnnotation() != null) {
+          element.setConverted_type(convertToConvertedType(primitiveType.getLogicalTypeAnnotation()));
+          element.setLogicalType(convertToLogicalType(primitiveType.getLogicalTypeAnnotation()));
         }
         if (primitiveType.getDecimalMetadata() != null) {
           element.setPrecision(primitiveType.getDecimalMetadata().getPrecision());
@@ -201,8 +219,9 @@ public class ParquetMetadataConverter {
       public void visit(GroupType groupType) {
         SchemaElement element = new SchemaElement(groupType.getName());
         element.setRepetition_type(toParquetRepetition(groupType.getRepetition()));
-        if (groupType.getOriginalType() != null) {
-          element.setConverted_type(getConvertedType(groupType.getOriginalType()));
+        if (groupType.getLogicalTypeAnnotation() != null) {
+          element.setConverted_type(convertToConvertedType(groupType.getLogicalTypeAnnotation()));
+          element.setLogicalType(convertToLogicalType(groupType.getLogicalTypeAnnotation()));
         }
         if (groupType.getId() != null) {
           element.setField_id(groupType.getId().intValue());
@@ -221,6 +240,158 @@ public class ParquetMetadataConverter {
     });
   }
 
+  LogicalType convertToLogicalType(LogicalTypeAnnotation logicalTypeAnnotation) {
+    LogicalTypeConverterVisitor logicalTypeConverterVisitor = new LogicalTypeConverterVisitor();
+    logicalTypeAnnotation.accept(logicalTypeConverterVisitor);
+    return logicalTypeConverterVisitor.logicalType;
+  }
+
+  ConvertedType convertToConvertedType(LogicalTypeAnnotation logicalTypeAnnotation) {
+    LogicalTypeConverterVisitor logicalTypeConverterVisitor = new LogicalTypeConverterVisitor();
+    logicalTypeAnnotation.accept(logicalTypeConverterVisitor);
+    return logicalTypeConverterVisitor.convertedType;
+  }
+
+
+  static org.apache.parquet.format.TimeUnit convertUnit(LogicalTypeAnnotation.TimeUnit unit) {
+    switch (unit) {
+      case MICROS:
+        return org.apache.parquet.format.TimeUnit.MICROS(new MicroSeconds());
+      case MILLIS:
+        return org.apache.parquet.format.TimeUnit.MILLIS(new MilliSeconds());
+      default:
+        throw new RuntimeException("Unknown time unit " + unit);
+    }
+  }
+
+  private static class LogicalTypeConverterVisitor implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor {
+    private LogicalType logicalType;
+    private ConvertedType convertedType;
+
+    @Override
+    public void visit(LogicalTypeAnnotation.StringLogicalTypeAnnotation logicalTypeAnnotation) {
+      logicalType = LogicalType.STRING(new StringType());
+      convertedType = ConvertedType.UTF8;
+    }
+
+    @Override
+    public void visit(LogicalTypeAnnotation.MapLogicalTypeAnnotation logicalTypeAnnotation) {
+      logicalType = LogicalType.MAP(new MapType());
+      convertedType = ConvertedType.MAP;
+    }
+
+    @Override
+    public void visit(LogicalTypeAnnotation.ListLogicalTypeAnnotation logicalTypeAnnotation) {
+      logicalType = LogicalType.LIST(new ListType());
+      convertedType = ConvertedType.LIST;
+    }
+
+    @Override
+    public void visit(LogicalTypeAnnotation.EnumLogicalTypeAnnotation logicalTypeAnnotation) {
+      logicalType = LogicalType.ENUM(new EnumType());
+      convertedType = ConvertedType.ENUM;
+    }
+
+    @Override
+    public void visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation logicalTypeAnnotation) {
+      logicalType = LogicalType.DECIMAL(new DecimalType(logicalTypeAnnotation.getScale(), logicalTypeAnnotation.getPrecision()));
+      convertedType = ConvertedType.DECIMAL;
+    }
+
+    @Override
+    public void visit(LogicalTypeAnnotation.DateLogicalTypeAnnotation logicalTypeAnnotation) {
+      logicalType = LogicalType.DATE(new DateType());
+      convertedType = ConvertedType.DATE;
+    }
+
+    @Override
+    public void visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation logicalTypeAnnotation) {
+      logicalType = LogicalType.TIME(new TimeType(logicalTypeAnnotation.isAdjustedToUTC(), convertUnit(logicalTypeAnnotation.getUnit())));
+      switch (logicalTypeAnnotation.toOriginalType()) {
+        case TIME_MILLIS:
+          convertedType = ConvertedType.TIME_MILLIS;
+          break;
+        case TIME_MICROS:
+          convertedType = ConvertedType.TIME_MICROS;
+          break;
+        default:
+          throw new RuntimeException("Unknown converted type for " + logicalTypeAnnotation.toOriginalType());
+      }
+    }
+
+    @Override
+    public void visit(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation logicalTypeAnnotation) {
+      logicalType = LogicalType.TIMESTAMP(new TimestampType(logicalTypeAnnotation.isAdjustedToUTC(), convertUnit(logicalTypeAnnotation.getUnit())));
+      switch (logicalTypeAnnotation.toOriginalType()) {
+        case TIMESTAMP_MICROS:
+          convertedType = ConvertedType.TIMESTAMP_MICROS;
+          break;
+        case TIMESTAMP_MILLIS:
+          convertedType = ConvertedType.TIMESTAMP_MILLIS;
+          break;
+        default:
+          throw new RuntimeException("Unknown converted type for " + logicalTypeAnnotation.toOriginalType());
+      }
+    }
+
+    @Override
+    public void visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation logicalTypeAnnotation) {
+      logicalType = LogicalType.INTEGER(new IntType((byte) logicalTypeAnnotation.getBitWidth(), logicalTypeAnnotation.isSigned()));
+      switch (logicalTypeAnnotation.toOriginalType()) {
+        case INT_8:
+          convertedType = ConvertedType.INT_8;
+          break;
+        case INT_16:
+          convertedType = ConvertedType.INT_16;
+          break;
+        case INT_32:
+          convertedType = ConvertedType.INT_32;
+          break;
+        case INT_64:
+          convertedType = ConvertedType.INT_64;
+          break;
+        case UINT_8:
+          convertedType = ConvertedType.UINT_8;
+          break;
+        case UINT_16:
+          convertedType = ConvertedType.UINT_16;
+          break;
+        case UINT_32:
+          convertedType = ConvertedType.UINT_32;
+          break;
+        case UINT_64:
+          convertedType = ConvertedType.UINT_64;
+          break;
+        default:
+          throw new RuntimeException("Unknown original type " + logicalTypeAnnotation.toOriginalType());
+      }
+    }
+
+    @Override
+    public void visit(LogicalTypeAnnotation.JsonLogicalTypeAnnotation logicalTypeAnnotation) {
+      logicalType = LogicalType.JSON(new JsonType());
+      convertedType = ConvertedType.JSON;
+    }
+
+    @Override
+    public void visit(LogicalTypeAnnotation.BsonLogicalTypeAnnotation logicalTypeAnnotation) {
+      logicalType = LogicalType.BSON(new BsonType());
+      convertedType = ConvertedType.BSON;
+    }
+
+    @Override
+    public void visit(LogicalTypeAnnotation.IntervalLogicalTypeAnnotation logicalTypeAnnotation) {
+      logicalType = LogicalType.UNKNOWN(new NullType());
+      convertedType = ConvertedType.INTERVAL;
+    }
+
+    @Override
+    public void visit(LogicalTypeAnnotation.MapKeyValueTypeAnnotation logicalTypeAnnotation) {
+      logicalType = LogicalType.UNKNOWN(new NullType());
+      convertedType = ConvertedType.MAP_KEY_VALUE;
+    }
+  }
+
   private void addRowGroup(ParquetMetadata parquetMetadata, List<RowGroup> rowGroups, BlockMetaData block) {
     //rowGroup.total_byte_size = ;
     List<ColumnChunkMetaData> columns = block.getColumns();
@@ -600,108 +771,104 @@ public class ParquetMetadataConverter {
   }
 
   // Visible for testing
-  OriginalType getOriginalType(ConvertedType type) {
+  LogicalTypeAnnotation getOriginalType(ConvertedType type, SchemaElement schemaElement) {
     switch (type) {
       case UTF8:
-        return OriginalType.UTF8;
+        return LogicalTypeAnnotation.stringType();
       case MAP:
-        return OriginalType.MAP;
+        return LogicalTypeAnnotation.mapType();
       case MAP_KEY_VALUE:
-        return OriginalType.MAP_KEY_VALUE;
+        return LogicalTypeAnnotation.MapKeyValueTypeAnnotation.getInstance();
       case LIST:
-        return OriginalType.LIST;
+        return LogicalTypeAnnotation.listType();
       case ENUM:
-        return OriginalType.ENUM;
+        return LogicalTypeAnnotation.enumType();
       case DECIMAL:
-        return OriginalType.DECIMAL;
+        int scale = (schemaElement == null ? 0 : schemaElement.scale);
+        int precision = (schemaElement == null ? 0 : schemaElement.precision);
+        return LogicalTypeAnnotation.decimalType(scale, precision);
       case DATE:
-        return OriginalType.DATE;
+        return LogicalTypeAnnotation.dateType();
       case TIME_MILLIS:
-        return OriginalType.TIME_MILLIS;
+        return LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.MILLIS);
       case TIME_MICROS:
-        return OriginalType.TIME_MICROS;
+        return LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.MICROS);
       case TIMESTAMP_MILLIS:
-        return OriginalType.TIMESTAMP_MILLIS;
+        return LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS);
       case TIMESTAMP_MICROS:
-        return OriginalType.TIMESTAMP_MICROS;
+        return LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MICROS);
       case INTERVAL:
-        return OriginalType.INTERVAL;
+        return LogicalTypeAnnotation.IntervalLogicalTypeAnnotation.getInstance();
       case INT_8:
-        return OriginalType.INT_8;
+        return LogicalTypeAnnotation.intType(8, true);
       case INT_16:
-        return OriginalType.INT_16;
+        return LogicalTypeAnnotation.intType(16, true);
       case INT_32:
-        return OriginalType.INT_32;
+        return LogicalTypeAnnotation.intType(32, true);
       case INT_64:
-        return OriginalType.INT_64;
+        return LogicalTypeAnnotation.intType(64, true);
       case UINT_8:
-        return OriginalType.UINT_8;
+        return LogicalTypeAnnotation.intType(8, false);
       case UINT_16:
-        return OriginalType.UINT_16;
+        return LogicalTypeAnnotation.intType(16, false);
       case UINT_32:
-        return OriginalType.UINT_32;
+        return LogicalTypeAnnotation.intType(32, false);
       case UINT_64:
-        return OriginalType.UINT_64;
+        return LogicalTypeAnnotation.intType(64, false);
       case JSON:
-        return OriginalType.JSON;
+        return LogicalTypeAnnotation.jsonType();
       case BSON:
-        return OriginalType.BSON;
+        return LogicalTypeAnnotation.bsonType();
       default:
-        throw new RuntimeException("Unknown converted type " + type);
+        throw new RuntimeException("Can't convert converted type to logical type, unknown converted type " + type);
     }
   }
 
-  // Visible for testing
-  ConvertedType getConvertedType(OriginalType type) {
-    switch (type) {
-      case UTF8:
-        return ConvertedType.UTF8;
+  LogicalTypeAnnotation getOriginalType(LogicalType type) {
+    switch (type.getSetField()) {
       case MAP:
-        return ConvertedType.MAP;
-      case MAP_KEY_VALUE:
-        return ConvertedType.MAP_KEY_VALUE;
-      case LIST:
-        return ConvertedType.LIST;
-      case ENUM:
-        return ConvertedType.ENUM;
-      case DECIMAL:
-        return ConvertedType.DECIMAL;
+        return LogicalTypeAnnotation.mapType();
+      case BSON:
+        return LogicalTypeAnnotation.bsonType();
       case DATE:
-        return ConvertedType.DATE;
-      case TIME_MILLIS:
-        return ConvertedType.TIME_MILLIS;
-      case TIME_MICROS:
-        return ConvertedType.TIME_MICROS;
-      case TIMESTAMP_MILLIS:
-        return ConvertedType.TIMESTAMP_MILLIS;
-      case TIMESTAMP_MICROS:
-        return ConvertedType.TIMESTAMP_MICROS;
-      case INTERVAL:
-        return ConvertedType.INTERVAL;
-      case INT_8:
-        return ConvertedType.INT_8;
-      case INT_16:
-        return ConvertedType.INT_16;
-      case INT_32:
-        return ConvertedType.INT_32;
-      case INT_64:
-        return ConvertedType.INT_64;
-      case UINT_8:
-        return ConvertedType.UINT_8;
-      case UINT_16:
-        return ConvertedType.UINT_16;
-      case UINT_32:
-        return ConvertedType.UINT_32;
-      case UINT_64:
-        return ConvertedType.UINT_64;
+        return LogicalTypeAnnotation.dateType();
+      case ENUM:
+        return LogicalTypeAnnotation.enumType();
       case JSON:
-        return ConvertedType.JSON;
-      case BSON:
-        return ConvertedType.BSON;
+        return LogicalTypeAnnotation.jsonType();
+      case LIST:
+        return LogicalTypeAnnotation.listType();
+      case TIME:
+        TimeType time = type.getTIME();
+        return LogicalTypeAnnotation.timeType(time.isAdjustedToUTC, convertTimeUnit(time.unit));
+      case STRING:
+        return LogicalTypeAnnotation.stringType();
+      case DECIMAL:
+        DecimalType decimal = type.getDECIMAL();
+        return LogicalTypeAnnotation.decimalType(decimal.scale, decimal.precision);
+      case INTEGER:
+        IntType integer = type.getINTEGER();
+        return LogicalTypeAnnotation.intType(integer.bitWidth, integer.isSigned);
+      case UNKNOWN:
+        return null;
+      case TIMESTAMP:
+        TimestampType timestamp = type.getTIMESTAMP();
+        return LogicalTypeAnnotation.timestampType(timestamp.isAdjustedToUTC, convertTimeUnit(timestamp.unit));
       default:
-        throw new RuntimeException("Unknown original type " + type);
-     }
-   }
+        throw new RuntimeException("Unknown logical type " + type);
+    }
+  }
+
+  private LogicalTypeAnnotation.TimeUnit convertTimeUnit(TimeUnit unit) {
+    switch (unit.getSetField()) {
+      case MICROS:
+        return LogicalTypeAnnotation.TimeUnit.MICROS;
+      case MILLIS:
+        return LogicalTypeAnnotation.TimeUnit.MILLIS;
+      default:
+        throw new RuntimeException("Unknown time unit " + unit);
+    }
+  }
 
   private static void addKeyValue(FileMetaData fileMetaData, String key, String value) {
     KeyValue keyValue = new KeyValue(key);
@@ -998,8 +1165,15 @@ public class ParquetMetadataConverter {
         buildChildren((Types.GroupBuilder) childBuilder, schema, schemaElement.num_children, columnOrders, columnCount);
       }
 
+      if (schemaElement.isSetLogicalType()) {
+        childBuilder.as(getOriginalType(schemaElement.logicalType));
+      }
       if (schemaElement.isSetConverted_type()) {
-        childBuilder.as(getOriginalType(schemaElement.converted_type));
+        LogicalTypeAnnotation originalType = getOriginalType(schemaElement.converted_type, schemaElement);
+        LogicalTypeAnnotation newLogicalType = getOriginalType(schemaElement.logicalType);
+        if (!originalType.equals(newLogicalType)) {
+          childBuilder.as(getOriginalType(schemaElement.converted_type, schemaElement));
+        }
       }
       if (schemaElement.isSetField_id()) {
         childBuilder.id(schemaElement.field_id);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ParquetMetadata.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ParquetMetadata.java
index 47cad49..ac181c7 100755
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ParquetMetadata.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ParquetMetadata.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,8 +27,7 @@ import org.codehaus.jackson.JsonGenerationException;
 import org.codehaus.jackson.JsonParseException;
 import org.codehaus.jackson.map.JsonMappingException;
 import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.ObjectWriter;
-import org.codehaus.jackson.map.SerializationConfig.Feature;
+import org.codehaus.jackson.map.SerializationConfig;
 
 /**
  * Meta Data block stored in the footer of the file
@@ -38,6 +37,12 @@ public class ParquetMetadata {
 
   private static final ObjectMapper objectMapper = new ObjectMapper();
 
+  // Disable FAIL_ON_EMPTY_BEANS on the ObjectMapper. With it enabled, parquet-cascading tests fail,
+  // because LogicalTypeAnnotation implementations are classes without any properties.
+  static {
+    objectMapper.configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS, false);
+  }
+
   /**
    * @param parquetMetaData an instance of parquet metadata to convert
    * @return the json representation
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
index 6cce32f..4fc4035 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -60,6 +60,8 @@ import org.apache.parquet.column.statistics.FloatStatistics;
 import org.apache.parquet.column.statistics.IntStatistics;
 import org.apache.parquet.column.statistics.LongStatistics;
 import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.format.DecimalType;
+import org.apache.parquet.format.LogicalType;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
@@ -67,6 +69,7 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.junit.Assert;
 import org.junit.Test;
 import org.apache.parquet.example.Paper;
@@ -130,12 +133,14 @@ public class TestParquetMetadataConverter {
             .setRepetition_type(FieldRepetitionType.REQUIRED)
             .setType(Type.BYTE_ARRAY)
             .setConverted_type(ConvertedType.DECIMAL)
+            .setLogicalType(LogicalType.DECIMAL(new DecimalType(2, 9)))
             .setPrecision(9).setScale(2),
         new SchemaElement("aFixedDecimal")
             .setRepetition_type(FieldRepetitionType.OPTIONAL)
             .setType(Type.FIXED_LEN_BYTE_ARRAY)
             .setType_length(4)
             .setConverted_type(ConvertedType.DECIMAL)
+            .setLogicalType(LogicalType.DECIMAL(new DecimalType(2, 9)))
             .setPrecision(9).setScale(2)
     );
     Assert.assertEquals(expected, schemaElements);
@@ -163,10 +168,11 @@ public class TestParquetMetadataConverter {
       assertEquals(type, parquetMetadataConverter.getType(parquetMetadataConverter.getPrimitive(type)));
     }
     for (OriginalType original : OriginalType.values()) {
-      assertEquals(original, parquetMetadataConverter.getOriginalType(parquetMetadataConverter.getConvertedType(original)));
+      assertEquals(original, parquetMetadataConverter.getOriginalType(
+        parquetMetadataConverter.convertToConvertedType(LogicalTypeAnnotation.fromOriginalType(original, null)), null).toOriginalType());
     }
     for (ConvertedType converted : ConvertedType.values()) {
-      assertEquals(converted, parquetMetadataConverter.getConvertedType(parquetMetadataConverter.getOriginalType(converted)));
+      assertEquals(converted, parquetMetadataConverter.convertToConvertedType(parquetMetadataConverter.getOriginalType(converted, null)));
     }
   }
 
@@ -336,7 +342,7 @@ public class TestParquetMetadataConverter {
             0, 0, 0, 0, 0);
     return md;
   }
-  
+
   @Test
   public void testEncodingsCache() {
     ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
diff --git a/parquet-pig/src/main/java/org/apache/parquet/pig/PigSchemaConverter.java b/parquet-pig/src/main/java/org/apache/parquet/pig/PigSchemaConverter.java
index c445134..24f7ee8 100644
--- a/parquet-pig/src/main/java/org/apache/parquet/pig/PigSchemaConverter.java
+++ b/parquet-pig/src/main/java/org/apache/parquet/pig/PigSchemaConverter.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

-- 
To stop receiving notification emails like this one, please contact
gabor@apache.org.