You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by bl...@apache.org on 2016/04/20 17:41:30 UTC

[2/2] parquet-mr git commit: PARQUET-358: Add support for Avro's logical types API.

PARQUET-358: Add support for Avro's logical types API.

This adds support for Avro's logical types API to parquet-avro.

* The logical types API was introduced in Avro 1.8.0, so this bumps the Avro dependency version to 1.8.0.
* Types supported are: decimal, date, time-millis, time-micros, timestamp-millis, and timestamp-micros
* Tests have been copied from Avro and ported to the parquet-avro API

Author: Ryan Blue <bl...@apache.org>

Closes #318 from rdblue/PARQUET-358-add-avro-logical-types-api and squashes the following commits:

bd81f9c [Ryan Blue] PARQUET-358: Fix review items.
0a882ee [Ryan Blue] PARQUET-358: Add logical types circular reference test.
5124618 [Ryan Blue] PARQUET-358: Add license documentation for code from Avro.
dcb14be [Ryan Blue] PARQUET-358: Add support for Avro's logical types API.


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/6b24a1d1
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/6b24a1d1
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/6b24a1d1

Branch: refs/heads/master
Commit: 6b24a1d1b5e2792a7821ad172a45e38d2b04f9b8
Parents: 82b8ecc
Author: Ryan Blue <bl...@apache.org>
Authored: Wed Apr 20 08:41:22 2016 -0700
Committer: Ryan Blue <bl...@apache.org>
Committed: Wed Apr 20 08:41:22 2016 -0700

----------------------------------------------------------------------
 LICENSE                                         |   8 +
 NOTICE                                          |  11 +
 parquet-avro/pom.xml                            |   4 -
 .../avro/AvroIndexedRecordConverter.java        |  18 +-
 .../apache/parquet/avro/AvroReadSupport.java    |   4 +-
 .../parquet/avro/AvroRecordConverter.java       | 121 +++-
 .../parquet/avro/AvroSchemaConverter.java       | 147 ++--
 .../apache/parquet/avro/AvroWriteSupport.java   | 167 +++--
 .../parquet/avro/ParentValueContainer.java      | 175 +++++
 .../src/main/resources/META-INF/LICENSE         | 186 +++++
 parquet-avro/src/main/resources/META-INF/NOTICE |  18 +
 .../org/apache/parquet/avro/AvroTestUtil.java   |  53 ++
 .../parquet/avro/TestAvroSchemaConverter.java   | 278 +++++++-
 .../parquet/avro/TestCircularReferences.java    | 383 ++++++++++
 .../parquet/avro/TestGenericLogicalTypes.java   | 271 +++++++
 .../org/apache/parquet/avro/TestReadWrite.java  | 118 +++-
 .../avro/TestReadWriteOldListBehavior.java      |   1 -
 .../parquet/avro/TestReflectLogicalTypes.java   | 705 +++++++++++++++++++
 .../java/org/apache/parquet/io/api/Binary.java  |  55 +-
 .../java/org/apache/parquet/schema/Types.java   |  12 +-
 .../org/apache/parquet/io/api/TestBinary.java   |  10 -
 pom.xml                                         |   1 +
 22 files changed, 2593 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/LICENSE
----------------------------------------------------------------------
diff --git a/LICENSE b/LICENSE
index b759148..b006581 100644
--- a/LICENSE
+++ b/LICENSE
@@ -178,6 +178,14 @@
 
 --------------------------------------------------------------------------------
 
+This product includes code from Apache Avro.
+
+Copyright: 2014 The Apache Software Foundation.
+Home page: https://avro.apache.org/
+License: http://www.apache.org/licenses/LICENSE-2.0
+
+--------------------------------------------------------------------------------
+
 This project includes code from Daniel Lemire's JavaFastPFOR project. The
 "Lemire" bit packing source code produced by parquet-generator is derived from
 the JavaFastPFOR project.

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/NOTICE
----------------------------------------------------------------------
diff --git a/NOTICE b/NOTICE
index c6e3bf2..a9b6c56 100644
--- a/NOTICE
+++ b/NOTICE
@@ -43,3 +43,14 @@ with the following copyright notice:
   See the License for the specific language governing permissions and
   limitations under the License.
 
+--------------------------------------------------------------------------------
+
+This product includes code from Apache Avro, which includes the following in
+its NOTICE file:
+
+  Apache Avro
+  Copyright 2010-2015 The Apache Software Foundation
+
+  This product includes software developed at
+  The Apache Software Foundation (http://www.apache.org/).
+

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-avro/pom.xml b/parquet-avro/pom.xml
index aad197d..50c37db 100644
--- a/parquet-avro/pom.xml
+++ b/parquet-avro/pom.xml
@@ -32,10 +32,6 @@
   <name>Apache Parquet Avro</name>
   <url>https://parquet.apache.org</url>
 
-  <properties>
-    <avro.version>1.7.6</avro.version>
-  </properties>
-
   <dependencies>
     <dependency>
       <groupId>org.apache.parquet</groupId>

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
index 06c66d6..48eab4d 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
@@ -21,6 +21,8 @@ package org.apache.parquet.avro;
 import java.lang.reflect.Constructor;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.avro.Conversion;
+import org.apache.avro.LogicalType;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericArray;
 import org.apache.avro.generic.GenericData;
@@ -111,6 +113,11 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
 
   @SuppressWarnings("unchecked")
   private static <T> Class<T> getDatumClass(GenericData model, Schema schema) {
+    if (model.getConversionFor(schema.getLogicalType()) != null) {
+      // use generic classes to pass data to conversions
+      return null;
+    }
+
     if (model instanceof SpecificData) {
       return (Class<T>) ((SpecificData) model).getClass(schema);
     }
@@ -133,7 +140,16 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
   }
 
   private static Converter newConverter(Schema schema, Type type,
-      GenericData model, ParentValueContainer parent) {
+      GenericData model, ParentValueContainer setter) {
+
+    LogicalType logicalType = schema.getLogicalType();
+    // the expected type is always null because it is determined by the parent
+    // datum class, which never helps for generic. when logical types are added
+    // to specific, this should pass the expected type here.
+    Conversion<?> conversion = model.getConversionFor(logicalType);
+    ParentValueContainer parent = ParentValueContainer
+        .getConversionContainer(setter, conversion, schema);
+
     if (schema.getType().equals(Schema.Type.BOOLEAN)) {
       return new AvroConverters.FieldBooleanConverter(parent);
     } else if (schema.getType().equals(Schema.Type.INT)) {

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
index e73e8af..7d55bf5 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
@@ -110,9 +110,9 @@ public class AvroReadSupport<T> extends ReadSupport<T> {
     MessageType parquetSchema = readContext.getRequestedSchema();
     Schema avroSchema;
 
-    if (readContext.getReadSupportMetadata().get(AVRO_READ_SCHEMA_METADATA_KEY) != null) {
+    if (metadata.get(AVRO_READ_SCHEMA_METADATA_KEY) != null) {
       // use the Avro read schema provided by the user
-      avroSchema = new Schema.Parser().parse(readContext.getReadSupportMetadata().get(AVRO_READ_SCHEMA_METADATA_KEY));
+      avroSchema = new Schema.Parser().parse(metadata.get(AVRO_READ_SCHEMA_METADATA_KEY));
     } else if (keyValueMetaData.get(AVRO_SCHEMA_METADATA_KEY) != null) {
       // use the Avro schema from the file metadata if present
       avroSchema = new Schema.Parser().parse(keyValueMetaData.get(AVRO_SCHEMA_METADATA_KEY));

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
index 38a761c..10bb29b 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
@@ -29,6 +29,7 @@ import it.unimi.dsi.fastutil.shorts.ShortArrayList;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -36,8 +37,14 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.LinkedHashMap;
+import org.apache.avro.AvroTypeException;
+import org.apache.avro.Conversion;
+import org.apache.avro.LogicalType;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.reflect.AvroIgnore;
+import org.apache.avro.reflect.AvroName;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.Stringable;
 import org.apache.avro.specific.SpecificData;
@@ -69,7 +76,8 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
   private static final String JAVA_CLASS_PROP = "java-class";
   private static final String JAVA_KEY_CLASS_PROP = "java-key-class";
 
-  protected T currentRecord;
+  protected T currentRecord = null;
+  private ParentValueContainer rootContainer = null;
   private final Converter[] converters;
 
   private final Schema avroSchema;
@@ -80,6 +88,15 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
   public AvroRecordConverter(MessageType parquetSchema, Schema avroSchema,
                              GenericData baseModel) {
     this(null, parquetSchema, avroSchema, baseModel);
+    LogicalType logicalType = avroSchema.getLogicalType();
+    Conversion<?> conversion = baseModel.getConversionFor(logicalType);
+    this.rootContainer = ParentValueContainer.getConversionContainer(new ParentValueContainer() {
+      @Override
+      @SuppressWarnings("unchecked")
+      public void add(Object value) {
+        AvroRecordConverter.this.currentRecord = (T) value;
+      }
+    }, conversion, avroSchema);
   }
 
   public AvroRecordConverter(ParentValueContainer parent,
@@ -101,6 +118,8 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
       recordClass = getDatumClass(avroSchema, model);
     }
 
+    Map<String, Class<?>> fields = getFieldsByName(recordClass, false);
+
     int parquetFieldIndex = 0;
     for (Type parquetField: parquetSchema.getFields()) {
       final Schema.Field avroField = getAvroField(parquetField.getName());
@@ -112,8 +131,10 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
           AvroRecordConverter.this.set(avroField.name(), finalAvroIndex, value);
         }
       };
+
+      Class<?> fieldClass = fields.get(avroField.name());
       converters[parquetFieldIndex] = newConverter(
-          nonNullSchema, parquetField, this.model, container);
+          nonNullSchema, parquetField, this.model, fieldClass, container);
 
       // @Stringable doesn't affect the reflected schema; must be enforced here
       if (recordClass != null &&
@@ -147,6 +168,43 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
     }
   }
 
+  // this was taken from Avro's ReflectData
+  private static Map<String, Class<?>> getFieldsByName(Class<?> recordClass,
+                                                       boolean excludeJava) {
+    Map<String, Class<?>> fields = new LinkedHashMap<String, Class<?>>();
+
+    if (recordClass != null) {
+      Class<?> current = recordClass;
+      do {
+        if (excludeJava && current.getPackage() != null
+            && current.getPackage().getName().startsWith("java.")) {
+          break; // skip java built-in classes
+        }
+        for (Field field : current.getDeclaredFields()) {
+          if (field.isAnnotationPresent(AvroIgnore.class) ||
+              isTransientOrStatic(field)) {
+            continue;
+          }
+          AvroName altName = field.getAnnotation(AvroName.class);
+          Class<?> existing = fields.put(
+              altName != null ? altName.value() : field.getName(),
+              field.getType());
+          if (existing != null) {
+            throw new AvroTypeException(
+                current + " contains two fields named: " + field.getName());
+          }
+        }
+        current = current.getSuperclass();
+      } while (current != null);
+    }
+
+    return fields;
+  }
+
+  private static boolean isTransientOrStatic(Field field) {
+    return (field.getModifiers() & (Modifier.TRANSIENT | Modifier.STATIC)) != 0;
+  }
+
   private Schema.Field getAvroField(String parquetFieldName) {
     Schema.Field avroField = avroSchema.getField(parquetFieldName);
     if (avroField != null) {
@@ -164,12 +222,28 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
         parquetFieldName));
   }
 
+  private static Converter newConverter(
+      Schema schema, Type type, GenericData model, ParentValueContainer setter) {
+    return newConverter(schema, type, model, null, setter);
+  }
+
   private static Converter newConverter(Schema schema, Type type,
-      GenericData model, ParentValueContainer parent) {
+      GenericData model, Class<?> knownClass, ParentValueContainer setter) {
+    LogicalType logicalType = schema.getLogicalType();
+    Conversion<?> conversion;
+    if (knownClass != null) {
+      conversion = model.getConversionByClass(knownClass, logicalType);
+    } else {
+      conversion = model.getConversionFor(logicalType);
+    }
+
+    ParentValueContainer parent = ParentValueContainer
+        .getConversionContainer(setter, conversion, schema);
+
     if (schema.getType().equals(Schema.Type.BOOLEAN)) {
       return new AvroConverters.FieldBooleanConverter(parent);
     } else if (schema.getType().equals(Schema.Type.INT)) {
-      Class<?> datumClass = getDatumClass(schema, model);
+      Class<?> datumClass = getDatumClass(conversion, knownClass, schema, model);
       if (datumClass == null) {
         return new AvroConverters.FieldIntegerConverter(parent);
       } else if (datumClass == byte.class || datumClass == Byte.class) {
@@ -187,7 +261,7 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
     } else if (schema.getType().equals(Schema.Type.DOUBLE)) {
       return new AvroConverters.FieldDoubleConverter(parent);
     } else if (schema.getType().equals(Schema.Type.BYTES)) {
-      Class<?> datumClass = getDatumClass(schema, model);
+      Class<?> datumClass = getDatumClass(conversion, knownClass, schema, model);
       if (datumClass == null) {
         return new AvroConverters.FieldByteBufferConverter(parent);
       } else if (datumClass.isArray() && datumClass.getComponentType() == byte.class) {
@@ -201,7 +275,7 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
     } else if (schema.getType().equals(Schema.Type.ENUM)) {
       return new AvroConverters.FieldEnumConverter(parent, schema, model);
     } else if (schema.getType().equals(Schema.Type.ARRAY)) {
-      Class<?> datumClass = getDatumClass(schema, model);
+      Class<?> datumClass = getDatumClass(conversion, knownClass, schema, model);
       if (datumClass != null && datumClass.isArray()) {
         return new AvroArrayConverter(
             parent, type.asGroupType(), schema, model, datumClass);
@@ -265,8 +339,24 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
     }
   }
 
-  @SuppressWarnings("unchecked")
   private static <T> Class<T> getDatumClass(Schema schema, GenericData model) {
+    return getDatumClass(null, null, schema, model);
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <T> Class<T> getDatumClass(Conversion<?> conversion,
+                                            Class<T> knownClass,
+                                            Schema schema, GenericData model) {
+    if (conversion != null) {
+      // use generic classes to pass data to conversions
+      return null;
+    }
+
+    // known class can be set when using reflect
+    if (knownClass != null) {
+      return knownClass;
+    }
+
     if (model instanceof SpecificData) {
       // this works for reflect as well
       return ((SpecificData) model).getClass(schema);
@@ -314,6 +404,9 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
     fillInDefaults();
     if (parent != null) {
       parent.add(currentRecord);
+    } else {
+      // this applies any converters needed for the root value
+      rootContainer.add(currentRecord);
     }
   }
 
@@ -502,10 +595,10 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
       // matching it against the element schema.
       if (isElementType(repeatedType, elementSchema)) {
         // the element type is the repeated type (and required)
-        converter = newConverter(elementSchema, repeatedType, model, setter);
+        converter = newConverter(elementSchema, repeatedType, model, elementClass, setter);
       } else {
         // the element is wrapped in a synthetic group and may be optional
-        converter = new PrimitiveElementConverter(
+        converter = new ArrayElementConverter(
             repeatedType.asGroupType(), elementSchema, model, setter);
       }
     }
@@ -643,20 +736,20 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
      *   }
      * </pre>
      */
-    final class PrimitiveElementConverter extends GroupConverter {
+    final class ArrayElementConverter extends GroupConverter {
       private boolean isSet;
       private final Converter elementConverter;
 
-      public PrimitiveElementConverter(GroupType repeatedType,
-                                       Schema elementSchema, GenericData model,
-                                       final ParentValueContainer setter) {
+      public ArrayElementConverter(GroupType repeatedType,
+                                   Schema elementSchema, GenericData model,
+                                   final ParentValueContainer setter) {
         Type elementType = repeatedType.getType(0);
         Preconditions.checkArgument(
             !elementClass.isPrimitive() || elementType.isRepetition(REQUIRED),
             "Cannot convert list of optional elements to primitive array");
         Schema nonNullElementSchema = AvroSchemaConverter.getNonNull(elementSchema);
         this.elementConverter = newConverter(
-            nonNullElementSchema, elementType, model, new ParentValueContainer() {
+            nonNullElementSchema, elementType, model, elementClass, new ParentValueContainer() {
               @Override
               public void add(Object value) {
                 isSet = true;

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
index 6cfa8d1..6b9b94c 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
@@ -18,20 +18,26 @@
  */
 package org.apache.parquet.avro;
 
-import java.util.*;
-
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 
 import org.apache.hadoop.conf.Configuration;
-import org.codehaus.jackson.node.NullNode;
 import org.apache.parquet.schema.ConversionPatterns;
+import org.apache.parquet.schema.DecimalMetadata;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
-import org.apache.parquet.schema.Type;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 
+import static org.apache.avro.JsonProperties.NULL_VALUE;
 import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
 import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE_DEFAULT;
 import static org.apache.parquet.schema.OriginalType.*;
@@ -113,26 +119,28 @@ public class AvroSchemaConverter {
     return convertField(fieldName, schema, Type.Repetition.REQUIRED);
   }
 
+  @SuppressWarnings("deprecation")
   private Type convertField(String fieldName, Schema schema, Type.Repetition repetition) {
+    Types.PrimitiveBuilder<PrimitiveType> builder;
     Schema.Type type = schema.getType();
     if (type.equals(Schema.Type.BOOLEAN)) {
-      return primitive(fieldName, BOOLEAN, repetition);
+      builder = Types.primitive(BOOLEAN, repetition);
     } else if (type.equals(Schema.Type.INT)) {
-      return primitive(fieldName, INT32, repetition);
+      builder = Types.primitive(INT32, repetition);
     } else if (type.equals(Schema.Type.LONG)) {
-      return primitive(fieldName, INT64, repetition);
+      builder = Types.primitive(INT64, repetition);
     } else if (type.equals(Schema.Type.FLOAT)) {
-      return primitive(fieldName, FLOAT, repetition);
+      builder = Types.primitive(FLOAT, repetition);
     } else if (type.equals(Schema.Type.DOUBLE)) {
-      return primitive(fieldName, DOUBLE, repetition);
+      builder = Types.primitive(DOUBLE, repetition);
     } else if (type.equals(Schema.Type.BYTES)) {
-      return primitive(fieldName, BINARY, repetition);
+      builder = Types.primitive(BINARY, repetition);
     } else if (type.equals(Schema.Type.STRING)) {
-      return primitive(fieldName, BINARY, repetition, UTF8);
+      builder = Types.primitive(BINARY, repetition).as(UTF8);
     } else if (type.equals(Schema.Type.RECORD)) {
       return new GroupType(repetition, fieldName, convertFields(schema.getFields()));
     } else if (type.equals(Schema.Type.ENUM)) {
-      return primitive(fieldName, BINARY, repetition, ENUM);
+      builder = Types.primitive(BINARY, repetition).as(ENUM);
     } else if (type.equals(Schema.Type.ARRAY)) {
       if (writeOldListStructure) {
         return ConversionPatterns.listType(repetition, fieldName,
@@ -146,16 +154,36 @@ public class AvroSchemaConverter {
       // avro map key type is always string
       return ConversionPatterns.stringKeyMapType(repetition, fieldName, valType);
     } else if (type.equals(Schema.Type.FIXED)) {
-      return primitive(fieldName, FIXED_LEN_BYTE_ARRAY, repetition,
-                       schema.getFixedSize(), null);
+      builder = Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition)
+          .length(schema.getFixedSize());
     } else if (type.equals(Schema.Type.UNION)) {
       return convertUnion(fieldName, schema, repetition);
+    } else {
+      throw new UnsupportedOperationException("Cannot convert Avro type " + type);
     }
-    throw new UnsupportedOperationException("Cannot convert Avro type " + type);
+
+    // schema translation can only be done for known logical types because this
+    // creates an equivalence
+    LogicalType logicalType = schema.getLogicalType();
+    if (logicalType != null) {
+      if (logicalType instanceof LogicalTypes.Decimal) {
+        builder = builder.as(DECIMAL)
+            .precision(((LogicalTypes.Decimal) logicalType).getPrecision())
+            .scale(((LogicalTypes.Decimal) logicalType).getScale());
+
+      } else {
+        OriginalType annotation = convertLogicalType(logicalType);
+        if (annotation != null) {
+          builder.as(annotation);
+        }
+      }
+    }
+
+    return builder.named(fieldName);
   }
 
   private Type convertUnion(String fieldName, Schema schema, Type.Repetition repetition) {
-    List<Schema> nonNullSchemas = new ArrayList(schema.getTypes().size());
+    List<Schema> nonNullSchemas = new ArrayList<Schema>(schema.getTypes().size());
     for (Schema childSchema : schema.getTypes()) {
       if (childSchema.getType().equals(Schema.Type.NULL)) {
         if (Type.Repetition.REQUIRED == repetition) {
@@ -175,7 +203,7 @@ public class AvroSchemaConverter {
         return convertField(fieldName, nonNullSchemas.get(0), repetition);
 
       default: // complex union type
-        List<Type> unionTypes = new ArrayList(nonNullSchemas.size());
+        List<Type> unionTypes = new ArrayList<Type>(nonNullSchemas.size());
         int index = 0;
         for (Schema childSchema : nonNullSchemas) {
           unionTypes.add( convertField("member" + index++, childSchema, Type.Repetition.OPTIONAL));
@@ -188,24 +216,6 @@ public class AvroSchemaConverter {
     return convertField(field.name(), field.schema());
   }
 
-  private PrimitiveType primitive(String name,
-      PrimitiveType.PrimitiveTypeName primitive, Type.Repetition repetition,
-      int typeLength, OriginalType originalType) {
-    return new PrimitiveType(repetition, primitive, typeLength, name,
-                             originalType);
-  }
-
-  private PrimitiveType primitive(String name,
-      PrimitiveType.PrimitiveTypeName primitive, Type.Repetition repetition,
-      OriginalType originalType) {
-    return new PrimitiveType(repetition, primitive, name, originalType);
-  }
-
-  private PrimitiveType primitive(String name,
-      PrimitiveType.PrimitiveTypeName primitive, Type.Repetition repetition) {
-    return new PrimitiveType(repetition, primitive, name, null);
-  }
-
   public Schema convert(MessageType parquetSchema) {
     return convertFields(parquetSchema.getName(), parquetSchema.getFields());
   }
@@ -217,10 +227,11 @@ public class AvroSchemaConverter {
       if (parquetType.isRepetition(REPEATED)) {
         throw new UnsupportedOperationException("REPEATED not supported outside LIST or MAP. Type: " + parquetType);
       } else if (parquetType.isRepetition(Type.Repetition.OPTIONAL)) {
-        fields.add(new Schema.Field(parquetType.getName(), optional(fieldSchema), null,
-            NullNode.getInstance()));
+        fields.add(new Schema.Field(
+            parquetType.getName(), optional(fieldSchema), null, NULL_VALUE));
       } else { // REQUIRED
-        fields.add(new Schema.Field(parquetType.getName(), fieldSchema, null, null));
+        fields.add(new Schema.Field(
+            parquetType.getName(), fieldSchema, null, (Object) null));
       }
     }
     Schema schema = Schema.createRecord(name, null, null, false);
@@ -230,10 +241,11 @@ public class AvroSchemaConverter {
 
   private Schema convertField(final Type parquetType) {
     if (parquetType.isPrimitive()) {
+      final PrimitiveType asPrimitive = parquetType.asPrimitiveType();
       final PrimitiveTypeName parquetPrimitiveTypeName =
-          parquetType.asPrimitiveType().getPrimitiveTypeName();
-      final OriginalType originalType = parquetType.getOriginalType();
-      return parquetPrimitiveTypeName.convert(
+          asPrimitive.getPrimitiveTypeName();
+      final OriginalType annotation = parquetType.getOriginalType();
+      Schema schema = parquetPrimitiveTypeName.convert(
           new PrimitiveType.PrimitiveTypeNameConverter<Schema, RuntimeException>() {
             @Override
             public Schema convertBOOLEAN(PrimitiveTypeName primitiveTypeName) {
@@ -266,13 +278,24 @@ public class AvroSchemaConverter {
             }
             @Override
             public Schema convertBINARY(PrimitiveTypeName primitiveTypeName) {
-              if (originalType == OriginalType.UTF8 || originalType == OriginalType.ENUM) {
+              if (annotation == OriginalType.UTF8 || annotation == OriginalType.ENUM) {
                 return Schema.create(Schema.Type.STRING);
               } else {
                 return Schema.create(Schema.Type.BYTES);
               }
             }
           });
+
+      LogicalType logicalType = convertOriginalType(
+          annotation, asPrimitive.getDecimalMetadata());
+      if (logicalType != null && (annotation != DECIMAL ||
+          parquetPrimitiveTypeName == BINARY ||
+          parquetPrimitiveTypeName == FIXED_LEN_BYTE_ARRAY)) {
+        schema = logicalType.addToSchema(schema);
+      }
+
+      return schema;
+
     } else {
       GroupType parquetGroupType = parquetType.asGroupType();
       OriginalType originalType = parquetGroupType.getOriginalType();
@@ -335,6 +358,46 @@ public class AvroSchemaConverter {
     }
   }
 
+  private OriginalType convertLogicalType(LogicalType logicalType) {
+    if (logicalType == null) {
+      return null;
+    } else if (logicalType instanceof LogicalTypes.Decimal) {
+      return OriginalType.DECIMAL;
+    } else if (logicalType instanceof LogicalTypes.Date) {
+      return OriginalType.DATE;
+    } else if (logicalType instanceof LogicalTypes.TimeMillis) {
+      return OriginalType.TIME_MILLIS;
+    } else if (logicalType instanceof LogicalTypes.TimeMicros) {
+      return OriginalType.TIME_MICROS;
+    } else if (logicalType instanceof LogicalTypes.TimestampMillis) {
+      return OriginalType.TIMESTAMP_MILLIS;
+    } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
+      return OriginalType.TIMESTAMP_MICROS;
+    }
+    return null;
+  }
+
+  private LogicalType convertOriginalType(OriginalType annotation, DecimalMetadata meta) {
+    if (annotation == null) {
+      return null;
+    }
+    switch (annotation) {
+      case DECIMAL:
+        return LogicalTypes.decimal(meta.getPrecision(), meta.getScale());
+      case DATE:
+        return LogicalTypes.date();
+      case TIME_MILLIS:
+        return LogicalTypes.timeMillis();
+      case TIME_MICROS:
+        return LogicalTypes.timeMicros();
+      case TIMESTAMP_MILLIS:
+        return LogicalTypes.timestampMillis();
+      case TIMESTAMP_MICROS:
+        return LogicalTypes.timestampMicros();
+    }
+    return null;
+  }
+
   /**
    * Implements the rules for interpreting existing data from the logical type
    * spec for the LIST annotation. This is used to produce the expected schema.

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
index c75bb03..7fcd88e 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
@@ -23,6 +23,8 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.avro.Conversion;
+import org.apache.avro.LogicalType;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericFixed;
@@ -69,6 +71,8 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
   private RecordConsumer recordConsumer;
   private MessageType rootSchema;
   private Schema rootAvroSchema;
+  private LogicalType rootLogicalType;
+  private Conversion<?> rootConversion;
   private GenericData model;
   private ListWriter listWriter;
 
@@ -82,6 +86,7 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
   public AvroWriteSupport(MessageType schema, Schema avroSchema) {
     this.rootSchema = schema;
     this.rootAvroSchema = avroSchema;
+    this.rootLogicalType = rootAvroSchema.getLogicalType();
     this.model = null;
   }
 
@@ -89,6 +94,7 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
                           GenericData model) {
     this.rootSchema = schema;
     this.rootAvroSchema = avroSchema;
+    this.rootLogicalType = rootAvroSchema.getLogicalType();
     this.model = model;
   }
 
@@ -136,16 +142,25 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
   // overloaded version for backward compatibility
   @SuppressWarnings("unchecked")
   public void write(IndexedRecord record) {
-    recordConsumer.startMessage();
-    writeRecordFields(rootSchema, rootAvroSchema, record);
-    recordConsumer.endMessage();
+    write((T) record);
   }
 
   @Override
   public void write(T record) {
-    recordConsumer.startMessage();
-    writeRecordFields(rootSchema, rootAvroSchema, record);
-    recordConsumer.endMessage();
+    if (rootLogicalType != null) {
+      Conversion<?> conversion = model.getConversionByClass(
+          record.getClass(), rootLogicalType);
+
+      recordConsumer.startMessage();
+      writeRecordFields(rootSchema, rootAvroSchema,
+          convert(rootAvroSchema, rootLogicalType, conversion, record));
+      recordConsumer.endMessage();
+
+    } else {
+      recordConsumer.startMessage();
+      writeRecordFields(rootSchema, rootAvroSchema, record);
+      recordConsumer.endMessage();
+    }
   }
 
   private void writeRecord(GroupType schema, Schema avroSchema,
@@ -226,6 +241,8 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
       }
     }
 
+    // TODO: what if the value is null?
+
     // Sparsely populated method of encoding unions, each member has its own
     // set of columns.
     String memberName = "member" + parquetIndex;
@@ -237,44 +254,108 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
     recordConsumer.endGroup();
   }
 
-  @SuppressWarnings("unchecked")
+  /**
+   * Calls an appropriate write method based on the value.
+   * Value MUST not be null.
+   *
+   * @param type the Parquet type
+   * @param avroSchema the Avro schema
+   * @param value a non-null value to write
+   */
   private void writeValue(Type type, Schema avroSchema, Object value) {
     Schema nonNullAvroSchema = AvroSchemaConverter.getNonNull(avroSchema);
-    Schema.Type avroType = nonNullAvroSchema.getType();
-    if (avroType.equals(Schema.Type.BOOLEAN)) {
-      recordConsumer.addBoolean((Boolean) value);
-    } else if (avroType.equals(Schema.Type.INT)) {
-      if (value instanceof Character) {
-        recordConsumer.addInteger((Character) value);
-      } else {
-        recordConsumer.addInteger(((Number) value).intValue());
-      }
-    } else if (avroType.equals(Schema.Type.LONG)) {
-      recordConsumer.addLong(((Number) value).longValue());
-    } else if (avroType.equals(Schema.Type.FLOAT)) {
-      recordConsumer.addFloat(((Number) value).floatValue());
-    } else if (avroType.equals(Schema.Type.DOUBLE)) {
-      recordConsumer.addDouble(((Number) value).doubleValue());
-    } else if (avroType.equals(Schema.Type.BYTES)) {
-      if (value instanceof byte[]) {
-        recordConsumer.addBinary(Binary.fromReusedByteArray((byte[]) value));
-      } else {
-        recordConsumer.addBinary(Binary.fromReusedByteBuffer((ByteBuffer) value));
-      }
-    } else if (avroType.equals(Schema.Type.STRING)) {
-      recordConsumer.addBinary(fromAvroString(value));
-    } else if (avroType.equals(Schema.Type.RECORD)) {
-      writeRecord(type.asGroupType(), nonNullAvroSchema, value);
-    } else if (avroType.equals(Schema.Type.ENUM)) {
-      recordConsumer.addBinary(Binary.fromString(value.toString()));
-    } else if (avroType.equals(Schema.Type.ARRAY)) {
-      listWriter.writeList(type.asGroupType(), nonNullAvroSchema, value);
-    } else if (avroType.equals(Schema.Type.MAP)) {
-      writeMap(type.asGroupType(), nonNullAvroSchema, (Map<CharSequence, ?>) value);
-    } else if (avroType.equals(Schema.Type.UNION)) {
-      writeUnion(type.asGroupType(), nonNullAvroSchema, value);
-    } else if (avroType.equals(Schema.Type.FIXED)) {
-      recordConsumer.addBinary(Binary.fromReusedByteArray(((GenericFixed) value).bytes()));
+    LogicalType logicalType = nonNullAvroSchema.getLogicalType();
+    if (logicalType != null) {
+      Conversion<?> conversion = model.getConversionByClass(
+          value.getClass(), logicalType);
+      writeValueWithoutConversion(type, nonNullAvroSchema,
+          convert(nonNullAvroSchema, logicalType, conversion, value));
+    } else {
+      writeValueWithoutConversion(type, nonNullAvroSchema, value);
+    }
+  }
+
+  private <D> Object convert(Schema schema, LogicalType logicalType,
+                             Conversion<D> conversion, Object datum) {
+    if (conversion == null) {
+      return datum;
+    }
+    Class<D> fromClass = conversion.getConvertedType();
+    switch (schema.getType()) {
+      case RECORD:  return conversion.toRecord(fromClass.cast(datum), schema, logicalType);
+      case ENUM:    return conversion.toEnumSymbol(fromClass.cast(datum), schema, logicalType);
+      case ARRAY:   return conversion.toArray(fromClass.cast(datum), schema, logicalType);
+      case MAP:     return conversion.toMap(fromClass.cast(datum), schema, logicalType);
+      case FIXED:   return conversion.toFixed(fromClass.cast(datum), schema, logicalType);
+      case STRING:  return conversion.toCharSequence(fromClass.cast(datum), schema, logicalType);
+      case BYTES:   return conversion.toBytes(fromClass.cast(datum), schema, logicalType);
+      case INT:     return conversion.toInt(fromClass.cast(datum), schema, logicalType);
+      case LONG:    return conversion.toLong(fromClass.cast(datum), schema, logicalType);
+      case FLOAT:   return conversion.toFloat(fromClass.cast(datum), schema, logicalType);
+      case DOUBLE:  return conversion.toDouble(fromClass.cast(datum), schema, logicalType);
+      case BOOLEAN: return conversion.toBoolean(fromClass.cast(datum), schema, logicalType);
+    }
+    return datum;
+  }
+
+  /**
+   * Calls an appropriate write method based on the value.
+   * Value must not be null and the schema must not be nullable.
+   *
+   * @param type a Parquet type
+   * @param avroSchema a non-nullable Avro schema
+   * @param value a non-null value to write
+   */
+  @SuppressWarnings("unchecked")
+  private void writeValueWithoutConversion(Type type, Schema avroSchema, Object value) {
+    switch (avroSchema.getType()) {
+      case BOOLEAN:
+        recordConsumer.addBoolean((Boolean) value);
+        break;
+      case INT:
+        if (value instanceof Character) {
+          recordConsumer.addInteger((Character) value);
+        } else {
+          recordConsumer.addInteger(((Number) value).intValue());
+        }
+        break;
+      case LONG:
+        recordConsumer.addLong(((Number) value).longValue());
+        break;
+      case FLOAT:
+        recordConsumer.addFloat(((Number) value).floatValue());
+        break;
+      case DOUBLE:
+        recordConsumer.addDouble(((Number) value).doubleValue());
+        break;
+      case FIXED:
+        recordConsumer.addBinary(Binary.fromReusedByteArray(((GenericFixed) value).bytes()));
+        break;
+      case BYTES:
+        if (value instanceof byte[]) {
+          recordConsumer.addBinary(Binary.fromReusedByteArray((byte[]) value));
+        } else {
+          recordConsumer.addBinary(Binary.fromReusedByteBuffer((ByteBuffer) value));
+        }
+        break;
+      case STRING:
+        recordConsumer.addBinary(fromAvroString(value));
+        break;
+      case RECORD:
+        writeRecord(type.asGroupType(), avroSchema, value);
+        break;
+      case ENUM:
+        recordConsumer.addBinary(Binary.fromString(value.toString()));
+        break;
+      case ARRAY:
+        listWriter.writeList(type.asGroupType(), avroSchema, value);
+        break;
+      case MAP:
+        writeMap(type.asGroupType(), avroSchema, (Map<CharSequence, ?>) value);
+        break;
+      case UNION:
+        writeUnion(type.asGroupType(), avroSchema, value);
+        break;
     }
   }
 
@@ -283,7 +364,7 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
       Utf8 utf8 = (Utf8) value;
       return Binary.fromReusedByteArray(utf8.getBytes(), 0, utf8.getByteLength());
     }
-    return Binary.fromString(value.toString());
+    return Binary.fromString((CharSequence) value);
   }
 
   private static GenericData getDataModel(Configuration conf) {

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/main/java/org/apache/parquet/avro/ParentValueContainer.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/ParentValueContainer.java b/parquet-avro/src/main/java/org/apache/parquet/avro/ParentValueContainer.java
index 67b710d..f36f5fc 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/ParentValueContainer.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/ParentValueContainer.java
@@ -18,6 +18,16 @@
  */
 package org.apache.parquet.avro;
 
+import org.apache.avro.Conversion;
+import org.apache.avro.LogicalType;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericEnumSymbol;
+import org.apache.avro.generic.IndexedRecord;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Map;
+
 abstract class ParentValueContainer {
 
   /**
@@ -60,4 +70,169 @@ abstract class ParentValueContainer {
     add(value);
   }
 
+  static class LogicalTypePrimitiveContainer extends ParentValueContainer {
+    private final ParentValueContainer wrapped;
+    private final Schema schema;
+    private final LogicalType logicalType;
+    private final Conversion conversion;
+
+    public LogicalTypePrimitiveContainer(ParentValueContainer wrapped,
+                                         Schema schema, Conversion conversion) {
+      this.wrapped = wrapped;
+      this.schema = schema;
+      this.logicalType = schema.getLogicalType();
+      this.conversion = conversion;
+    }
+
+    @Override
+    public void addDouble(double value) {
+      wrapped.add(conversion.fromDouble(value, schema, logicalType));
+    }
+
+    @Override
+    public void addFloat(float value) {
+      wrapped.add(conversion.fromFloat(value, schema, logicalType));
+    }
+
+    @Override
+    public void addLong(long value) {
+      wrapped.add(conversion.fromLong(value, schema, logicalType));
+    }
+
+    @Override
+    public void addInt(int value) {
+      wrapped.add(conversion.fromInt(value, schema, logicalType));
+    }
+
+    @Override
+    public void addShort(short value) {
+      wrapped.add(conversion.fromInt((int) value, schema, logicalType));
+    }
+
+    @Override
+    public void addChar(char value) {
+      wrapped.add(conversion.fromInt((int) value, schema, logicalType));
+    }
+
+    @Override
+    public void addByte(byte value) {
+      wrapped.add(conversion.fromInt((int) value, schema, logicalType));
+    }
+
+    @Override
+    public void addBoolean(boolean value) {
+      wrapped.add(conversion.fromBoolean(value, schema, logicalType));
+    }
+  }
+
+  static ParentValueContainer getConversionContainer(
+      final ParentValueContainer parent, final Conversion<?> conversion,
+      final Schema schema) {
+    if (conversion == null) {
+      return parent;
+    }
+
+    final LogicalType logicalType = schema.getLogicalType();
+
+    switch (schema.getType()) {
+      case STRING:
+        return new ParentValueContainer() {
+          @Override
+          public void add(Object value) {
+            parent.add(conversion.fromCharSequence(
+                (CharSequence) value, schema, logicalType));
+          }
+        };
+      case BOOLEAN:
+        return new LogicalTypePrimitiveContainer(parent, schema, conversion) {
+          @Override
+          public void add(Object value) {
+            parent.add(conversion.fromBoolean(
+                (Boolean) value, schema, logicalType));
+          }
+        };
+      case INT:
+        return new LogicalTypePrimitiveContainer(parent, schema, conversion) {
+          @Override
+          public void add(Object value) {
+            parent.add(conversion.fromInt(
+                (Integer) value, schema, logicalType));
+          }
+        };
+      case LONG:
+        return new LogicalTypePrimitiveContainer(parent, schema, conversion) {
+          @Override
+          public void add(Object value) {
+            parent.add(conversion.fromLong(
+                (Long) value, schema, logicalType));
+          }
+        };
+      case FLOAT:
+        return new LogicalTypePrimitiveContainer(parent, schema, conversion) {
+          @Override
+          public void add(Object value) {
+            parent.add(conversion.fromFloat(
+                (Float) value, schema, logicalType));
+          }
+        };
+      case DOUBLE:
+        return new LogicalTypePrimitiveContainer(parent, schema, conversion) {
+          @Override
+          public void add(Object value) {
+            parent.add(conversion.fromDouble(
+                (Double) value, schema, logicalType));
+          }
+        };
+      case BYTES:
+        return new ParentValueContainer() {
+          @Override
+          public void add(Object value) {
+            parent.add(conversion.fromBytes(
+                (ByteBuffer) value, schema, logicalType));
+          }
+        };
+      case FIXED:
+        return new ParentValueContainer() {
+          @Override
+          public void add(Object value) {
+            parent.add(conversion.fromFixed(
+                (GenericData.Fixed) value, schema, logicalType));
+          }
+        };
+      case RECORD:
+        return new ParentValueContainer() {
+          @Override
+          public void add(Object value) {
+            parent.add(conversion.fromRecord(
+                (IndexedRecord) value, schema, logicalType));
+          }
+        };
+      case ARRAY:
+        return new ParentValueContainer() {
+          @Override
+          public void add(Object value) {
+            parent.add(conversion.fromArray(
+                (Collection<?>) value, schema, logicalType));
+          }
+        };
+      case MAP:
+        return new ParentValueContainer() {
+          @Override
+          public void add(Object value) {
+            parent.add(conversion.fromMap(
+                (Map<?, ?>) value, schema, logicalType));
+          }
+        };
+      case ENUM:
+        return new ParentValueContainer() {
+          @Override
+          public void add(Object value) {
+            parent.add(conversion.fromEnumSymbol(
+                (GenericEnumSymbol) value, schema, logicalType));
+          }
+        };
+      default:
+        return new LogicalTypePrimitiveContainer(parent, schema, conversion);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/main/resources/META-INF/LICENSE
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/resources/META-INF/LICENSE b/parquet-avro/src/main/resources/META-INF/LICENSE
new file mode 100644
index 0000000..20b23c9
--- /dev/null
+++ b/parquet-avro/src/main/resources/META-INF/LICENSE
@@ -0,0 +1,186 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+--------------------------------------------------------------------------------
+
+This product includes code from Apache Avro.
+
+Copyright: 2014 The Apache Software Foundation.
+Home page: https://avro.apache.org/
+License: http://www.apache.org/licenses/LICENSE-2.0
+

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/resources/META-INF/NOTICE b/parquet-avro/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..7b5682c
--- /dev/null
+++ b/parquet-avro/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,18 @@
+
+Apache Parquet MR (Incubating)
+Copyright 2014-2015 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+--------------------------------------------------------------------------------
+
+This product includes code from Apache Avro, which includes the following in
+its NOTICE file:
+
+  Apache Avro
+  Copyright 2010-2015 The Apache Software Foundation
+
+  This product includes software developed at
+  The Apache Software Foundation (http://www.apache.org/).
+

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java b/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java
index d5fe11a..f4682d6 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java
@@ -19,11 +19,21 @@
 package org.apache.parquet.avro;
 
 import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
 import org.codehaus.jackson.node.NullNode;
+import org.junit.Assert;
+import org.junit.rules.TemporaryFolder;
 
 public class AvroTestUtil {
 
@@ -66,4 +76,47 @@ public class AvroTestUtil {
     return record;
   }
 
+  public static <D> List<D> read(GenericData model, Schema schema, File file) throws IOException {
+    List<D> data = new ArrayList<D>();
+    Configuration conf = new Configuration(false);
+    AvroReadSupport.setRequestedProjection(conf, schema);
+    AvroReadSupport.setAvroReadSchema(conf, schema);
+    ParquetReader<D> fileReader = AvroParquetReader
+        .<D>builder(new Path(file.toString()))
+        .withDataModel(model) // reflect disables compatibility
+        .withConf(conf)
+        .build();
+
+    try {
+      D datum;
+      while ((datum = fileReader.read()) != null) {
+        data.add(datum);
+      }
+    } finally {
+      fileReader.close();
+    }
+
+    return data;
+  }
+
+  @SuppressWarnings("unchecked")
+  public static <D> File write(TemporaryFolder temp, GenericData model, Schema schema, D... data) throws IOException {
+    File file = temp.newFile();
+    Assert.assertTrue(file.delete());
+    ParquetWriter<D> writer = AvroParquetWriter
+        .<D>builder(new Path(file.toString()))
+        .withDataModel(model)
+        .withSchema(schema)
+        .build();
+
+    try {
+      for (D datum : data) {
+        writer.write(datum);
+      }
+    } finally {
+      writer.close();
+    }
+
+    return file;
+  }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
index b393615..942e3b1 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
@@ -20,16 +20,37 @@ package org.apache.parquet.avro;
 
 import com.google.common.collect.Lists;
 import com.google.common.io.Resources;
-import java.util.Arrays;
-import java.util.Collections;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Types;
 import org.codehaus.jackson.node.NullNode;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.MessageTypeParser;
+import java.util.Arrays;
+import java.util.Collections;
 
+import static org.apache.avro.Schema.Type.INT;
+import static org.apache.avro.Schema.Type.LONG;
+import static org.apache.parquet.schema.OriginalType.DATE;
+import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MICROS;
+import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MILLIS;
+import static org.apache.parquet.schema.OriginalType.TIME_MICROS;
+import static org.apache.parquet.schema.OriginalType.TIME_MILLIS;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
 import static org.junit.Assert.assertEquals;
 
 public class TestAvroSchemaConverter {
@@ -131,7 +152,7 @@ public class TestAvroSchemaConverter {
 
   @Test(expected = IllegalArgumentException.class)
   public void testTopLevelMustBeARecord() {
-    new AvroSchemaConverter().convert(Schema.create(Schema.Type.INT));
+    new AvroSchemaConverter().convert(Schema.create(INT));
   }
 
   @Test
@@ -270,7 +291,7 @@ public class TestAvroSchemaConverter {
   @Test
   public void testOptionalFields() throws Exception {
     Schema schema = Schema.createRecord("record1", null, null, false);
-    Schema optionalInt = optional(Schema.create(Schema.Type.INT));
+    Schema optionalInt = optional(Schema.create(INT));
     schema.setFields(Arrays.asList(
         new Schema.Field("myint", optionalInt, null, NullNode.getInstance())
     ));
@@ -284,7 +305,7 @@ public class TestAvroSchemaConverter {
   @Test
   public void testOptionalMapValue() throws Exception {
     Schema schema = Schema.createRecord("record1", null, null, false);
-    Schema optionalIntMap = Schema.createMap(optional(Schema.create(Schema.Type.INT)));
+    Schema optionalIntMap = Schema.createMap(optional(Schema.create(INT)));
     schema.setFields(Arrays.asList(
         new Schema.Field("myintmap", optionalIntMap, null, null)
     ));
@@ -303,7 +324,7 @@ public class TestAvroSchemaConverter {
   @Test
   public void testOptionalArrayElement() throws Exception {
     Schema schema = Schema.createRecord("record1", null, null, false);
-    Schema optionalIntArray = Schema.createArray(optional(Schema.create(Schema.Type.INT)));
+    Schema optionalIntArray = Schema.createArray(optional(Schema.create(INT)));
     schema.setFields(Arrays.asList(
         new Schema.Field("myintarray", optionalIntArray, null, null)
     ));
@@ -323,7 +344,7 @@ public class TestAvroSchemaConverter {
     Schema schema = Schema.createRecord("record2", null, null, false);
     Schema multipleTypes = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type
             .NULL),
-        Schema.create(Schema.Type.INT),
+        Schema.create(INT),
         Schema.create(Schema.Type.FLOAT)));
     schema.setFields(Arrays.asList(
         new Schema.Field("myunion", multipleTypes, null, NullNode.getInstance())));
@@ -396,7 +417,7 @@ public class TestAvroSchemaConverter {
   @Test
   public void testOldAvroListOfLists() throws Exception {
     Schema listOfLists = optional(Schema.createArray(Schema.createArray(
-        Schema.create(Schema.Type.INT))));
+        Schema.create(INT))));
     Schema schema = Schema.createRecord("AvroCompatListInList", null, null, false);
     schema.setFields(Lists.newArrayList(
         new Schema.Field("listOfLists", listOfLists, null, NullNode.getInstance())
@@ -425,7 +446,7 @@ public class TestAvroSchemaConverter {
   @Test
   public void testOldThriftListOfLists() throws Exception {
     Schema listOfLists = optional(Schema.createArray(Schema.createArray(
-        Schema.create(Schema.Type.INT))));
+        Schema.create(INT))));
     Schema schema = Schema.createRecord("ThriftCompatListInList", null, null, false);
     schema.setFields(Lists.newArrayList(
         new Schema.Field("listOfLists", listOfLists, null, NullNode.getInstance())
@@ -458,7 +479,7 @@ public class TestAvroSchemaConverter {
     // group's name, but it must be 2-level because the repeated group doesn't
     // contain an optional or repeated element as required for 3-level lists
     Schema listOfLists = optional(Schema.createArray(Schema.createArray(
-        Schema.create(Schema.Type.INT))));
+        Schema.create(INT))));
     Schema schema = Schema.createRecord("UnknownTwoLevelListInList", null, null, false);
     schema.setFields(Lists.newArrayList(
         new Schema.Field("listOfLists", listOfLists, null, NullNode.getInstance())
@@ -488,7 +509,7 @@ public class TestAvroSchemaConverter {
   @Test
   public void testParquetMapWithoutMapKeyValueAnnotation() throws Exception {
     Schema schema = Schema.createRecord("myrecord", null, null, false);
-    Schema map = Schema.createMap(Schema.create(Schema.Type.INT));
+    Schema map = Schema.createMap(Schema.create(INT));
     schema.setFields(Collections.singletonList(new Schema.Field("mymap", map, null, null)));
     String parquetSchema =
         "message myrecord {\n" +
@@ -504,9 +525,240 @@ public class TestAvroSchemaConverter {
     testParquetToAvroConversion(NEW_BEHAVIOR, schema, parquetSchema);
   }
 
+  @Test
+  public void testDecimalBytesType() throws Exception {
+    Schema schema = Schema.createRecord("myrecord", null, null, false);
+    Schema decimal = LogicalTypes.decimal(9, 2).addToSchema(
+        Schema.create(Schema.Type.BYTES));
+    schema.setFields(Collections.singletonList(
+        new Schema.Field("dec", decimal, null, null)));
+
+    testRoundTripConversion(schema,
+        "message myrecord {\n" +
+            "  required binary dec (DECIMAL(9,2));\n" +
+            "}\n");
+  }
+
+  @Test
+  public void testDecimalFixedType() throws Exception {
+    Schema schema = Schema.createRecord("myrecord", null, null, false);
+    Schema decimal = LogicalTypes.decimal(9, 2).addToSchema(
+        Schema.createFixed("dec", null, null, 8));
+    schema.setFields(Collections.singletonList(
+        new Schema.Field("dec", decimal, null, null)));
+
+    testRoundTripConversion(schema,
+        "message myrecord {\n" +
+            "  required fixed_len_byte_array(8) dec (DECIMAL(9,2));\n" +
+            "}\n");
+  }
+
+  @Test
+  public void testDecimalIntegerType() throws Exception {
+    Schema expected = Schema.createRecord("myrecord", null, null, false,
+        Arrays.asList(new Schema.Field(
+            "dec", Schema.create(INT), null, null)));
+
+    // the decimal portion is lost because it isn't valid in Avro
+    testParquetToAvroConversion(expected,
+        "message myrecord {\n" +
+            "  required int32 dec (DECIMAL(9,2));\n" +
+            "}\n");
+  }
+
+  @Test
+  public void testDecimalLongType() throws Exception {
+    Schema expected = Schema.createRecord("myrecord", null, null, false,
+        Arrays.asList(new Schema.Field("dec", Schema.create(LONG), null, null)));
+
+    // the decimal portion is lost because it isn't valid in Avro
+    testParquetToAvroConversion(expected,
+        "message myrecord {\n" +
+            "  required int64 dec (DECIMAL(9,2));\n" +
+            "}\n");
+  }
+
+  @Test
+  public void testDateType() throws Exception {
+    Schema date = LogicalTypes.date().addToSchema(Schema.create(INT));
+    Schema expected = Schema.createRecord("myrecord", null, null, false,
+        Arrays.asList(new Schema.Field("date", date, null, null)));
+
+    testRoundTripConversion(expected,
+        "message myrecord {\n" +
+            "  required int32 date (DATE);\n" +
+            "}\n");
+
+    for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
+        {INT64, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) {
+      final PrimitiveType type;
+      if (primitive == FIXED_LEN_BYTE_ARRAY) {
+        type = new PrimitiveType(REQUIRED, primitive, 12, "test", DATE);
+      } else {
+        type = new PrimitiveType(REQUIRED, primitive, "test", DATE);
+      }
+
+      assertThrows("Should not allow TIME_MICROS with " + primitive,
+          IllegalArgumentException.class, new Runnable() {
+            @Override
+            public void run() {
+              new AvroSchemaConverter().convert(message(type));
+            }
+          });
+    }
+  }
+
+  @Test
+  public void testTimeMillisType() throws Exception {
+    Schema date = LogicalTypes.timeMillis().addToSchema(Schema.create(INT));
+    Schema expected = Schema.createRecord("myrecord", null, null, false,
+        Arrays.asList(new Schema.Field("time", date, null, null)));
+
+    testRoundTripConversion(expected,
+        "message myrecord {\n" +
+            "  required int32 time (TIME_MILLIS);\n" +
+            "}\n");
+
+    for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
+        {INT64, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) {
+      final PrimitiveType type;
+      if (primitive == FIXED_LEN_BYTE_ARRAY) {
+        type = new PrimitiveType(REQUIRED, primitive, 12, "test", TIME_MILLIS);
+      } else {
+        type = new PrimitiveType(REQUIRED, primitive, "test", TIME_MILLIS);
+      }
+
+      assertThrows("Should not allow TIME_MICROS with " + primitive,
+          IllegalArgumentException.class, new Runnable() {
+            @Override
+            public void run() {
+              new AvroSchemaConverter().convert(message(type));
+            }
+          });
+    }
+  }
+
+  @Test
+  public void testTimeMicrosType() throws Exception {
+    Schema date = LogicalTypes.timeMicros().addToSchema(Schema.create(LONG));
+    Schema expected = Schema.createRecord("myrecord", null, null, false,
+        Arrays.asList(new Schema.Field("time", date, null, null)));
+
+    testRoundTripConversion(expected,
+        "message myrecord {\n" +
+            "  required int64 time (TIME_MICROS);\n" +
+            "}\n");
+
+    for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
+        {INT32, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) {
+      final PrimitiveType type;
+      if (primitive == FIXED_LEN_BYTE_ARRAY) {
+        type = new PrimitiveType(REQUIRED, primitive, 12, "test", TIME_MICROS);
+      } else {
+        type = new PrimitiveType(REQUIRED, primitive, "test", TIME_MICROS);
+      }
+
+      assertThrows("Should not allow TIME_MICROS with " + primitive,
+          IllegalArgumentException.class, new Runnable() {
+            @Override
+            public void run() {
+              new AvroSchemaConverter().convert(message(type));
+            }
+          });
+    }
+  }
+
+  @Test
+  public void testTimestampMillisType() throws Exception {
+    Schema date = LogicalTypes.timestampMillis().addToSchema(Schema.create(LONG));
+    Schema expected = Schema.createRecord("myrecord", null, null, false,
+        Arrays.asList(new Schema.Field("timestamp", date, null, null)));
+
+    testRoundTripConversion(expected,
+        "message myrecord {\n" +
+            "  required int64 timestamp (TIMESTAMP_MILLIS);\n" +
+            "}\n");
+
+    for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
+        {INT32, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) {
+      final PrimitiveType type;
+      if (primitive == FIXED_LEN_BYTE_ARRAY) {
+        type = new PrimitiveType(REQUIRED, primitive, 12, "test", TIMESTAMP_MILLIS);
+      } else {
+        type = new PrimitiveType(REQUIRED, primitive, "test", TIMESTAMP_MILLIS);
+      }
+
+      assertThrows("Should not allow TIMESTAMP_MILLIS with " + primitive,
+          IllegalArgumentException.class, new Runnable() {
+            @Override
+            public void run() {
+              new AvroSchemaConverter().convert(message(type));
+            }
+          });
+    }
+  }
+
+  @Test
+  public void testTimestampMicrosType() throws Exception {
+    Schema date = LogicalTypes.timestampMicros().addToSchema(Schema.create(LONG));
+    Schema expected = Schema.createRecord("myrecord", null, null, false,
+        Arrays.asList(new Schema.Field("timestamp", date, null, null)));
+
+    testRoundTripConversion(expected,
+        "message myrecord {\n" +
+            "  required int64 timestamp (TIMESTAMP_MICROS);\n" +
+            "}\n");
+
+    for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
+        {INT32, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) {
+      final PrimitiveType type;
+      if (primitive == FIXED_LEN_BYTE_ARRAY) {
+        type = new PrimitiveType(REQUIRED, primitive, 12, "test", TIMESTAMP_MICROS);
+      } else {
+        type = new PrimitiveType(REQUIRED, primitive, "test", TIMESTAMP_MICROS);
+      }
+
+      assertThrows("Should not allow TIMESTAMP_MICROS with " + primitive,
+          IllegalArgumentException.class, new Runnable() {
+            @Override
+            public void run() {
+              new AvroSchemaConverter().convert(message(type));
+            }
+          });
+    }
+  }
+
   public static Schema optional(Schema original) {
     return Schema.createUnion(Lists.newArrayList(
         Schema.create(Schema.Type.NULL),
         original));
   }
+
+  public static MessageType message(PrimitiveType primitive) {
+    return Types.buildMessage()
+        .addField(primitive)
+        .named("myrecord");
+  }
+
+  /**
+   * A convenience method to avoid a large number of @Test(expected=...) tests
+   * @param message A String message to describe this assertion
+   * @param expected An Exception class that the Runnable should throw
+   * @param runnable A Runnable that is expected to throw the exception
+   */
+  public static void assertThrows(
+      String message, Class<? extends Exception> expected, Runnable runnable) {
+    try {
+      runnable.run();
+      Assert.fail("No exception was thrown (" + message + "), expected: " +
+          expected.getName());
+    } catch (Exception actual) {
+      try {
+        Assert.assertEquals(message, expected, actual.getClass());
+      } catch (AssertionError e) {
+        e.addSuppressed(actual);
+        throw e;
+      }
+    }
+  }
 }