Posted to commits@parquet.apache.org by bl...@apache.org on 2016/04/20 17:41:29 UTC

[1/2] parquet-mr git commit: PARQUET-358: Add support for Avro's logical types API.

Repository: parquet-mr
Updated Branches:
  refs/heads/master 82b8ecc32 -> 6b24a1d1b


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/test/java/org/apache/parquet/avro/TestCircularReferences.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestCircularReferences.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestCircularReferences.java
new file mode 100644
index 0000000..d2f80ed
--- /dev/null
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestCircularReferences.java
@@ -0,0 +1,383 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.parquet.avro;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Conversion;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.util.Utf8;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * This class is based on org.apache.avro.TestCircularReferences
+ *
+ * The main difference between this class and the Avro version is that this one
+ * uses a place-holder schema for the circular reference from Child to Parent.
+ * This avoids creating a self-referencing schema for Parent, which cannot be
+ * converted to a Parquet schema. The place-holder schema must also carry the
+ * referenceable logical type.
+ */
+public class TestCircularReferences {
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  public static class Reference extends LogicalType {
+    private static final String REFERENCE = "reference";
+    private static final String REF_FIELD_NAME = "ref-field-name";
+
+    private final String refFieldName;
+
+    public Reference(String refFieldName) {
+      super(REFERENCE);
+      this.refFieldName = refFieldName;
+    }
+
+    public Reference(Schema schema) {
+      super(REFERENCE);
+      this.refFieldName = schema.getProp(REF_FIELD_NAME);
+    }
+
+    @Override
+    public Schema addToSchema(Schema schema) {
+      super.addToSchema(schema);
+      schema.addProp(REF_FIELD_NAME, refFieldName);
+      return schema;
+    }
+
+    @Override
+    public String getName() {
+      return REFERENCE;
+    }
+
+    public String getRefFieldName() {
+      return refFieldName;
+    }
+
+    @Override
+    public void validate(Schema schema) {
+      super.validate(schema);
+      if (schema.getField(refFieldName) == null) {
+        throw new IllegalArgumentException("Invalid field name for reference field: " + refFieldName);
+      }
+    }
+  }
+
+  public static class Referenceable extends LogicalType {
+    private static final String REFERENCEABLE = "referenceable";
+    private static final String ID_FIELD_NAME = "id-field-name";
+
+    private final String idFieldName;
+
+    public Referenceable(String idFieldName) {
+      super(REFERENCEABLE);
+      this.idFieldName = idFieldName;
+    }
+
+    public Referenceable(Schema schema) {
+      super(REFERENCEABLE);
+      this.idFieldName = schema.getProp(ID_FIELD_NAME);
+    }
+
+    @Override
+    public Schema addToSchema(Schema schema) {
+      super.addToSchema(schema);
+      schema.addProp(ID_FIELD_NAME, idFieldName);
+      return schema;
+    }
+
+    @Override
+    public String getName() {
+      return REFERENCEABLE;
+    }
+
+    public String getIdFieldName() {
+      return idFieldName;
+    }
+
+    @Override
+    public void validate(Schema schema) {
+      super.validate(schema);
+      Schema.Field idField = schema.getField(idFieldName);
+      if (idField == null || idField.schema().getType() != Schema.Type.LONG) {
+        throw new IllegalArgumentException("Invalid ID field: " + idFieldName + ": " + idField);
+      }
+    }
+  }
+
+  @BeforeClass
+  public static void addReferenceTypes() {
+    LogicalTypes.register(Referenceable.REFERENCEABLE, new LogicalTypes.LogicalTypeFactory() {
+      @Override
+      public LogicalType fromSchema(Schema schema) {
+        return new Referenceable(schema);
+      }
+    });
+    LogicalTypes.register(Reference.REFERENCE, new LogicalTypes.LogicalTypeFactory() {
+      @Override
+      public LogicalType fromSchema(Schema schema) {
+        return new Reference(schema);
+      }
+    });
+  }
+
+  public static class ReferenceManager {
+    private interface Callback {
+      void set(Object referenceable);
+    }
+
+    private final Map<Long, Object> references = new HashMap<Long, Object>();
+    private final Map<Object, Long> ids = new IdentityHashMap<Object, Long>();
+    private final Map<Long, List<Callback>> callbacksById = new HashMap<Long, List<Callback>>();
+    private final ReferenceableTracker tracker = new ReferenceableTracker();
+    private final ReferenceHandler handler = new ReferenceHandler();
+
+    public ReferenceableTracker getTracker() {
+      return tracker;
+    }
+
+    public ReferenceHandler getHandler() {
+      return handler;
+    }
+
+    public class ReferenceableTracker extends Conversion<IndexedRecord> {
+      @Override
+      @SuppressWarnings("unchecked")
+      public Class<IndexedRecord> getConvertedType() {
+        return (Class) Record.class;
+      }
+
+      @Override
+      public String getLogicalTypeName() {
+        return Referenceable.REFERENCEABLE;
+      }
+
+      @Override
+      public IndexedRecord fromRecord(IndexedRecord value, Schema schema, LogicalType type) {
+        // read side
+        long id = getId(value, schema);
+
+        // keep track of this for later references
+        references.put(id, value);
+
+        // call any callbacks waiting to resolve this id
+        List<Callback> callbacks = callbacksById.get(id);
+        if (callbacks != null) { // may be null if nothing has referenced this id
+          for (Callback callback : callbacks) {
+            callback.set(value);
+          }
+        }
+
+        return value;
+      }
+
+      @Override
+      public IndexedRecord toRecord(IndexedRecord value, Schema schema, LogicalType type) {
+        // write side
+        long id = getId(value, schema);
+
+        // keep track of this for later references
+        //references.put(id, value);
+        ids.put(value, id);
+
+        return value;
+      }
+
+      private long getId(IndexedRecord referenceable, Schema schema) {
+        Referenceable info = (Referenceable) schema.getLogicalType();
+        int idField = schema.getField(info.getIdFieldName()).pos();
+        return (Long) referenceable.get(idField);
+      }
+    }
+
+    public class ReferenceHandler extends Conversion<IndexedRecord> {
+      @Override
+      @SuppressWarnings("unchecked")
+      public Class<IndexedRecord> getConvertedType() {
+        return (Class) Record.class;
+      }
+
+      @Override
+      public String getLogicalTypeName() {
+        return Reference.REFERENCE;
+      }
+
+      @Override
+      public IndexedRecord fromRecord(final IndexedRecord record, Schema schema, LogicalType type) {
+        // read side: resolve the record or save a callback
+        final Schema.Field refField = schema.getField(((Reference) type).getRefFieldName());
+
+        Long id = (Long) record.get(refField.pos());
+        if (id != null) {
+          if (references.containsKey(id)) {
+            record.put(refField.pos(), references.get(id));
+
+          } else {
+            List<Callback> callbacks = callbacksById.get(id);
+            if (callbacks == null) {
+              callbacks = new ArrayList<Callback>();
+              callbacksById.put(id, callbacks);
+            }
+            // add a callback to resolve this reference when the id is available
+            callbacks.add(new Callback() {
+              @Override
+              public void set(Object referenceable) {
+                record.put(refField.pos(), referenceable);
+              }
+            });
+          }
+        }
+
+        return record;
+      }
+
+      @Override
+      public IndexedRecord toRecord(IndexedRecord record, Schema schema, LogicalType type) {
+        // write side: replace a referenced field with its id
+        Schema.Field refField = schema.getField(((Reference) type).getRefFieldName());
+        IndexedRecord referenced = (IndexedRecord) record.get(refField.pos());
+        if (referenced == null) {
+          return record;
+        }
+
+        // hijack the field to return the id instead of the ref
+        return new HijackingIndexedRecord(record, refField.pos(), ids.get(referenced));
+      }
+    }
+
+    private static class HijackingIndexedRecord implements IndexedRecord {
+      private final IndexedRecord wrapped;
+      private final int index;
+      private final Object data;
+
+      public HijackingIndexedRecord(IndexedRecord wrapped, int index, Object data) {
+        this.wrapped = wrapped;
+        this.index = index;
+        this.data = data;
+      }
+
+      @Override
+      public void put(int i, Object v) {
+        throw new UnsupportedOperationException("[BUG] This is a read-only class.");
+      }
+
+      @Override
+      public Object get(int i) {
+        if (i == index) {
+          return data;
+        }
+        return wrapped.get(i);
+      }
+
+      @Override
+      public Schema getSchema() {
+        return wrapped.getSchema();
+      }
+    }
+  }
+
+  @Test
+  public void test() throws IOException {
+    ReferenceManager manager = new ReferenceManager();
+    GenericData model = new GenericData();
+    model.addLogicalTypeConversion(manager.getTracker());
+    model.addLogicalTypeConversion(manager.getHandler());
+
+    Schema parentSchema = Schema.createRecord("Parent", null, null, false);
+
+    Schema placeholderSchema = Schema.createRecord("Placeholder", null, null, false);
+    List<Schema.Field> placeholderFields = new ArrayList<Schema.Field>();
+    placeholderFields.add( // at least one field is needed for the schema to be valid
+        new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null));
+    placeholderSchema.setFields(placeholderFields);
+
+    Referenceable idRef = new Referenceable("id");
+
+    Schema parentRefSchema = Schema.createUnion(
+        Schema.create(Schema.Type.NULL),
+        Schema.create(Schema.Type.LONG),
+        idRef.addToSchema(placeholderSchema));
+
+    Reference parentRef = new Reference("parent");
+
+    List<Schema.Field> childFields = new ArrayList<Schema.Field>();
+    childFields.add(new Schema.Field("c", Schema.create(Schema.Type.STRING), null, null));
+    childFields.add(new Schema.Field("parent", parentRefSchema, null, null));
+    Schema childSchema = parentRef.addToSchema(
+        Schema.createRecord("Child", null, null, false, childFields));
+
+    List<Schema.Field> parentFields = new ArrayList<Schema.Field>();
+    parentFields.add(new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null));
+    parentFields.add(new Schema.Field("p", Schema.create(Schema.Type.STRING), null, null));
+    parentFields.add(new Schema.Field("child", childSchema, null, null));
+    parentSchema.setFields(parentFields);
+
+    Schema schema = idRef.addToSchema(parentSchema);
+
+    System.out.println("Schema: " + schema.toString(true));
+
+    Record parent = new Record(schema);
+    parent.put("id", 1L);
+    parent.put("p", "parent data!");
+
+    Record child = new Record(childSchema);
+    child.put("c", "child data!");
+    child.put("parent", parent);
+
+    parent.put("child", child);
+
+    // serialization round trip
+    File data = AvroTestUtil.write(temp, model, schema, parent);
+    List<Record> records = AvroTestUtil.read(model, schema, data);
+
+    Record actual = records.get(0);
+
+    // because the record is a recursive structure, equals won't work
+    Assert.assertEquals("Should correctly read back the parent id",
+        1L, actual.get("id"));
+    Assert.assertEquals("Should correctly read back the parent data",
+        new Utf8("parent data!"), actual.get("p"));
+
+    Record actualChild = (Record) actual.get("child");
+    Assert.assertEquals("Should correctly read back the child data",
+        new Utf8("child data!"), actualChild.get("c"));
+    Object childParent = actualChild.get("parent");
+    Assert.assertTrue("Should have a parent Record object",
+        childParent instanceof Record);
+
+    Record childParentRecord = (Record) actualChild.get("parent");
+    Assert.assertEquals("Should have the right parent id",
+        1L, childParentRecord.get("id"));
+    Assert.assertEquals("Should have the right parent data",
+        new Utf8("parent data!"), childParentRecord.get("p"));
+  }
+}
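
In outline, the place-holder approach the test exercises reduces to the
following sketch (a condensed restatement of the test above, assuming the
Avro logical types API used throughout this patch):

    // A direct cycle -- Parent.child -> Child.parent -> Parent -- cannot be
    // converted to a Parquet schema. Child.parent instead points at a small
    // stand-in record that carries only the referenceable id field:
    Schema placeholder = Schema.createRecord("Placeholder", null, null, false);
    placeholder.setFields(Collections.singletonList(
        new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null)));
    Schema parentRef = new Referenceable("id").addToSchema(placeholder);

    // At write time the Reference conversion swaps the Parent object for its
    // id; at read time a callback re-links the id to the Parent instance once
    // that record has been fully read.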

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/test/java/org/apache/parquet/avro/TestGenericLogicalTypes.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestGenericLogicalTypes.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestGenericLogicalTypes.java
new file mode 100644
index 0000000..6809fff
--- /dev/null
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestGenericLogicalTypes.java
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import org.apache.avro.Conversion;
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.avro.Schema.Type.STRING;
+import static org.apache.parquet.avro.AvroTestUtil.field;
+import static org.apache.parquet.avro.AvroTestUtil.instance;
+import static org.apache.parquet.avro.AvroTestUtil.optionalField;
+import static org.apache.parquet.avro.AvroTestUtil.read;
+import static org.apache.parquet.avro.AvroTestUtil.record;
+
+/**
+ * This class is based on org.apache.avro.generic.TestGenericLogicalTypes
+ */
+public class TestGenericLogicalTypes {
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  public static final GenericData GENERIC = new GenericData();
+  public static final LogicalType DECIMAL_9_2 = LogicalTypes.decimal(9, 2);
+  public static final BigDecimal D1 = new BigDecimal("-34.34");
+  public static final BigDecimal D2 = new BigDecimal("117230.00");
+
+
+  @BeforeClass
+  public static void addDecimalAndUUID() {
+    GENERIC.addLogicalTypeConversion(new Conversions.DecimalConversion());
+    GENERIC.addLogicalTypeConversion(new Conversions.UUIDConversion());
+  }
+
+  private <T> List<T> getFieldValues(Collection<GenericRecord> records, String field,
+                                     Class<T> expectedClass) {
+    List<T> values = new ArrayList<T>();
+    for (GenericRecord record : records) {
+      values.add(expectedClass.cast(record.get(field)));
+    }
+    return values;
+  }
+
+  @Test
+  public void testReadUUID() throws IOException {
+    Schema uuidSchema = record("R",
+        field("uuid", LogicalTypes.uuid().addToSchema(Schema.create(STRING))));
+    GenericRecord u1 = instance(uuidSchema, "uuid", UUID.randomUUID());
+    GenericRecord u2 = instance(uuidSchema, "uuid", UUID.randomUUID());
+
+    Schema stringSchema = record("R", field("uuid", Schema.create(STRING)));
+    GenericRecord s1 = instance(stringSchema, "uuid", u1.get("uuid").toString());
+    GenericRecord s2 = instance(stringSchema, "uuid", u2.get("uuid").toString());
+
+    File test = write(stringSchema, s1, s2);
+    Assert.assertEquals("Should convert Strings to UUIDs",
+        Arrays.asList(u1, u2), read(GENERIC, uuidSchema, test));
+  }
+
+  @Test
+  public void testWriteUUIDReadStringSchema() throws IOException {
+    Schema uuidSchema = record("R",
+        field("uuid", LogicalTypes.uuid().addToSchema(Schema.create(STRING))));
+    GenericRecord u1 = instance(uuidSchema, "uuid", UUID.randomUUID());
+    GenericRecord u2 = instance(uuidSchema, "uuid", UUID.randomUUID());
+
+    Schema stringUuidSchema = Schema.create(STRING);
+    stringUuidSchema.addProp(GenericData.STRING_PROP, "String");
+    Schema stringSchema = record("R", field("uuid", stringUuidSchema));
+    GenericRecord s1 = instance(stringSchema, "uuid", u1.get("uuid").toString());
+    GenericRecord s2 = instance(stringSchema, "uuid", u2.get("uuid").toString());
+
+    File test = write(GENERIC, uuidSchema, u1, u2);
+    Assert.assertEquals("Should read UUIDs as Strings",
+        Arrays.asList(s1, s2), read(GENERIC, stringSchema, test));
+  }
+
+  @Test
+  public void testWriteUUIDReadStringMissingLogicalType() throws IOException {
+    Schema uuidSchema = record("R",
+        field("uuid", LogicalTypes.uuid().addToSchema(Schema.create(STRING))));
+    GenericRecord u1 = instance(uuidSchema, "uuid", UUID.randomUUID());
+    GenericRecord u2 = instance(uuidSchema, "uuid", UUID.randomUUID());
+
+    GenericRecord s1 = instance(uuidSchema, "uuid", new Utf8(u1.get("uuid").toString()));
+    GenericRecord s2 = instance(uuidSchema, "uuid", new Utf8(u2.get("uuid").toString()));
+
+    File test = write(GENERIC, uuidSchema, u1, u2);
+    Assert.assertEquals("Should read UUIDs as Strings",
+        Arrays.asList(s1, s2), read(GenericData.get(), uuidSchema, test));
+  }
+
+  @Test
+  public void testWriteNullableUUID() throws IOException {
+    Schema nullableUuidSchema = record("R",
+        optionalField("uuid", LogicalTypes.uuid().addToSchema(Schema.create(STRING))));
+    GenericRecord u1 = instance(nullableUuidSchema, "uuid", UUID.randomUUID());
+    GenericRecord u2 = instance(nullableUuidSchema, "uuid", UUID.randomUUID());
+
+    Schema stringUuidSchema = Schema.create(STRING);
+    stringUuidSchema.addProp(GenericData.STRING_PROP, "String");
+    Schema nullableStringSchema = record("R", optionalField("uuid", stringUuidSchema));
+    GenericRecord s1 = instance(nullableStringSchema, "uuid", u1.get("uuid").toString());
+    GenericRecord s2 = instance(nullableStringSchema, "uuid", u2.get("uuid").toString());
+
+    File test = write(GENERIC, nullableUuidSchema, u1, u2);
+    Assert.assertEquals("Should read UUIDs as Strings",
+        Arrays.asList(s1, s2), read(GENERIC, nullableStringSchema, test));
+  }
+
+  @Test
+  public void testReadDecimalFixed() throws IOException {
+    Schema fixedSchema = Schema.createFixed("aFixed", null, null, 4);
+    Schema fixedRecord = record("R", field("dec", fixedSchema));
+    Schema decimalSchema = DECIMAL_9_2.addToSchema(
+        Schema.createFixed("aFixed", null, null, 4));
+    Schema decimalRecord = record("R", field("dec", decimalSchema));
+
+    GenericRecord r1 = instance(decimalRecord, "dec", D1);
+    GenericRecord r2 = instance(decimalRecord, "dec", D2);
+    List<GenericRecord> expected = Arrays.asList(r1, r2);
+
+    Conversion<BigDecimal> conversion = new Conversions.DecimalConversion();
+
+    // use the conversion directly instead of relying on the write side
+    GenericRecord r1fixed = instance(fixedRecord, "dec",
+        conversion.toFixed(D1, fixedSchema, DECIMAL_9_2));
+    GenericRecord r2fixed = instance(fixedRecord, "dec",
+        conversion.toFixed(D2, fixedSchema, DECIMAL_9_2));
+
+    File test = write(fixedRecord, r1fixed, r2fixed);
+    Assert.assertEquals("Should convert fixed to BigDecimals",
+        expected, read(GENERIC, decimalRecord, test));
+  }
+
+  @Test
+  public void testWriteDecimalFixed() throws IOException {
+    Schema fixedSchema = Schema.createFixed("aFixed", null, null, 4);
+    Schema fixedRecord = record("R", field("dec", fixedSchema));
+    Schema decimalSchema = DECIMAL_9_2.addToSchema(
+        Schema.createFixed("aFixed", null, null, 4));
+    Schema decimalRecord = record("R", field("dec", decimalSchema));
+
+    GenericRecord r1 = instance(decimalRecord, "dec", D1);
+    GenericRecord r2 = instance(decimalRecord, "dec", D2);
+
+    Conversion<BigDecimal> conversion = new Conversions.DecimalConversion();
+
+    // use the conversion directly instead of relying on the write side
+    GenericRecord r1fixed = instance(fixedRecord, "dec",
+        conversion.toFixed(D1, fixedSchema, DECIMAL_9_2));
+    GenericRecord r2fixed = instance(fixedRecord, "dec",
+        conversion.toFixed(D2, fixedSchema, DECIMAL_9_2));
+    List<GenericRecord> expected = Arrays.asList(r1fixed, r2fixed);
+
+    File test = write(GENERIC, decimalRecord, r1, r2);
+    Assert.assertEquals("Should read BigDecimals as fixed",
+        expected, read(GENERIC, fixedRecord, test));
+  }
+
+  @Test
+  public void testReadDecimalBytes() throws IOException {
+    Schema bytesSchema = Schema.create(Schema.Type.BYTES);
+    Schema bytesRecord = record("R", field("dec", bytesSchema));
+    Schema decimalSchema = DECIMAL_9_2.addToSchema(Schema.create(Schema.Type.BYTES));
+    Schema decimalRecord = record("R", field("dec", decimalSchema));
+
+    GenericRecord r1 = instance(decimalRecord, "dec", D1);
+    GenericRecord r2 = instance(decimalRecord, "dec", D2);
+    List<GenericRecord> expected = Arrays.asList(r1, r2);
+
+    Conversion<BigDecimal> conversion = new Conversions.DecimalConversion();
+
+    // use the conversion directly instead of relying on the write side
+    GenericRecord r1bytes = instance(bytesRecord, "dec",
+        conversion.toBytes(D1, bytesSchema, DECIMAL_9_2));
+    GenericRecord r2bytes = instance(bytesRecord, "dec",
+        conversion.toBytes(D2, bytesSchema, DECIMAL_9_2));
+
+    File test = write(bytesRecord, r1bytes, r2bytes);
+    Assert.assertEquals("Should convert bytes to BigDecimals",
+        expected, read(GENERIC, decimalRecord, test));
+  }
+
+  @Test
+  public void testWriteDecimalBytes() throws IOException {
+    Schema bytesSchema = Schema.create(Schema.Type.BYTES);
+    Schema bytesRecord = record("R", field("dec", bytesSchema));
+    Schema decimalSchema = DECIMAL_9_2.addToSchema(Schema.create(Schema.Type.BYTES));
+    Schema decimalRecord = record("R", field("dec", decimalSchema));
+
+    GenericRecord r1 = instance(decimalRecord, "dec", D1);
+    GenericRecord r2 = instance(decimalRecord, "dec", D2);
+
+    Conversion<BigDecimal> conversion = new Conversions.DecimalConversion();
+
+    // use the conversion directly instead of relying on the write side
+    GenericRecord r1bytes = instance(bytesRecord, "dec",
+        conversion.toBytes(D1, bytesSchema, DECIMAL_9_2));
+    GenericRecord r2bytes = instance(bytesRecord, "dec",
+        conversion.toBytes(D2, bytesSchema, DECIMAL_9_2));
+
+    List<GenericRecord> expected = Arrays.asList(r1bytes, r2bytes);
+
+    File test = write(GENERIC, decimalRecord, r1, r2);
+    Assert.assertEquals("Should read BigDecimals as bytes",
+        expected, read(GENERIC, bytesRecord, test));
+  }
+
+  private <D> File write(Schema schema, D... data) throws IOException {
+    return write(GenericData.get(), schema, data);
+  }
+
+  private <D> File write(GenericData model, Schema schema, D... data) throws IOException {
+    return AvroTestUtil.write(temp, model, schema, data);
+  }
+
+}
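
All four decimal tests share one pattern: apply the conversion by hand on one
side of the round trip and let the data model apply it on the other. The
direct conversion step in isolation (values taken from the tests; the calls
are Avro's Conversions.DecimalConversion):

    LogicalType decimal92 = LogicalTypes.decimal(9, 2);
    Schema bytesSchema = Schema.create(Schema.Type.BYTES);
    Conversion<BigDecimal> conversion = new Conversions.DecimalConversion();

    // encode a BigDecimal into the byte representation written to the file
    ByteBuffer encoded = conversion.toBytes(new BigDecimal("-34.34"), bytesSchema, decimal92);
    // decode it back; the scale of 2 comes from the logical type
    BigDecimal decoded = conversion.fromBytes(encoded, bytesSchema, decimal92);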

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
index 855a5b1..4fa71ea 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
@@ -19,18 +19,23 @@
 package org.apache.parquet.avro;
 
 import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.io.Resources;
 import java.io.File;
+import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericFixed;
@@ -39,12 +44,16 @@ import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.avro.util.Utf8;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.io.api.RecordConsumer;
 import org.apache.parquet.schema.MessageTypeParser;
+import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -225,6 +234,113 @@ public class TestReadWrite {
     assertEquals(ImmutableMap.of(str("a"), 1, str("b"), 2), nextRecord.get("mymap"));
   }
 
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Test
+  public void testDecimalValues() throws Exception {
+    Schema decimalSchema = Schema.createRecord("myrecord", null, null, false);
+    Schema decimal = LogicalTypes.decimal(9, 2).addToSchema(
+        Schema.create(Schema.Type.BYTES));
+    decimalSchema.setFields(Collections.singletonList(
+        new Schema.Field("dec", decimal, null, null)));
+
+    // add the decimal conversion to a generic data model
+    GenericData decimalSupport = new GenericData();
+    decimalSupport.addLogicalTypeConversion(new Conversions.DecimalConversion());
+
+    File file = temp.newFile("decimal.parquet");
+    file.delete();
+    Path path = new Path(file.toString());
+
+    ParquetWriter<GenericRecord> writer = AvroParquetWriter
+        .<GenericRecord>builder(path)
+        .withDataModel(decimalSupport)
+        .withSchema(decimalSchema)
+        .build();
+
+    Random random = new Random(34L);
+    GenericRecordBuilder builder = new GenericRecordBuilder(decimalSchema);
+    List<GenericRecord> expected = Lists.newArrayList();
+    for (int i = 0; i < 1000; i += 1) {
+      BigDecimal dec = new BigDecimal(new BigInteger(31, random), 2);
+      builder.set("dec", dec);
+
+      GenericRecord rec = builder.build();
+      expected.add(rec);
+      writer.write(builder.build());
+    }
+    writer.close();
+
+    ParquetReader<GenericRecord> reader = AvroParquetReader
+        .<GenericRecord>builder(path)
+        .withDataModel(decimalSupport)
+        .disableCompatibility()
+        .build();
+    List<GenericRecord> records = Lists.newArrayList();
+    GenericRecord rec;
+    while ((rec = reader.read()) != null) {
+      records.add(rec);
+    }
+    reader.close();
+
+    Assert.assertTrue("dec field should be a BigDecimal instance",
+        records.get(0).get("dec") instanceof BigDecimal);
+    Assert.assertEquals("Content should match", expected, records);
+  }
+
+  @Test
+  public void testFixedDecimalValues() throws Exception {
+    Schema decimalSchema = Schema.createRecord("myrecord", null, null, false);
+    Schema decimal = LogicalTypes.decimal(9, 2).addToSchema(
+        Schema.createFixed("dec", null, null, 4));
+    decimalSchema.setFields(Collections.singletonList(
+        new Schema.Field("dec", decimal, null, null)));
+
+    // add the decimal conversion to a generic data model
+    GenericData decimalSupport = new GenericData();
+    decimalSupport.addLogicalTypeConversion(new Conversions.DecimalConversion());
+
+    File file = temp.newFile("decimal.parquet");
+    file.delete();
+    Path path = new Path(file.toString());
+
+    ParquetWriter<GenericRecord> writer = AvroParquetWriter
+        .<GenericRecord>builder(path)
+        .withDataModel(decimalSupport)
+        .withSchema(decimalSchema)
+        .build();
+
+    Random random = new Random(34L);
+    GenericRecordBuilder builder = new GenericRecordBuilder(decimalSchema);
+    List<GenericRecord> expected = Lists.newArrayList();
+    for (int i = 0; i < 1000; i += 1) {
+      BigDecimal dec = new BigDecimal(new BigInteger(31, random), 2);
+      builder.set("dec", dec);
+
+      GenericRecord rec = builder.build();
+      expected.add(rec);
+      writer.write(builder.build());
+    }
+    writer.close();
+
+    ParquetReader<GenericRecord> reader = AvroParquetReader
+        .<GenericRecord>builder(path)
+        .withDataModel(decimalSupport)
+        .disableCompatibility()
+        .build();
+    List<GenericRecord> records = Lists.newArrayList();
+    GenericRecord rec;
+    while ((rec = reader.read()) != null) {
+      records.add(rec);
+    }
+    reader.close();
+
+    Assert.assertTrue("dec field should be a BigDecimal instance",
+        records.get(0).get("dec") instanceof BigDecimal);
+    Assert.assertEquals("Content should match", expected, records);
+  }
+
   @Test
   public void testAll() throws Exception {
     Schema schema = new Schema.Parser().parse(
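
Both new tests follow the same round trip through the builder entry points;
condensed (path, decimalSchema, and record stand in for the values built in
the tests above):

    GenericData model = new GenericData();
    model.addLogicalTypeConversion(new Conversions.DecimalConversion());

    ParquetWriter<GenericRecord> writer = AvroParquetWriter
        .<GenericRecord>builder(path)
        .withDataModel(model)       // supplies the BigDecimal conversion
        .withSchema(decimalSchema)  // bytes (or fixed) + decimal(9, 2)
        .build();
    writer.write(record);
    writer.close();

    ParquetReader<GenericRecord> reader = AvroParquetReader
        .<GenericRecord>builder(path)
        .withDataModel(model)
        .disableCompatibility()     // use the new list and logical-type behavior
        .build();
    GenericRecord roundTripped = reader.read();
    reader.close();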

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java
index 64caacc..af6f938 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java
@@ -47,7 +47,6 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import sun.net.www.content.text.Generic;
 
 import static org.apache.parquet.avro.AvroTestUtil.array;
 import static org.apache.parquet.avro.AvroTestUtil.optional;

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectLogicalTypes.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectLogicalTypes.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectLogicalTypes.java
new file mode 100644
index 0000000..401e698
--- /dev/null
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectLogicalTypes.java
@@ -0,0 +1,705 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import org.apache.avro.Conversion;
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.reflect.AvroSchema;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.specific.SpecificData;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.parquet.avro.AvroTestUtil.read;
+
+/**
+ * This class is based on org.apache.avro.reflect.TestReflectLogicalTypes
+ *
+ * Tests various logical types:
+ * * string => UUID
+ * * fixed and bytes => Decimal
+ * * record => Pair
+ */
+public class TestReflectLogicalTypes {
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  public static final ReflectData REFLECT = new ReflectData();
+
+  @BeforeClass
+  public static void addUUID() {
+    REFLECT.addLogicalTypeConversion(new Conversions.UUIDConversion());
+    REFLECT.addLogicalTypeConversion(new Conversions.DecimalConversion());
+  }
+
+  @Test
+  public void testReflectedSchema() {
+    Schema expected = SchemaBuilder.record(RecordWithUUIDList.class.getName())
+        .fields()
+        .name("uuids").type().array().items().stringType().noDefault()
+        .endRecord();
+    expected.getField("uuids").schema().addProp(
+        SpecificData.CLASS_PROP, List.class.getName());
+    LogicalTypes.uuid().addToSchema(
+        expected.getField("uuids").schema().getElementType());
+
+    Schema actual = REFLECT.getSchema(RecordWithUUIDList.class);
+
+    Assert.assertEquals("Should use the UUID logical type", expected, actual);
+  }
+
+  // this can be static because the schema only comes from reflection
+  public static class DecimalRecordBytes {
+    // scale is required and will not be set by the conversion
+    @AvroSchema("{" +
+        "\"type\": \"bytes\"," +
+        "\"logicalType\": \"decimal\"," +
+        "\"precision\": 9," +
+        "\"scale\": 2" +
+        "}")
+    private BigDecimal decimal;
+
+    @Override
+    public boolean equals(Object other) {
+      if (this == other) {
+        return true;
+      }
+
+      if (other == null || getClass() != other.getClass()) {
+        return false;
+      }
+
+      DecimalRecordBytes that = (DecimalRecordBytes) other;
+      if (decimal == null) {
+        return (that.decimal == null);
+      }
+
+      return decimal.equals(that.decimal);
+    }
+
+    @Override
+    public int hashCode() {
+      return decimal != null ? decimal.hashCode() : 0;
+    }
+  }
+
+  @Test
+  public void testDecimalBytes() throws IOException {
+    Schema schema = REFLECT.getSchema(DecimalRecordBytes.class);
+    Assert.assertEquals("Should have the correct record name",
+        "org.apache.parquet.avro.TestReflectLogicalTypes$",
+        schema.getNamespace());
+    Assert.assertEquals("Should have the correct record name",
+        "DecimalRecordBytes",
+        schema.getName());
+    Assert.assertEquals("Should have the correct logical type",
+        LogicalTypes.decimal(9, 2),
+        LogicalTypes.fromSchema(schema.getField("decimal").schema()));
+
+    DecimalRecordBytes record = new DecimalRecordBytes();
+    record.decimal = new BigDecimal("3.14");
+
+    File test = write(REFLECT, schema, record);
+    Assert.assertEquals("Should match the decimal after round trip",
+        Arrays.asList(record),
+        read(REFLECT, schema, test));
+  }
+
+  // this can be static because the schema only comes from reflection
+  public static class DecimalRecordFixed {
+    // scale is required and will not be set by the conversion
+    @AvroSchema("{" +
+        "\"name\": \"decimal_9\"," +
+        "\"type\": \"fixed\"," +
+        "\"size\": 4," +
+        "\"logicalType\": \"decimal\"," +
+        "\"precision\": 9," +
+        "\"scale\": 2" +
+        "}")
+    private BigDecimal decimal;
+
+    @Override
+    public boolean equals(Object other) {
+      if (this == other) {
+        return true;
+      }
+
+      if (other == null || getClass() != other.getClass()) {
+        return false;
+      }
+
+      DecimalRecordFixed that = (DecimalRecordFixed) other;
+      if (decimal == null) {
+        return (that.decimal == null);
+      }
+
+      return decimal.equals(that.decimal);
+    }
+
+    @Override
+    public int hashCode() {
+      return decimal != null ? decimal.hashCode() : 0;
+    }
+  }
+
+  @Test
+  public void testDecimalFixed() throws IOException {
+    Schema schema = REFLECT.getSchema(DecimalRecordFixed.class);
+    Assert.assertEquals("Should have the correct record name",
+        "org.apache.parquet.avro.TestReflectLogicalTypes$",
+        schema.getNamespace());
+    Assert.assertEquals("Should have the correct record name",
+        "DecimalRecordFixed",
+        schema.getName());
+    Assert.assertEquals("Should have the correct logical type",
+        LogicalTypes.decimal(9, 2),
+        LogicalTypes.fromSchema(schema.getField("decimal").schema()));
+
+    DecimalRecordFixed record = new DecimalRecordFixed();
+    record.decimal = new BigDecimal("3.14");
+
+    File test = write(REFLECT, schema, record);
+    Assert.assertEquals("Should match the decimal after round trip",
+        Arrays.asList(record),
+        read(REFLECT, schema, test));
+  }
+
+  public static class Pair<X, Y> {
+    private final X first;
+    private final Y second;
+
+    private Pair(X first, Y second) {
+      this.first = first;
+      this.second = second;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (this == other) {
+        return true;
+      }
+
+      if (other == null || getClass() != other.getClass()) {
+        return false;
+      }
+
+      Pair<?, ?> that = (Pair<?, ?>) other;
+      if (first == null) {
+        if (that.first != null) {
+          return false;
+        }
+      } else if (!first.equals(that.first)) {
+        return false;
+      }
+
+      if (second == null) {
+        if (that.second != null) {
+          return false;
+        }
+      } else if (!second.equals(that.second)) {
+        return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      return Arrays.hashCode(new Object[] {first, second});
+    }
+
+    public static <X, Y> Pair<X, Y> of(X first, Y second) {
+      return new Pair<X, Y>(first, second);
+    }
+  }
+
+  public static class PairRecord {
+    @AvroSchema("{" +
+        "\"name\": \"Pair\"," +
+        "\"type\": \"record\"," +
+        "\"fields\": [" +
+        "    {\"name\": \"x\", \"type\": \"long\"}," +
+        "    {\"name\": \"y\", \"type\": \"long\"}" +
+        "  ]," +
+        "\"logicalType\": \"pair\"" +
+        "}")
+    Pair<Long, Long> pair;
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testPairRecord() throws IOException {
+    ReflectData model = new ReflectData();
+    model.addLogicalTypeConversion(new Conversion<Pair>() {
+      @Override
+      public Class<Pair> getConvertedType() {
+        return Pair.class;
+      }
+
+      @Override
+      public String getLogicalTypeName() {
+        return "pair";
+      }
+
+      @Override
+      public Pair fromRecord(IndexedRecord value, Schema schema, LogicalType type) {
+        return Pair.of(value.get(0), value.get(1));
+      }
+
+      @Override
+      public IndexedRecord toRecord(Pair value, Schema schema, LogicalType type) {
+        GenericData.Record record = new GenericData.Record(schema);
+        record.put(0, value.first);
+        record.put(1, value.second);
+        return record;
+      }
+    });
+
+    LogicalTypes.register("pair", new LogicalTypes.LogicalTypeFactory() {
+      private final LogicalType PAIR = new LogicalType("pair");
+      @Override
+      public LogicalType fromSchema(Schema schema) {
+        return PAIR;
+      }
+    });
+
+    Schema schema = model.getSchema(PairRecord.class);
+    Assert.assertEquals("Should have the correct record name",
+        "org.apache.parquet.avro.TestReflectLogicalTypes$",
+        schema.getNamespace());
+    Assert.assertEquals("Should have the correct record name",
+        "PairRecord",
+        schema.getName());
+    Assert.assertEquals("Should have the correct logical type",
+        "pair",
+        LogicalTypes.fromSchema(schema.getField("pair").schema()).getName());
+
+    PairRecord record = new PairRecord();
+    record.pair = Pair.of(34L, 35L);
+    List<PairRecord> expected = new ArrayList<PairRecord>();
+    expected.add(record);
+
+    File test = write(model, schema, record);
+    Pair<Long, Long> actual = AvroTestUtil
+        .<PairRecord>read(model, schema, test)
+        .get(0).pair;
+    Assert.assertEquals("Data should match after serialization round-trip",
+        34L, (long) actual.first);
+    Assert.assertEquals("Data should match after serialization round-trip",
+        35L, (long) actual.second);
+  }
+
+  @Test
+  public void testReadUUID() throws IOException {
+    Schema uuidSchema = SchemaBuilder.record(RecordWithUUID.class.getName())
+        .fields().requiredString("uuid").endRecord();
+    LogicalTypes.uuid().addToSchema(uuidSchema.getField("uuid").schema());
+
+    UUID u1 = UUID.randomUUID();
+    UUID u2 = UUID.randomUUID();
+
+    RecordWithStringUUID r1 = new RecordWithStringUUID();
+    r1.uuid = u1.toString();
+    RecordWithStringUUID r2 = new RecordWithStringUUID();
+    r2.uuid = u2.toString();
+
+    List<RecordWithUUID> expected = Arrays.asList(
+        new RecordWithUUID(), new RecordWithUUID());
+    expected.get(0).uuid = u1;
+    expected.get(1).uuid = u2;
+
+    File test = write(
+        ReflectData.get().getSchema(RecordWithStringUUID.class), r1, r2);
+
+    Assert.assertEquals("Should convert Strings to UUIDs",
+        expected, read(REFLECT, uuidSchema, test));
+
+    // verify that the field's type overrides the logical type
+    Schema uuidStringSchema = SchemaBuilder
+        .record(RecordWithStringUUID.class.getName())
+        .fields().requiredString("uuid").endRecord();
+    LogicalTypes.uuid().addToSchema(uuidStringSchema.getField("uuid").schema());
+
+    Assert.assertEquals("Should not convert to UUID if accessor is String",
+        Arrays.asList(r1, r2),
+        read(REFLECT, uuidStringSchema, test));
+  }
+
+  @Test
+  public void testWriteUUID() throws IOException {
+    Schema uuidSchema = SchemaBuilder.record(RecordWithUUID.class.getName())
+        .fields().requiredString("uuid").endRecord();
+    LogicalTypes.uuid().addToSchema(uuidSchema.getField("uuid").schema());
+
+    UUID u1 = UUID.randomUUID();
+    UUID u2 = UUID.randomUUID();
+
+    RecordWithUUID r1 = new RecordWithUUID();
+    r1.uuid = u1;
+    RecordWithUUID r2 = new RecordWithUUID();
+    r2.uuid = u2;
+
+    List<RecordWithStringUUID> expected = Arrays.asList(
+        new RecordWithStringUUID(), new RecordWithStringUUID());
+    expected.get(0).uuid = u1.toString();
+    expected.get(1).uuid = u2.toString();
+
+    File test = write(REFLECT, uuidSchema, r1, r2);
+
+    // verify that the field's type overrides the logical type
+    Schema uuidStringSchema = SchemaBuilder
+        .record(RecordWithStringUUID.class.getName())
+        .fields().requiredString("uuid").endRecord();
+
+    Assert.assertEquals("Should read uuid as String without UUID conversion",
+        expected,
+        read(REFLECT, uuidStringSchema, test));
+
+    LogicalTypes.uuid().addToSchema(uuidStringSchema.getField("uuid").schema());
+    Assert.assertEquals("Should read uuid as String without UUID logical type",
+        expected,
+        read(ReflectData.get(), uuidStringSchema, test));
+  }
+
+  @Test
+  public void testWriteNullableUUID() throws IOException {
+    Schema nullableUuidSchema = SchemaBuilder.record(RecordWithUUID.class.getName())
+        .fields().optionalString("uuid").endRecord();
+    LogicalTypes.uuid().addToSchema(
+        nullableUuidSchema.getField("uuid").schema().getTypes().get(1));
+
+    UUID u1 = UUID.randomUUID();
+    UUID u2 = UUID.randomUUID();
+
+    RecordWithUUID r1 = new RecordWithUUID();
+    r1.uuid = u1;
+    RecordWithUUID r2 = new RecordWithUUID();
+    r2.uuid = u2;
+
+    List<RecordWithStringUUID> expected = Arrays.asList(
+        new RecordWithStringUUID(), new RecordWithStringUUID());
+    expected.get(0).uuid = u1.toString();
+    expected.get(1).uuid = u2.toString();
+
+    File test = write(REFLECT, nullableUuidSchema, r1, r2);
+
+    // verify that the field's type overrides the logical type
+    Schema nullableUuidStringSchema = SchemaBuilder
+        .record(RecordWithStringUUID.class.getName())
+        .fields().optionalString("uuid").endRecord();
+
+    Assert.assertEquals("Should read uuid as String without UUID conversion",
+        expected,
+        read(REFLECT, nullableUuidStringSchema, test));
+  }
+
+  @Test(expected = ClassCastException.class)
+  public void testWriteUUIDMissingLogicalType() throws IOException {
+    Schema uuidSchema = SchemaBuilder.record(RecordWithUUID.class.getName())
+        .fields().requiredString("uuid").endRecord();
+    LogicalTypes.uuid().addToSchema(uuidSchema.getField("uuid").schema());
+
+    UUID u1 = UUID.randomUUID();
+    UUID u2 = UUID.randomUUID();
+
+    RecordWithUUID r1 = new RecordWithUUID();
+    r1.uuid = u1;
+    RecordWithUUID r2 = new RecordWithUUID();
+    r2.uuid = u2;
+
+    // write without using REFLECT, which has the logical type
+    File test = write(uuidSchema, r1, r2);
+
+    // verify that the field's type overrides the logical type
+    Schema uuidStringSchema = SchemaBuilder
+        .record(RecordWithStringUUID.class.getName())
+        .fields().requiredString("uuid").endRecord();
+
+    // this fails with a ClassCastException because, without the conversion,
+    // the UUID isn't converted to the CharSequence expected internally
+    read(ReflectData.get(), uuidStringSchema, test);
+  }
+
+  @Test
+  public void testReadUUIDGenericRecord() throws IOException {
+    Schema uuidSchema = SchemaBuilder.record("RecordWithUUID")
+        .fields().requiredString("uuid").endRecord();
+    LogicalTypes.uuid().addToSchema(uuidSchema.getField("uuid").schema());
+
+    UUID u1 = UUID.randomUUID();
+    UUID u2 = UUID.randomUUID();
+
+    RecordWithStringUUID r1 = new RecordWithStringUUID();
+    r1.uuid = u1.toString();
+    RecordWithStringUUID r2 = new RecordWithStringUUID();
+    r2.uuid = u2.toString();
+
+    List<GenericData.Record> expected = Arrays.asList(
+        new GenericData.Record(uuidSchema), new GenericData.Record(uuidSchema));
+    expected.get(0).put("uuid", u1);
+    expected.get(1).put("uuid", u2);
+
+    File test = write(
+        ReflectData.get().getSchema(RecordWithStringUUID.class), r1, r2);
+
+    Assert.assertEquals("Should convert Strings to UUIDs",
+        expected, read(REFLECT, uuidSchema, test));
+
+    // verify that the field's type overrides the logical type
+    Schema uuidStringSchema = SchemaBuilder
+        .record(RecordWithStringUUID.class.getName())
+        .fields().requiredString("uuid").endRecord();
+    LogicalTypes.uuid().addToSchema(uuidStringSchema.getField("uuid").schema());
+
+    Assert.assertEquals("Should not convert to UUID if accessor is String",
+        Arrays.asList(r1, r2),
+        read(REFLECT, uuidStringSchema, test));
+  }
+
+  @Test
+  public void testReadUUIDArray() throws IOException {
+    Schema uuidArraySchema = SchemaBuilder.record(RecordWithUUIDArray.class.getName())
+        .fields()
+        .name("uuids").type().array().items().stringType().noDefault()
+        .endRecord();
+    LogicalTypes.uuid().addToSchema(
+        uuidArraySchema.getField("uuids").schema().getElementType());
+
+    UUID u1 = UUID.randomUUID();
+    UUID u2 = UUID.randomUUID();
+
+    GenericRecord r = new GenericData.Record(uuidArraySchema);
+    r.put("uuids", Arrays.asList(u1.toString(), u2.toString()));
+
+    RecordWithUUIDArray expected = new RecordWithUUIDArray();
+    expected.uuids = new UUID[] {u1, u2};
+
+    File test = write(uuidArraySchema, r);
+
+    Assert.assertEquals("Should convert Strings to UUIDs",
+        expected,
+        read(REFLECT, uuidArraySchema, test).get(0));
+  }
+
+  @Test
+  public void testWriteUUIDArray() throws IOException {
+    Schema uuidArraySchema = SchemaBuilder.record(RecordWithUUIDArray.class.getName())
+        .fields()
+        .name("uuids").type().array().items().stringType().noDefault()
+        .endRecord();
+    LogicalTypes.uuid().addToSchema(
+        uuidArraySchema.getField("uuids").schema().getElementType());
+
+    Schema stringArraySchema = SchemaBuilder.record("RecordWithUUIDArray")
+        .fields()
+        .name("uuids").type().array().items().stringType().noDefault()
+        .endRecord();
+    stringArraySchema.getField("uuids").schema()
+        .addProp(SpecificData.CLASS_PROP, List.class.getName());
+
+    UUID u1 = UUID.randomUUID();
+    UUID u2 = UUID.randomUUID();
+
+    GenericRecord expected = new GenericData.Record(stringArraySchema);
+    List<String> uuids = new ArrayList<String>();
+    uuids.add(u1.toString());
+    uuids.add(u2.toString());
+    expected.put("uuids", uuids);
+
+    RecordWithUUIDArray r = new RecordWithUUIDArray();
+    r.uuids = new UUID[] {u1, u2};
+
+    File test = write(REFLECT, uuidArraySchema, r);
+
+    Assert.assertEquals("Should read UUIDs as Strings",
+        expected,
+        read(ReflectData.get(), stringArraySchema, test).get(0));
+  }
+
+  @Test
+  public void testReadUUIDList() throws IOException {
+    Schema uuidListSchema = SchemaBuilder.record(RecordWithUUIDList.class.getName())
+        .fields()
+        .name("uuids").type().array().items().stringType().noDefault()
+        .endRecord();
+    uuidListSchema.getField("uuids").schema().addProp(
+        SpecificData.CLASS_PROP, List.class.getName());
+    LogicalTypes.uuid().addToSchema(
+        uuidListSchema.getField("uuids").schema().getElementType());
+
+    UUID u1 = UUID.randomUUID();
+    UUID u2 = UUID.randomUUID();
+
+    GenericRecord r = new GenericData.Record(uuidListSchema);
+    r.put("uuids", Arrays.asList(u1.toString(), u2.toString()));
+
+    RecordWithUUIDList expected = new RecordWithUUIDList();
+    expected.uuids = Arrays.asList(u1, u2);
+
+    File test = write(uuidListSchema, r);
+
+    Assert.assertEquals("Should convert Strings to UUIDs",
+        expected, read(REFLECT, uuidListSchema, test).get(0));
+  }
+
+  @Test
+  public void testWriteUUIDList() throws IOException {
+    Schema uuidListSchema = SchemaBuilder.record(RecordWithUUIDList.class.getName())
+        .fields()
+        .name("uuids").type().array().items().stringType().noDefault()
+        .endRecord();
+    uuidListSchema.getField("uuids").schema().addProp(
+        SpecificData.CLASS_PROP, List.class.getName());
+    LogicalTypes.uuid().addToSchema(
+        uuidListSchema.getField("uuids").schema().getElementType());
+
+    Schema stringArraySchema = SchemaBuilder.record("RecordWithUUIDArray")
+        .fields()
+        .name("uuids").type().array().items().stringType().noDefault()
+        .endRecord();
+    stringArraySchema.getField("uuids").schema()
+        .addProp(SpecificData.CLASS_PROP, List.class.getName());
+
+    UUID u1 = UUID.randomUUID();
+    UUID u2 = UUID.randomUUID();
+
+    GenericRecord expected = new GenericData.Record(stringArraySchema);
+    expected.put("uuids", Arrays.asList(u1.toString(), u2.toString()));
+
+    RecordWithUUIDList r = new RecordWithUUIDList();
+    r.uuids = Arrays.asList(u1, u2);
+
+    File test = write(REFLECT, uuidListSchema, r);
+
+    Assert.assertEquals("Should read UUIDs as Strings",
+        expected,
+        read(REFLECT, stringArraySchema, test).get(0));
+  }
+
+  private <D> File write(Schema schema, D... data) throws IOException {
+    return write(ReflectData.get(), schema, data);
+  }
+
+  @SuppressWarnings("unchecked")
+  private <D> File write(GenericData model, Schema schema, D... data) throws IOException {
+    return AvroTestUtil.write(temp, model, schema, data);
+  }
+}
+
+class RecordWithUUID {
+  UUID uuid;
+
+  @Override
+  public int hashCode() {
+    return uuid.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null) {
+      return false;
+    }
+    if (!(obj instanceof RecordWithUUID)) {
+      return false;
+    }
+    RecordWithUUID that = (RecordWithUUID) obj;
+    return this.uuid.equals(that.uuid);
+  }
+}
+
+class RecordWithStringUUID {
+  String uuid;
+
+  @Override
+  public int hashCode() {
+    return uuid.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null) {
+      return false;
+    }
+    if (!(obj instanceof RecordWithStringUUID)) {
+      return false;
+    }
+    RecordWithStringUUID that = (RecordWithStringUUID) obj;
+    return this.uuid.equals(that.uuid);
+  }
+}
+
+class RecordWithUUIDArray {
+  UUID[] uuids;
+
+  @Override
+  public int hashCode() {
+    return Arrays.hashCode(uuids);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null) {
+      return false;
+    }
+    if (!(obj instanceof RecordWithUUIDArray)) {
+      return false;
+    }
+    RecordWithUUIDArray that = (RecordWithUUIDArray) obj;
+    return Arrays.equals(this.uuids, that.uuids);
+  }
+}
+
+class RecordWithUUIDList {
+  List<UUID> uuids;
+
+  @Override
+  public int hashCode() {
+    return uuids.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null) {
+      return false;
+    }
+    if (!(obj instanceof RecordWithUUIDList)) {
+      return false;
+    }
+    RecordWithUUIDList that = (RecordWithUUIDList) obj;
+    return this.uuids.equals(that.uuids);
+  }
+}
+

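The reflect-based UUID tests above rely on a conversion between Avro's uuid
logical type and java.util.UUID. Avro 1.8 does not ship one, so a minimal
sketch of the kind of Conversion such tests register (class name illustrative)
looks like this:

    import java.util.UUID;
    import org.apache.avro.Conversion;
    import org.apache.avro.LogicalType;
    import org.apache.avro.Schema;

    public class UUIDConversion extends Conversion<UUID> {
      @Override
      public Class<UUID> getConvertedType() {
        return UUID.class;
      }

      @Override
      public String getLogicalTypeName() {
        return "uuid"; // matches LogicalTypes.uuid()
      }

      @Override
      public UUID fromCharSequence(CharSequence value, Schema schema, LogicalType type) {
        // the underlying Avro type is string, so values arrive as CharSequence
        return UUID.fromString(value.toString());
      }

      @Override
      public CharSequence toCharSequence(UUID value, Schema schema, LogicalType type) {
        return value.toString();
      }
    }

Registering it with model.addLogicalTypeConversion(new UUIDConversion()) is
what lets the REFLECT reads above materialize UUID fields instead of Strings.
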
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java b/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
index 4a167bd..30787f0 100644
--- a/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
+++ b/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
@@ -25,6 +25,10 @@ import java.io.OutputStream;
 import java.io.Serializable;
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.CharsetEncoder;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 
 import org.apache.parquet.io.ParquetEncodingException;
@@ -209,26 +213,33 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
 
   }
 
-  private static class FromStringBinary extends ByteArrayBackedBinary {
-    public FromStringBinary(String value) {
-      // reused is false, because we do not
-      // hold on to the underlying bytes,
-      // and nobody else has a handle to them
+  private static class FromStringBinary extends ByteBufferBackedBinary {
+    public FromStringBinary(CharSequence value) {
+      // reused is false, because we do not hold on to the buffer after
+      // conversion, and nobody else has a handle to it
       super(encodeUTF8(value), false);
     }
 
-    private static byte[] encodeUTF8(String value) {
-      try {
-        return value.getBytes("UTF-8");
-      } catch (UnsupportedEncodingException e) {
-        throw new ParquetEncodingException("UTF-8 not supported.", e);
-      }
-    }
-
     @Override
     public String toString() {
       return "Binary{\"" + toStringUsingUTF8() + "\"}";
     }
+
+    private static final ThreadLocal<CharsetEncoder> ENCODER =
+        new ThreadLocal<CharsetEncoder>() {
+          @Override
+          protected CharsetEncoder initialValue() {
+            return StandardCharsets.UTF_8.newEncoder();
+          }
+        };
+
+    private static ByteBuffer encodeUTF8(CharSequence value) {
+      try {
+        return ENCODER.get().encode(CharBuffer.wrap(value));
+      } catch (CharacterCodingException e) {
+        throw new ParquetEncodingException("UTF-8 encoding error", e);
+      }
+    }
   }
 
   public static Binary fromReusedByteArray(final byte[] value, final int offset, final int length) {
@@ -359,6 +370,13 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
     private int offset;
     private int length;
 
+    public ByteBufferBackedBinary(ByteBuffer value, boolean isBackingBytesReused) {
+      this.value = value;
+      this.offset = value.position();
+      this.length = value.remaining();
+      this.isBackingBytesReused = isBackingBytesReused;
+    }
+
     public ByteBufferBackedBinary(ByteBuffer value, int offset, int length, boolean isBackingBytesReused) {
       this.value = value;
       this.offset = offset;
@@ -521,11 +539,11 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
   }
 
   public static Binary fromReusedByteBuffer(final ByteBuffer value) {
-    return new ByteBufferBackedBinary(value, value.position(), value.remaining(), true);
+    return new ByteBufferBackedBinary(value, true);
   }
 
   public static Binary fromConstantByteBuffer(final ByteBuffer value) {
-    return new ByteBufferBackedBinary(value, value.position(), value.remaining(), false);
+    return new ByteBufferBackedBinary(value, false);
   }
 
   @Deprecated
@@ -536,7 +554,12 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
     return fromReusedByteBuffer(value); // Assume producer intends to reuse byte[]
   }
 
-  public static Binary fromString(final String value) {
+  public static Binary fromString(String value) {
+    // this method is for binary backward-compatibility
+    return fromString((CharSequence) value);
+  }
+
+  public static Binary fromString(CharSequence value) {
     return new FromStringBinary(value);
   }
 

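The FromStringBinary change above replaces String.getBytes("UTF-8") with a
pooled CharsetEncoder, which accepts any CharSequence and avoids an extra
copy. A standalone sketch of the pattern (class name illustrative;
CharsetEncoder is stateful and not thread-safe, hence one per thread):

    import java.nio.ByteBuffer;
    import java.nio.CharBuffer;
    import java.nio.charset.CharacterCodingException;
    import java.nio.charset.CharsetEncoder;
    import java.nio.charset.StandardCharsets;

    public final class Utf8 {
      private static final ThreadLocal<CharsetEncoder> ENCODER =
          new ThreadLocal<CharsetEncoder>() {
            @Override
            protected CharsetEncoder initialValue() {
              return StandardCharsets.UTF_8.newEncoder();
            }
          };

      public static ByteBuffer encode(CharSequence value) throws CharacterCodingException {
        // the convenience encode() resets the encoder before each use
        return ENCODER.get().encode(CharBuffer.wrap(value));
      }
    }
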
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-column/src/main/java/org/apache/parquet/schema/Types.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/Types.java b/parquet-column/src/main/java/org/apache/parquet/schema/Types.java
index d3cc97b..9af71af 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/Types.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/Types.java
@@ -336,9 +336,9 @@ public class Types {
      * @param length an int length
      * @return this builder for method chaining
      */
-    public BasePrimitiveBuilder<P, THIS> length(int length) {
+    public THIS length(int length) {
       this.length = length;
-      return this;
+      return self();
     }
 
     /**
@@ -351,9 +351,9 @@ public class Types {
      * @param precision an int precision value for the DECIMAL
      * @return this builder for method chaining
      */
-    public BasePrimitiveBuilder<P, THIS> precision(int precision) {
+    public THIS precision(int precision) {
       this.precision = precision;
-      return this;
+      return self();
     }
 
     /**
@@ -369,9 +369,9 @@ public class Types {
      * @param scale an int scale value for the DECIMAL
      * @return this builder for method chaining
      */
-    public BasePrimitiveBuilder<P, THIS> scale(int scale) {
+    public THIS scale(int scale) {
       this.scale = scale;
-      return this;
+      return self();
     }
 
     @Override

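With length(), precision(), and scale() now returning THIS via self(), a
fluent chain keeps the concrete builder type, so DECIMAL metadata can be set
inline. A hedged usage sketch (field name and sizes illustrative):

    Type amount = Types.required(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
        .length(16)                 // 16 bytes holds up to 38 decimal digits
        .as(OriginalType.DECIMAL)
        .precision(38)
        .scale(9)
        .named("amount");
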
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java b/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java
index 88aa4b2..a541e1b 100644
--- a/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java
+++ b/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java
@@ -198,16 +198,6 @@ public class TestBinary {
     Binary copy = bao.binary.copy();
 
     assertSame(copy, bao.binary);
-
-    mutate(bao.original);
-
-    byte[] expected = testString.getBytes(UTF8);
-    mutate(expected);
-
-    assertArrayEquals(expected, copy.getBytes());
-    assertArrayEquals(expected, copy.getBytesUnsafe());
-    assertArrayEquals(expected, copy.copy().getBytesUnsafe());
-    assertArrayEquals(expected, copy.copy().getBytes());
   }
 
   private void testReusedCopy(BinaryFactory bf) throws Exception {

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f606faa..9314a53 100644
--- a/pom.xml
+++ b/pom.xml
@@ -93,6 +93,7 @@
     <fastutil.version>6.5.7</fastutil.version>
     <semver.api.version>0.9.33</semver.api.version>
     <slf4j.version>1.7.5</slf4j.version>
+    <avro.version>1.8.0</avro.version>
   </properties>
 
   <modules>


[2/2] parquet-mr git commit: PARQUET-358: Add support for Avro's logical types API.

Posted by bl...@apache.org.
PARQUET-358: Add support for Avro's logical types API.

This adds support for Avro's logical types API to parquet-avro.

* The logical types API was introduced in Avro 1.8.0, so this bumps the Avro dependency version to 1.8.0.
* Types supported are: decimal, date, time-millis, time-micros, timestamp-millis, and timestamp-micros
* Tests have been copied from Avro and ported to the parquet-avro API
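
In practice, a data model with registered conversions round-trips logical-type
values as their Java classes. A hedged sketch (record name and path are
illustrative; DecimalConversion is Avro 1.8's standard BigDecimal conversion):

    GenericData model = new GenericData();
    model.addLogicalTypeConversion(new Conversions.DecimalConversion());

    Schema bytesDecimal = LogicalTypes.decimal(9, 2)
        .addToSchema(Schema.create(Schema.Type.BYTES));
    Schema schema = SchemaBuilder.record("Money").fields()
        .name("amount").type(bytesDecimal).noDefault()
        .endRecord();

    ParquetWriter<GenericRecord> writer = AvroParquetWriter
        .<GenericRecord>builder(new Path("/tmp/money.parquet"))
        .withSchema(schema)
        .withDataModel(model)   // BigDecimal values are converted on write
        .build();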

Author: Ryan Blue <bl...@apache.org>

Closes #318 from rdblue/PARQUET-358-add-avro-logical-types-api and squashes the following commits:

bd81f9c [Ryan Blue] PARQUET-358: Fix review items.
0a882ee [Ryan Blue] PARQUET-358: Add logical types circular reference test.
5124618 [Ryan Blue] PARQUET-358: Add license documentation for code from Avro.
dcb14be [Ryan Blue] PARQUET-358: Add support for Avro's logical types API.


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/6b24a1d1
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/6b24a1d1
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/6b24a1d1

Branch: refs/heads/master
Commit: 6b24a1d1b5e2792a7821ad172a45e38d2b04f9b8
Parents: 82b8ecc
Author: Ryan Blue <bl...@apache.org>
Authored: Wed Apr 20 08:41:22 2016 -0700
Committer: Ryan Blue <bl...@apache.org>
Committed: Wed Apr 20 08:41:22 2016 -0700

----------------------------------------------------------------------
 LICENSE                                         |   8 +
 NOTICE                                          |  11 +
 parquet-avro/pom.xml                            |   4 -
 .../avro/AvroIndexedRecordConverter.java        |  18 +-
 .../apache/parquet/avro/AvroReadSupport.java    |   4 +-
 .../parquet/avro/AvroRecordConverter.java       | 121 +++-
 .../parquet/avro/AvroSchemaConverter.java       | 147 ++--
 .../apache/parquet/avro/AvroWriteSupport.java   | 167 +++--
 .../parquet/avro/ParentValueContainer.java      | 175 +++++
 .../src/main/resources/META-INF/LICENSE         | 186 +++++
 parquet-avro/src/main/resources/META-INF/NOTICE |  18 +
 .../org/apache/parquet/avro/AvroTestUtil.java   |  53 ++
 .../parquet/avro/TestAvroSchemaConverter.java   | 278 +++++++-
 .../parquet/avro/TestCircularReferences.java    | 383 ++++++++++
 .../parquet/avro/TestGenericLogicalTypes.java   | 271 +++++++
 .../org/apache/parquet/avro/TestReadWrite.java  | 118 +++-
 .../avro/TestReadWriteOldListBehavior.java      |   1 -
 .../parquet/avro/TestReflectLogicalTypes.java   | 705 +++++++++++++++++++
 .../java/org/apache/parquet/io/api/Binary.java  |  55 +-
 .../java/org/apache/parquet/schema/Types.java   |  12 +-
 .../org/apache/parquet/io/api/TestBinary.java   |  10 -
 pom.xml                                         |   1 +
 22 files changed, 2593 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/LICENSE
----------------------------------------------------------------------
diff --git a/LICENSE b/LICENSE
index b759148..b006581 100644
--- a/LICENSE
+++ b/LICENSE
@@ -178,6 +178,14 @@
 
 --------------------------------------------------------------------------------
 
+This product includes code from Apache Avro.
+
+Copyright: 2014 The Apache Software Foundation.
+Home page: https://avro.apache.org/
+License: http://www.apache.org/licenses/LICENSE-2.0
+
+--------------------------------------------------------------------------------
+
 This project includes code from Daniel Lemire's JavaFastPFOR project. The
 "Lemire" bit packing source code produced by parquet-generator is derived from
 the JavaFastPFOR project.

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/NOTICE
----------------------------------------------------------------------
diff --git a/NOTICE b/NOTICE
index c6e3bf2..a9b6c56 100644
--- a/NOTICE
+++ b/NOTICE
@@ -43,3 +43,14 @@ with the following copyright notice:
   See the License for the specific language governing permissions and
   limitations under the License.
 
+--------------------------------------------------------------------------------
+
+This product includes code from Apache Avro, which includes the following in
+its NOTICE file:
+
+  Apache Avro
+  Copyright 2010-2015 The Apache Software Foundation
+
+  This product includes software developed at
+  The Apache Software Foundation (http://www.apache.org/).
+

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-avro/pom.xml b/parquet-avro/pom.xml
index aad197d..50c37db 100644
--- a/parquet-avro/pom.xml
+++ b/parquet-avro/pom.xml
@@ -32,10 +32,6 @@
   <name>Apache Parquet Avro</name>
   <url>https://parquet.apache.org</url>
 
-  <properties>
-    <avro.version>1.7.6</avro.version>
-  </properties>
-
   <dependencies>
     <dependency>
       <groupId>org.apache.parquet</groupId>

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
index 06c66d6..48eab4d 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
@@ -21,6 +21,8 @@ package org.apache.parquet.avro;
 import java.lang.reflect.Constructor;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.avro.Conversion;
+import org.apache.avro.LogicalType;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericArray;
 import org.apache.avro.generic.GenericData;
@@ -111,6 +113,11 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
 
   @SuppressWarnings("unchecked")
   private static <T> Class<T> getDatumClass(GenericData model, Schema schema) {
+    if (model.getConversionFor(schema.getLogicalType()) != null) {
+      // use generic classes to pass data to conversions
+      return null;
+    }
+
     if (model instanceof SpecificData) {
       return (Class<T>) ((SpecificData) model).getClass(schema);
     }
@@ -133,7 +140,16 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
   }
 
   private static Converter newConverter(Schema schema, Type type,
-      GenericData model, ParentValueContainer parent) {
+      GenericData model, ParentValueContainer setter) {
+
+    LogicalType logicalType = schema.getLogicalType();
+    // the expected type is always null because it is determined by the parent
+    // datum class, which never helps for generic data. When logical types are
+    // added to specific data, this should pass the expected type here.
+    Conversion<?> conversion = model.getConversionFor(logicalType);
+    ParentValueContainer parent = ParentValueContainer
+        .getConversionContainer(setter, conversion, schema);
+
     if (schema.getType().equals(Schema.Type.BOOLEAN)) {
       return new AvroConverters.FieldBooleanConverter(parent);
     } else if (schema.getType().equals(Schema.Type.INT)) {

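The getDatumClass() short-circuit above is what routes values through
conversions: when the model can convert the schema's logical type, the
converter reports no datum class, so generic data is produced and handed to
the conversion. A hedged sketch of the lookup:

    GenericData model = new GenericData();
    model.addLogicalTypeConversion(new Conversions.DecimalConversion());

    Schema decimal = LogicalTypes.decimal(9, 2)
        .addToSchema(Schema.create(Schema.Type.BYTES));

    // non-null: values of this schema will be passed to the conversion
    Conversion<?> conversion = model.getConversionFor(decimal.getLogicalType());
    // null: a schema without a logical type has no conversion
    Conversion<?> none = model.getConversionFor(null);
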
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
index e73e8af..7d55bf5 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
@@ -110,9 +110,9 @@ public class AvroReadSupport<T> extends ReadSupport<T> {
     MessageType parquetSchema = readContext.getRequestedSchema();
     Schema avroSchema;
 
-    if (readContext.getReadSupportMetadata().get(AVRO_READ_SCHEMA_METADATA_KEY) != null) {
+    if (metadata.get(AVRO_READ_SCHEMA_METADATA_KEY) != null) {
       // use the Avro read schema provided by the user
-      avroSchema = new Schema.Parser().parse(readContext.getReadSupportMetadata().get(AVRO_READ_SCHEMA_METADATA_KEY));
+      avroSchema = new Schema.Parser().parse(metadata.get(AVRO_READ_SCHEMA_METADATA_KEY));
     } else if (keyValueMetaData.get(AVRO_SCHEMA_METADATA_KEY) != null) {
       // use the Avro schema from the file metadata if present
       avroSchema = new Schema.Parser().parse(keyValueMetaData.get(AVRO_SCHEMA_METADATA_KEY));

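The fix reads the user-supplied schema from the local metadata map rather than
going back through the read context. For reference, a hedged sketch of how a
caller supplies that read schema in the first place (path and record
illustrative):

    Configuration conf = new Configuration();
    Schema readSchema = SchemaBuilder.record("User").fields()
        .optionalString("name")
        .endRecord();
    AvroReadSupport.setAvroReadSchema(conf, readSchema);

    ParquetReader<GenericRecord> reader = AvroParquetReader
        .<GenericRecord>builder(new Path("/tmp/users.parquet"))
        .withConf(conf)
        .build();
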
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
index 38a761c..10bb29b 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
@@ -29,6 +29,7 @@ import it.unimi.dsi.fastutil.shorts.ShortArrayList;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -36,8 +37,14 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.LinkedHashMap;
+import org.apache.avro.AvroTypeException;
+import org.apache.avro.Conversion;
+import org.apache.avro.LogicalType;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.reflect.AvroIgnore;
+import org.apache.avro.reflect.AvroName;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.Stringable;
 import org.apache.avro.specific.SpecificData;
@@ -69,7 +76,8 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
   private static final String JAVA_CLASS_PROP = "java-class";
   private static final String JAVA_KEY_CLASS_PROP = "java-key-class";
 
-  protected T currentRecord;
+  protected T currentRecord = null;
+  private ParentValueContainer rootContainer = null;
   private final Converter[] converters;
 
   private final Schema avroSchema;
@@ -80,6 +88,15 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
   public AvroRecordConverter(MessageType parquetSchema, Schema avroSchema,
                              GenericData baseModel) {
     this(null, parquetSchema, avroSchema, baseModel);
+    LogicalType logicalType = avroSchema.getLogicalType();
+    Conversion<?> conversion = baseModel.getConversionFor(logicalType);
+    this.rootContainer = ParentValueContainer.getConversionContainer(new ParentValueContainer() {
+      @Override
+      @SuppressWarnings("unchecked")
+      public void add(Object value) {
+        AvroRecordConverter.this.currentRecord = (T) value;
+      }
+    }, conversion, avroSchema);
   }
 
   public AvroRecordConverter(ParentValueContainer parent,
@@ -101,6 +118,8 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
       recordClass = getDatumClass(avroSchema, model);
     }
 
+    Map<String, Class<?>> fields = getFieldsByName(recordClass, false);
+
     int parquetFieldIndex = 0;
     for (Type parquetField: parquetSchema.getFields()) {
       final Schema.Field avroField = getAvroField(parquetField.getName());
@@ -112,8 +131,10 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
           AvroRecordConverter.this.set(avroField.name(), finalAvroIndex, value);
         }
       };
+
+      Class<?> fieldClass = fields.get(avroField.name());
       converters[parquetFieldIndex] = newConverter(
-          nonNullSchema, parquetField, this.model, container);
+          nonNullSchema, parquetField, this.model, fieldClass, container);
 
       // @Stringable doesn't affect the reflected schema; must be enforced here
       if (recordClass != null &&
@@ -147,6 +168,43 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
     }
   }
 
+  // this was taken from Avro's ReflectData
+  private static Map<String, Class<?>> getFieldsByName(Class<?> recordClass,
+                                                       boolean excludeJava) {
+    Map<String, Class<?>> fields = new LinkedHashMap<String, Class<?>>();
+
+    if (recordClass != null) {
+      Class<?> current = recordClass;
+      do {
+        if (excludeJava && current.getPackage() != null
+            && current.getPackage().getName().startsWith("java.")) {
+          break; // skip java built-in classes
+        }
+        for (Field field : current.getDeclaredFields()) {
+          if (field.isAnnotationPresent(AvroIgnore.class) ||
+              isTransientOrStatic(field)) {
+            continue;
+          }
+          AvroName altName = field.getAnnotation(AvroName.class);
+          Class<?> existing = fields.put(
+              altName != null ? altName.value() : field.getName(),
+              field.getType());
+          if (existing != null) {
+            throw new AvroTypeException(
+                current + " contains two fields named: " + field.getName());
+          }
+        }
+        current = current.getSuperclass();
+      } while (current != null);
+    }
+
+    return fields;
+  }
+
+  private static boolean isTransientOrStatic(Field field) {
+    return (field.getModifiers() & (Modifier.TRANSIENT | Modifier.STATIC)) != 0;
+  }
+
   private Schema.Field getAvroField(String parquetFieldName) {
     Schema.Field avroField = avroSchema.getField(parquetFieldName);
     if (avroField != null) {
@@ -164,12 +222,28 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
         parquetFieldName));
   }
 
+  private static Converter newConverter(
+      Schema schema, Type type, GenericData model, ParentValueContainer setter) {
+    return newConverter(schema, type, model, null, setter);
+  }
+
   private static Converter newConverter(Schema schema, Type type,
-      GenericData model, ParentValueContainer parent) {
+      GenericData model, Class<?> knownClass, ParentValueContainer setter) {
+    LogicalType logicalType = schema.getLogicalType();
+    Conversion<?> conversion;
+    if (knownClass != null) {
+      conversion = model.getConversionByClass(knownClass, logicalType);
+    } else {
+      conversion = model.getConversionFor(logicalType);
+    }
+
+    ParentValueContainer parent = ParentValueContainer
+        .getConversionContainer(setter, conversion, schema);
+
     if (schema.getType().equals(Schema.Type.BOOLEAN)) {
       return new AvroConverters.FieldBooleanConverter(parent);
     } else if (schema.getType().equals(Schema.Type.INT)) {
-      Class<?> datumClass = getDatumClass(schema, model);
+      Class<?> datumClass = getDatumClass(conversion, knownClass, schema, model);
       if (datumClass == null) {
         return new AvroConverters.FieldIntegerConverter(parent);
       } else if (datumClass == byte.class || datumClass == Byte.class) {
@@ -187,7 +261,7 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
     } else if (schema.getType().equals(Schema.Type.DOUBLE)) {
       return new AvroConverters.FieldDoubleConverter(parent);
     } else if (schema.getType().equals(Schema.Type.BYTES)) {
-      Class<?> datumClass = getDatumClass(schema, model);
+      Class<?> datumClass = getDatumClass(conversion, knownClass, schema, model);
       if (datumClass == null) {
         return new AvroConverters.FieldByteBufferConverter(parent);
       } else if (datumClass.isArray() && datumClass.getComponentType() == byte.class) {
@@ -201,7 +275,7 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
     } else if (schema.getType().equals(Schema.Type.ENUM)) {
       return new AvroConverters.FieldEnumConverter(parent, schema, model);
     } else if (schema.getType().equals(Schema.Type.ARRAY)) {
-      Class<?> datumClass = getDatumClass(schema, model);
+      Class<?> datumClass = getDatumClass(conversion, knownClass, schema, model);
       if (datumClass != null && datumClass.isArray()) {
         return new AvroArrayConverter(
             parent, type.asGroupType(), schema, model, datumClass);
@@ -265,8 +339,24 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
     }
   }
 
-  @SuppressWarnings("unchecked")
   private static <T> Class<T> getDatumClass(Schema schema, GenericData model) {
+    return getDatumClass(null, null, schema, model);
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <T> Class<T> getDatumClass(Conversion<?> conversion,
+                                            Class<T> knownClass,
+                                            Schema schema, GenericData model) {
+    if (conversion != null) {
+      // use generic classes to pass data to conversions
+      return null;
+    }
+
+    // known class can be set when using reflect
+    if (knownClass != null) {
+      return knownClass;
+    }
+
     if (model instanceof SpecificData) {
       // this works for reflect as well
       return ((SpecificData) model).getClass(schema);
@@ -314,6 +404,9 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
     fillInDefaults();
     if (parent != null) {
       parent.add(currentRecord);
+    } else {
+      // this applies any converters needed for the root value
+      rootContainer.add(currentRecord);
     }
   }
 
@@ -502,10 +595,10 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
       // matching it against the element schema.
       if (isElementType(repeatedType, elementSchema)) {
         // the element type is the repeated type (and required)
-        converter = newConverter(elementSchema, repeatedType, model, setter);
+        converter = newConverter(elementSchema, repeatedType, model, elementClass, setter);
       } else {
         // the element is wrapped in a synthetic group and may be optional
-        converter = new PrimitiveElementConverter(
+        converter = new ArrayElementConverter(
             repeatedType.asGroupType(), elementSchema, model, setter);
       }
     }
@@ -643,20 +736,20 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
      *   }
      * </pre>
      */
-    final class PrimitiveElementConverter extends GroupConverter {
+    final class ArrayElementConverter extends GroupConverter {
       private boolean isSet;
       private final Converter elementConverter;
 
-      public PrimitiveElementConverter(GroupType repeatedType,
-                                       Schema elementSchema, GenericData model,
-                                       final ParentValueContainer setter) {
+      public ArrayElementConverter(GroupType repeatedType,
+                                   Schema elementSchema, GenericData model,
+                                   final ParentValueContainer setter) {
         Type elementType = repeatedType.getType(0);
         Preconditions.checkArgument(
             !elementClass.isPrimitive() || elementType.isRepetition(REQUIRED),
             "Cannot convert list of optional elements to primitive array");
         Schema nonNullElementSchema = AvroSchemaConverter.getNonNull(elementSchema);
         this.elementConverter = newConverter(
-            nonNullElementSchema, elementType, model, new ParentValueContainer() {
+            nonNullElementSchema, elementType, model, elementClass, new ParentValueContainer() {
               @Override
               public void add(Object value) {
                 isSet = true;

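getFieldsByName() mirrors Avro's ReflectData rules: field names follow
@AvroName, and @AvroIgnore, transient, and static members are skipped. A
hedged sketch of a reflect class exercising those rules (names illustrative):

    import org.apache.avro.reflect.AvroIgnore;
    import org.apache.avro.reflect.AvroName;

    public class User {
      @AvroName("full_name")
      String name;             // keyed in the field map as "full_name"

      @AvroIgnore
      int cachedHashCode;      // excluded from the map and the schema

      transient long scratch;  // excluded: transient fields are skipped
    }
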
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
index 6cfa8d1..6b9b94c 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
@@ -18,20 +18,26 @@
  */
 package org.apache.parquet.avro;
 
-import java.util.*;
-
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 
 import org.apache.hadoop.conf.Configuration;
-import org.codehaus.jackson.node.NullNode;
 import org.apache.parquet.schema.ConversionPatterns;
+import org.apache.parquet.schema.DecimalMetadata;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
-import org.apache.parquet.schema.Type;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 
+import static org.apache.avro.JsonProperties.NULL_VALUE;
 import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
 import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE_DEFAULT;
 import static org.apache.parquet.schema.OriginalType.*;
@@ -113,26 +119,28 @@ public class AvroSchemaConverter {
     return convertField(fieldName, schema, Type.Repetition.REQUIRED);
   }
 
+  @SuppressWarnings("deprecation")
   private Type convertField(String fieldName, Schema schema, Type.Repetition repetition) {
+    Types.PrimitiveBuilder<PrimitiveType> builder;
     Schema.Type type = schema.getType();
     if (type.equals(Schema.Type.BOOLEAN)) {
-      return primitive(fieldName, BOOLEAN, repetition);
+      builder = Types.primitive(BOOLEAN, repetition);
     } else if (type.equals(Schema.Type.INT)) {
-      return primitive(fieldName, INT32, repetition);
+      builder = Types.primitive(INT32, repetition);
     } else if (type.equals(Schema.Type.LONG)) {
-      return primitive(fieldName, INT64, repetition);
+      builder = Types.primitive(INT64, repetition);
     } else if (type.equals(Schema.Type.FLOAT)) {
-      return primitive(fieldName, FLOAT, repetition);
+      builder = Types.primitive(FLOAT, repetition);
     } else if (type.equals(Schema.Type.DOUBLE)) {
-      return primitive(fieldName, DOUBLE, repetition);
+      builder = Types.primitive(DOUBLE, repetition);
     } else if (type.equals(Schema.Type.BYTES)) {
-      return primitive(fieldName, BINARY, repetition);
+      builder = Types.primitive(BINARY, repetition);
     } else if (type.equals(Schema.Type.STRING)) {
-      return primitive(fieldName, BINARY, repetition, UTF8);
+      builder = Types.primitive(BINARY, repetition).as(UTF8);
     } else if (type.equals(Schema.Type.RECORD)) {
       return new GroupType(repetition, fieldName, convertFields(schema.getFields()));
     } else if (type.equals(Schema.Type.ENUM)) {
-      return primitive(fieldName, BINARY, repetition, ENUM);
+      builder = Types.primitive(BINARY, repetition).as(ENUM);
     } else if (type.equals(Schema.Type.ARRAY)) {
       if (writeOldListStructure) {
         return ConversionPatterns.listType(repetition, fieldName,
@@ -146,16 +154,36 @@ public class AvroSchemaConverter {
       // avro map key type is always string
       return ConversionPatterns.stringKeyMapType(repetition, fieldName, valType);
     } else if (type.equals(Schema.Type.FIXED)) {
-      return primitive(fieldName, FIXED_LEN_BYTE_ARRAY, repetition,
-                       schema.getFixedSize(), null);
+      builder = Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition)
+          .length(schema.getFixedSize());
     } else if (type.equals(Schema.Type.UNION)) {
       return convertUnion(fieldName, schema, repetition);
+    } else {
+      throw new UnsupportedOperationException("Cannot convert Avro type " + type);
     }
-    throw new UnsupportedOperationException("Cannot convert Avro type " + type);
+
+    // schema translation can only be done for known logical types because it
+    // establishes an equivalence between the Avro and Parquet annotations
+    LogicalType logicalType = schema.getLogicalType();
+    if (logicalType != null) {
+      if (logicalType instanceof LogicalTypes.Decimal) {
+        builder = builder.as(DECIMAL)
+            .precision(((LogicalTypes.Decimal) logicalType).getPrecision())
+            .scale(((LogicalTypes.Decimal) logicalType).getScale());
+
+      } else {
+        OriginalType annotation = convertLogicalType(logicalType);
+        if (annotation != null) {
+          builder.as(annotation);
+        }
+      }
+    }
+
+    return builder.named(fieldName);
   }
 
   private Type convertUnion(String fieldName, Schema schema, Type.Repetition repetition) {
-    List<Schema> nonNullSchemas = new ArrayList(schema.getTypes().size());
+    List<Schema> nonNullSchemas = new ArrayList<Schema>(schema.getTypes().size());
     for (Schema childSchema : schema.getTypes()) {
       if (childSchema.getType().equals(Schema.Type.NULL)) {
         if (Type.Repetition.REQUIRED == repetition) {
@@ -175,7 +203,7 @@ public class AvroSchemaConverter {
         return convertField(fieldName, nonNullSchemas.get(0), repetition);
 
       default: // complex union type
-        List<Type> unionTypes = new ArrayList(nonNullSchemas.size());
+        List<Type> unionTypes = new ArrayList<Type>(nonNullSchemas.size());
         int index = 0;
         for (Schema childSchema : nonNullSchemas) {
           unionTypes.add( convertField("member" + index++, childSchema, Type.Repetition.OPTIONAL));
@@ -188,24 +216,6 @@ public class AvroSchemaConverter {
     return convertField(field.name(), field.schema());
   }
 
-  private PrimitiveType primitive(String name,
-      PrimitiveType.PrimitiveTypeName primitive, Type.Repetition repetition,
-      int typeLength, OriginalType originalType) {
-    return new PrimitiveType(repetition, primitive, typeLength, name,
-                             originalType);
-  }
-
-  private PrimitiveType primitive(String name,
-      PrimitiveType.PrimitiveTypeName primitive, Type.Repetition repetition,
-      OriginalType originalType) {
-    return new PrimitiveType(repetition, primitive, name, originalType);
-  }
-
-  private PrimitiveType primitive(String name,
-      PrimitiveType.PrimitiveTypeName primitive, Type.Repetition repetition) {
-    return new PrimitiveType(repetition, primitive, name, null);
-  }
-
   public Schema convert(MessageType parquetSchema) {
     return convertFields(parquetSchema.getName(), parquetSchema.getFields());
   }
@@ -217,10 +227,11 @@ public class AvroSchemaConverter {
       if (parquetType.isRepetition(REPEATED)) {
         throw new UnsupportedOperationException("REPEATED not supported outside LIST or MAP. Type: " + parquetType);
       } else if (parquetType.isRepetition(Type.Repetition.OPTIONAL)) {
-        fields.add(new Schema.Field(parquetType.getName(), optional(fieldSchema), null,
-            NullNode.getInstance()));
+        fields.add(new Schema.Field(
+            parquetType.getName(), optional(fieldSchema), null, NULL_VALUE));
       } else { // REQUIRED
-        fields.add(new Schema.Field(parquetType.getName(), fieldSchema, null, null));
+        fields.add(new Schema.Field(
+            parquetType.getName(), fieldSchema, null, (Object) null));
       }
     }
     Schema schema = Schema.createRecord(name, null, null, false);
@@ -230,10 +241,11 @@ public class AvroSchemaConverter {
 
   private Schema convertField(final Type parquetType) {
     if (parquetType.isPrimitive()) {
+      final PrimitiveType asPrimitive = parquetType.asPrimitiveType();
       final PrimitiveTypeName parquetPrimitiveTypeName =
-          parquetType.asPrimitiveType().getPrimitiveTypeName();
-      final OriginalType originalType = parquetType.getOriginalType();
-      return parquetPrimitiveTypeName.convert(
+          asPrimitive.getPrimitiveTypeName();
+      final OriginalType annotation = parquetType.getOriginalType();
+      Schema schema = parquetPrimitiveTypeName.convert(
           new PrimitiveType.PrimitiveTypeNameConverter<Schema, RuntimeException>() {
             @Override
             public Schema convertBOOLEAN(PrimitiveTypeName primitiveTypeName) {
@@ -266,13 +278,24 @@ public class AvroSchemaConverter {
             }
             @Override
             public Schema convertBINARY(PrimitiveTypeName primitiveTypeName) {
-              if (originalType == OriginalType.UTF8 || originalType == OriginalType.ENUM) {
+              if (annotation == OriginalType.UTF8 || annotation == OriginalType.ENUM) {
                 return Schema.create(Schema.Type.STRING);
               } else {
                 return Schema.create(Schema.Type.BYTES);
               }
             }
           });
+
+      LogicalType logicalType = convertOriginalType(
+          annotation, asPrimitive.getDecimalMetadata());
+      if (logicalType != null && (annotation != DECIMAL ||
+          parquetPrimitiveTypeName == BINARY ||
+          parquetPrimitiveTypeName == FIXED_LEN_BYTE_ARRAY)) {
+        schema = logicalType.addToSchema(schema);
+      }
+
+      return schema;
+
     } else {
       GroupType parquetGroupType = parquetType.asGroupType();
       OriginalType originalType = parquetGroupType.getOriginalType();
@@ -335,6 +358,46 @@ public class AvroSchemaConverter {
     }
   }
 
+  private OriginalType convertLogicalType(LogicalType logicalType) {
+    if (logicalType == null) {
+      return null;
+    } else if (logicalType instanceof LogicalTypes.Decimal) {
+      return OriginalType.DECIMAL;
+    } else if (logicalType instanceof LogicalTypes.Date) {
+      return OriginalType.DATE;
+    } else if (logicalType instanceof LogicalTypes.TimeMillis) {
+      return OriginalType.TIME_MILLIS;
+    } else if (logicalType instanceof LogicalTypes.TimeMicros) {
+      return OriginalType.TIME_MICROS;
+    } else if (logicalType instanceof LogicalTypes.TimestampMillis) {
+      return OriginalType.TIMESTAMP_MILLIS;
+    } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
+      return OriginalType.TIMESTAMP_MICROS;
+    }
+    return null;
+  }
+
+  private LogicalType convertOriginalType(OriginalType annotation, DecimalMetadata meta) {
+    if (annotation == null) {
+      return null;
+    }
+    switch (annotation) {
+      case DECIMAL:
+        return LogicalTypes.decimal(meta.getPrecision(), meta.getScale());
+      case DATE:
+        return LogicalTypes.date();
+      case TIME_MILLIS:
+        return LogicalTypes.timeMillis();
+      case TIME_MICROS:
+        return LogicalTypes.timeMicros();
+      case TIMESTAMP_MILLIS:
+        return LogicalTypes.timestampMillis();
+      case TIMESTAMP_MICROS:
+        return LogicalTypes.timestampMicros();
+    }
+    return null;
+  }
+
   /**
    * Implements the rules for interpreting existing data from the logical type
    * spec for the LIST annotation. This is used to produce the expected schema.

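Together, convertLogicalType() and convertOriginalType() make the schema
translation reversible for the supported types. A hedged round-trip sketch
(record and field names illustrative):

    Schema bytesDecimal = LogicalTypes.decimal(9, 2)
        .addToSchema(Schema.create(Schema.Type.BYTES));
    Schema avro = SchemaBuilder.record("r").fields()
        .name("d").type(bytesDecimal).noDefault()
        .endRecord();

    AvroSchemaConverter converter = new AvroSchemaConverter();
    MessageType parquet = converter.convert(avro);     // required binary d (DECIMAL(9,2))
    Schema roundTrip = converter.convert(parquet);     // decimal(9,2) restored on "d"
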
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
index c75bb03..7fcd88e 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
@@ -23,6 +23,8 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.avro.Conversion;
+import org.apache.avro.LogicalType;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericFixed;
@@ -69,6 +71,8 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
   private RecordConsumer recordConsumer;
   private MessageType rootSchema;
   private Schema rootAvroSchema;
+  private LogicalType rootLogicalType;
+  private Conversion<?> rootConversion;
   private GenericData model;
   private ListWriter listWriter;
 
@@ -82,6 +86,7 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
   public AvroWriteSupport(MessageType schema, Schema avroSchema) {
     this.rootSchema = schema;
     this.rootAvroSchema = avroSchema;
+    this.rootLogicalType = rootAvroSchema.getLogicalType();
     this.model = null;
   }
 
@@ -89,6 +94,7 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
                           GenericData model) {
     this.rootSchema = schema;
     this.rootAvroSchema = avroSchema;
+    this.rootLogicalType = rootAvroSchema.getLogicalType();
     this.model = model;
   }
 
@@ -136,16 +142,25 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
   // overloaded version for backward compatibility
   @SuppressWarnings("unchecked")
   public void write(IndexedRecord record) {
-    recordConsumer.startMessage();
-    writeRecordFields(rootSchema, rootAvroSchema, record);
-    recordConsumer.endMessage();
+    write((T) record);
   }
 
   @Override
   public void write(T record) {
-    recordConsumer.startMessage();
-    writeRecordFields(rootSchema, rootAvroSchema, record);
-    recordConsumer.endMessage();
+    if (rootLogicalType != null) {
+      Conversion<?> conversion = model.getConversionByClass(
+          record.getClass(), rootLogicalType);
+
+      recordConsumer.startMessage();
+      writeRecordFields(rootSchema, rootAvroSchema,
+          convert(rootAvroSchema, rootLogicalType, conversion, record));
+      recordConsumer.endMessage();
+
+    } else {
+      recordConsumer.startMessage();
+      writeRecordFields(rootSchema, rootAvroSchema, record);
+      recordConsumer.endMessage();
+    }
   }
 
   private void writeRecord(GroupType schema, Schema avroSchema,
@@ -226,6 +241,8 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
       }
     }
 
+    // TODO: what if the value is null?
+
     // Sparsely populated method of encoding unions, each member has its own
     // set of columns.
     String memberName = "member" + parquetIndex;
@@ -237,44 +254,108 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
     recordConsumer.endGroup();
   }
 
-  @SuppressWarnings("unchecked")
+  /**
+   * Calls an appropriate write method based on the value.
+   * Value MUST not be null.
+   *
+   * @param type the Parquet type
+   * @param avroSchema the Avro schema
+   * @param value a non-null value to write
+   */
   private void writeValue(Type type, Schema avroSchema, Object value) {
     Schema nonNullAvroSchema = AvroSchemaConverter.getNonNull(avroSchema);
-    Schema.Type avroType = nonNullAvroSchema.getType();
-    if (avroType.equals(Schema.Type.BOOLEAN)) {
-      recordConsumer.addBoolean((Boolean) value);
-    } else if (avroType.equals(Schema.Type.INT)) {
-      if (value instanceof Character) {
-        recordConsumer.addInteger((Character) value);
-      } else {
-        recordConsumer.addInteger(((Number) value).intValue());
-      }
-    } else if (avroType.equals(Schema.Type.LONG)) {
-      recordConsumer.addLong(((Number) value).longValue());
-    } else if (avroType.equals(Schema.Type.FLOAT)) {
-      recordConsumer.addFloat(((Number) value).floatValue());
-    } else if (avroType.equals(Schema.Type.DOUBLE)) {
-      recordConsumer.addDouble(((Number) value).doubleValue());
-    } else if (avroType.equals(Schema.Type.BYTES)) {
-      if (value instanceof byte[]) {
-        recordConsumer.addBinary(Binary.fromReusedByteArray((byte[]) value));
-      } else {
-        recordConsumer.addBinary(Binary.fromReusedByteBuffer((ByteBuffer) value));
-      }
-    } else if (avroType.equals(Schema.Type.STRING)) {
-      recordConsumer.addBinary(fromAvroString(value));
-    } else if (avroType.equals(Schema.Type.RECORD)) {
-      writeRecord(type.asGroupType(), nonNullAvroSchema, value);
-    } else if (avroType.equals(Schema.Type.ENUM)) {
-      recordConsumer.addBinary(Binary.fromString(value.toString()));
-    } else if (avroType.equals(Schema.Type.ARRAY)) {
-      listWriter.writeList(type.asGroupType(), nonNullAvroSchema, value);
-    } else if (avroType.equals(Schema.Type.MAP)) {
-      writeMap(type.asGroupType(), nonNullAvroSchema, (Map<CharSequence, ?>) value);
-    } else if (avroType.equals(Schema.Type.UNION)) {
-      writeUnion(type.asGroupType(), nonNullAvroSchema, value);
-    } else if (avroType.equals(Schema.Type.FIXED)) {
-      recordConsumer.addBinary(Binary.fromReusedByteArray(((GenericFixed) value).bytes()));
+    LogicalType logicalType = nonNullAvroSchema.getLogicalType();
+    if (logicalType != null) {
+      Conversion<?> conversion = model.getConversionByClass(
+          value.getClass(), logicalType);
+      writeValueWithoutConversion(type, nonNullAvroSchema,
+          convert(nonNullAvroSchema, logicalType, conversion, value));
+    } else {
+      writeValueWithoutConversion(type, nonNullAvroSchema, value);
+    }
+  }
+
+  private <D> Object convert(Schema schema, LogicalType logicalType,
+                             Conversion<D> conversion, Object datum) {
+    if (conversion == null) {
+      return datum;
+    }
+    Class<D> fromClass = conversion.getConvertedType();
+    switch (schema.getType()) {
+      case RECORD:  return conversion.toRecord(fromClass.cast(datum), schema, logicalType);
+      case ENUM:    return conversion.toEnumSymbol(fromClass.cast(datum), schema, logicalType);
+      case ARRAY:   return conversion.toArray(fromClass.cast(datum), schema, logicalType);
+      case MAP:     return conversion.toMap(fromClass.cast(datum), schema, logicalType);
+      case FIXED:   return conversion.toFixed(fromClass.cast(datum), schema, logicalType);
+      case STRING:  return conversion.toCharSequence(fromClass.cast(datum), schema, logicalType);
+      case BYTES:   return conversion.toBytes(fromClass.cast(datum), schema, logicalType);
+      case INT:     return conversion.toInt(fromClass.cast(datum), schema, logicalType);
+      case LONG:    return conversion.toLong(fromClass.cast(datum), schema, logicalType);
+      case FLOAT:   return conversion.toFloat(fromClass.cast(datum), schema, logicalType);
+      case DOUBLE:  return conversion.toDouble(fromClass.cast(datum), schema, logicalType);
+      case BOOLEAN: return conversion.toBoolean(fromClass.cast(datum), schema, logicalType);
+    }
+    return datum;
+  }
+
+  /**
+   * Calls an appropriate write method based on the value.
+   * Value must not be null and the schema must not be nullable.
+   *
+   * @param type a Parquet type
+   * @param avroSchema a non-nullable Avro schema
+   * @param value a non-null value to write
+   */
+  @SuppressWarnings("unchecked")
+  private void writeValueWithoutConversion(Type type, Schema avroSchema, Object value) {
+    switch (avroSchema.getType()) {
+      case BOOLEAN:
+        recordConsumer.addBoolean((Boolean) value);
+        break;
+      case INT:
+        if (value instanceof Character) {
+          recordConsumer.addInteger((Character) value);
+        } else {
+          recordConsumer.addInteger(((Number) value).intValue());
+        }
+        break;
+      case LONG:
+        recordConsumer.addLong(((Number) value).longValue());
+        break;
+      case FLOAT:
+        recordConsumer.addFloat(((Number) value).floatValue());
+        break;
+      case DOUBLE:
+        recordConsumer.addDouble(((Number) value).doubleValue());
+        break;
+      case FIXED:
+        recordConsumer.addBinary(Binary.fromReusedByteArray(((GenericFixed) value).bytes()));
+        break;
+      case BYTES:
+        if (value instanceof byte[]) {
+          recordConsumer.addBinary(Binary.fromReusedByteArray((byte[]) value));
+        } else {
+          recordConsumer.addBinary(Binary.fromReusedByteBuffer((ByteBuffer) value));
+        }
+        break;
+      case STRING:
+        recordConsumer.addBinary(fromAvroString(value));
+        break;
+      case RECORD:
+        writeRecord(type.asGroupType(), avroSchema, value);
+        break;
+      case ENUM:
+        recordConsumer.addBinary(Binary.fromString(value.toString()));
+        break;
+      case ARRAY:
+        listWriter.writeList(type.asGroupType(), avroSchema, value);
+        break;
+      case MAP:
+        writeMap(type.asGroupType(), avroSchema, (Map<CharSequence, ?>) value);
+        break;
+      case UNION:
+        writeUnion(type.asGroupType(), avroSchema, value);
+        break;
     }
   }
 
@@ -283,7 +364,7 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
       Utf8 utf8 = (Utf8) value;
       return Binary.fromReusedByteArray(utf8.getBytes(), 0, utf8.getByteLength());
     }
-    return Binary.fromString(value.toString());
+    return Binary.fromString((CharSequence) value);
   }
 
   private static GenericData getDataModel(Configuration conf) {

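For a concrete case of the convert() dispatch above: a BYTES-typed decimal is
pushed through Conversion.toBytes before writeValueWithoutConversion() sees
it. A hedged sketch using Avro 1.8's standard DecimalConversion:

    LogicalTypes.Decimal decimalType = LogicalTypes.decimal(9, 2);
    Schema schema = decimalType.addToSchema(Schema.create(Schema.Type.BYTES));

    Conversions.DecimalConversion conversion = new Conversions.DecimalConversion();
    ByteBuffer bytes =
        conversion.toBytes(new java.math.BigDecimal("12.34"), schema, decimalType);
    // bytes holds the two's-complement unscaled value (1234), ready for the
    // BYTES branch of writeValueWithoutConversion()
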
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/main/java/org/apache/parquet/avro/ParentValueContainer.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/ParentValueContainer.java b/parquet-avro/src/main/java/org/apache/parquet/avro/ParentValueContainer.java
index 67b710d..f36f5fc 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/ParentValueContainer.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/ParentValueContainer.java
@@ -18,6 +18,16 @@
  */
 package org.apache.parquet.avro;
 
+import org.apache.avro.Conversion;
+import org.apache.avro.LogicalType;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericEnumSymbol;
+import org.apache.avro.generic.IndexedRecord;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Map;
+
 abstract class ParentValueContainer {
 
   /**
@@ -60,4 +70,169 @@ abstract class ParentValueContainer {
     add(value);
   }
 
+  static class LogicalTypePrimitiveContainer extends ParentValueContainer {
+    private final ParentValueContainer wrapped;
+    private final Schema schema;
+    private final LogicalType logicalType;
+    private final Conversion conversion;
+
+    public LogicalTypePrimitiveContainer(ParentValueContainer wrapped,
+                                         Schema schema, Conversion conversion) {
+      this.wrapped = wrapped;
+      this.schema = schema;
+      this.logicalType = schema.getLogicalType();
+      this.conversion = conversion;
+    }
+
+    @Override
+    public void addDouble(double value) {
+      wrapped.add(conversion.fromDouble(value, schema, logicalType));
+    }
+
+    @Override
+    public void addFloat(float value) {
+      wrapped.add(conversion.fromFloat(value, schema, logicalType));
+    }
+
+    @Override
+    public void addLong(long value) {
+      wrapped.add(conversion.fromLong(value, schema, logicalType));
+    }
+
+    @Override
+    public void addInt(int value) {
+      wrapped.add(conversion.fromInt(value, schema, logicalType));
+    }
+
+    @Override
+    public void addShort(short value) {
+      wrapped.add(conversion.fromInt((int) value, schema, logicalType));
+    }
+
+    @Override
+    public void addChar(char value) {
+      wrapped.add(conversion.fromInt((int) value, schema, logicalType));
+    }
+
+    @Override
+    public void addByte(byte value) {
+      wrapped.add(conversion.fromInt((int) value, schema, logicalType));
+    }
+
+    @Override
+    public void addBoolean(boolean value) {
+      wrapped.add(conversion.fromBoolean(value, schema, logicalType));
+    }
+  }
+
+  static ParentValueContainer getConversionContainer(
+      final ParentValueContainer parent, final Conversion<?> conversion,
+      final Schema schema) {
+    if (conversion == null) {
+      return parent;
+    }
+
+    final LogicalType logicalType = schema.getLogicalType();
+
+    switch (schema.getType()) {
+      case STRING:
+        return new ParentValueContainer() {
+          @Override
+          public void add(Object value) {
+            parent.add(conversion.fromCharSequence(
+                (CharSequence) value, schema, logicalType));
+          }
+        };
+      case BOOLEAN:
+        return new LogicalTypePrimitiveContainer(parent, schema, conversion) {
+          @Override
+          public void add(Object value) {
+            parent.add(conversion.fromBoolean(
+                (Boolean) value, schema, logicalType));
+          }
+        };
+      case INT:
+        return new LogicalTypePrimitiveContainer(parent, schema, conversion) {
+          @Override
+          public void add(Object value) {
+            parent.add(conversion.fromInt(
+                (Integer) value, schema, logicalType));
+          }
+        };
+      case LONG:
+        return new LogicalTypePrimitiveContainer(parent, schema, conversion) {
+          @Override
+          public void add(Object value) {
+            parent.add(conversion.fromLong(
+                (Long) value, schema, logicalType));
+          }
+        };
+      case FLOAT:
+        return new LogicalTypePrimitiveContainer(parent, schema, conversion) {
+          @Override
+          public void add(Object value) {
+            parent.add(conversion.fromFloat(
+                (Float) value, schema, logicalType));
+          }
+        };
+      case DOUBLE:
+        return new LogicalTypePrimitiveContainer(parent, schema, conversion) {
+          @Override
+          public void add(Object value) {
+            parent.add(conversion.fromDouble(
+                (Double) value, schema, logicalType));
+          }
+        };
+      case BYTES:
+        return new ParentValueContainer() {
+          @Override
+          public void add(Object value) {
+            parent.add(conversion.fromBytes(
+                (ByteBuffer) value, schema, logicalType));
+          }
+        };
+      case FIXED:
+        return new ParentValueContainer() {
+          @Override
+          public void add(Object value) {
+            parent.add(conversion.fromFixed(
+                (GenericData.Fixed) value, schema, logicalType));
+          }
+        };
+      case RECORD:
+        return new ParentValueContainer() {
+          @Override
+          public void add(Object value) {
+            parent.add(conversion.fromRecord(
+                (IndexedRecord) value, schema, logicalType));
+          }
+        };
+      case ARRAY:
+        return new ParentValueContainer() {
+          @Override
+          public void add(Object value) {
+            parent.add(conversion.fromArray(
+                (Collection<?>) value, schema, logicalType));
+          }
+        };
+      case MAP:
+        return new ParentValueContainer() {
+          @Override
+          public void add(Object value) {
+            parent.add(conversion.fromMap(
+                (Map<?, ?>) value, schema, logicalType));
+          }
+        };
+      case ENUM:
+        return new ParentValueContainer() {
+          @Override
+          public void add(Object value) {
+            parent.add(conversion.fromEnumSymbol(
+                (GenericEnumSymbol) value, schema, logicalType));
+          }
+        };
+      default:
+        return new LogicalTypePrimitiveContainer(parent, schema, conversion);
+    }
+  }
 }

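Each container above simply routes the raw Parquet value through the matching
Conversion.fromXxx method. For INT with the date logical type, that amounts to
the following (Avro 1.8's DateConversion produces a Joda-Time LocalDate):

    import org.apache.avro.data.TimeConversions;
    import org.joda.time.LocalDate;

    LogicalType date = LogicalTypes.date();
    Schema schema = date.addToSchema(Schema.create(Schema.Type.INT));

    TimeConversions.DateConversion conversion = new TimeConversions.DateConversion();
    LocalDate day = conversion.fromInt(0, schema, date); // 0 days from epoch: 1970-01-01
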
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/main/resources/META-INF/LICENSE
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/resources/META-INF/LICENSE b/parquet-avro/src/main/resources/META-INF/LICENSE
new file mode 100644
index 0000000..20b23c9
--- /dev/null
+++ b/parquet-avro/src/main/resources/META-INF/LICENSE
@@ -0,0 +1,186 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+--------------------------------------------------------------------------------
+
+This product includes code from Apache Avro.
+
+Copyright: 2014 The Apache Software Foundation.
+Home page: https://avro.apache.org/
+License: http://www.apache.org/licenses/LICENSE-2.0
+

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/resources/META-INF/NOTICE b/parquet-avro/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..7b5682c
--- /dev/null
+++ b/parquet-avro/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,18 @@
+
+Apache Parquet MR (Incubating)
+Copyright 2014-2015 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+--------------------------------------------------------------------------------
+
+This product includes code from Apache Avro, which includes the following in
+its NOTICE file:
+
+  Apache Avro
+  Copyright 2010-2015 The Apache Software Foundation
+
+  This product includes software developed at
+  The Apache Software Foundation (http://www.apache.org/).
+

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java b/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java
index d5fe11a..f4682d6 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java
@@ -19,11 +19,21 @@
 package org.apache.parquet.avro;
 
 import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
 import org.codehaus.jackson.node.NullNode;
+import org.junit.Assert;
+import org.junit.rules.TemporaryFolder;
 
 public class AvroTestUtil {
 
@@ -66,4 +76,47 @@ public class AvroTestUtil {
     return record;
   }
 
+  public static <D> List<D> read(GenericData model, Schema schema, File file) throws IOException {
+    List<D> data = new ArrayList<D>();
+    Configuration conf = new Configuration(false);
+    AvroReadSupport.setRequestedProjection(conf, schema);
+    AvroReadSupport.setAvroReadSchema(conf, schema);
+    ParquetReader<D> fileReader = AvroParquetReader
+        .<D>builder(new Path(file.toString()))
+        .withDataModel(model) // reflect disables compatibility
+        .withConf(conf)
+        .build();
+
+    try {
+      D datum;
+      while ((datum = fileReader.read()) != null) {
+        data.add(datum);
+      }
+    } finally {
+      fileReader.close();
+    }
+
+    return data;
+  }
+
+  @SuppressWarnings("unchecked")
+  public static <D> File write(TemporaryFolder temp, GenericData model, Schema schema, D... data) throws IOException {
+    File file = temp.newFile();
+    Assert.assertTrue(file.delete());
+    ParquetWriter<D> writer = AvroParquetWriter
+        .<D>builder(new Path(file.toString()))
+        .withDataModel(model)
+        .withSchema(schema)
+        .build();
+
+    try {
+      for (D datum : data) {
+        writer.write(datum);
+      }
+    } finally {
+      writer.close();
+    }
+
+    return file;
+  }
 }

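The new write/read helpers give tests a one-line Parquet round trip: write
deletes the placeholder file created by the TemporaryFolder rule and writes
the records with AvroParquetWriter; read sets the same schema as both the
requested projection and the Avro read schema, then drains the file. A short
usage sketch, assuming a test class in the same package (TestRoundTrip and
its record schema are illustrative, not part of this patch):

    import java.io.File;
    import java.util.List;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericData.Record;
    import org.junit.Assert;
    import org.junit.Rule;
    import org.junit.Test;
    import org.junit.rules.TemporaryFolder;

    public class TestRoundTrip {
      @Rule
      public TemporaryFolder temp = new TemporaryFolder();

      @Test
      public void testWriteThenRead() throws Exception {
        Schema schema = new Schema.Parser().parse(
            "{\"type\": \"record\", \"name\": \"r\", \"fields\": [" +
            "{\"name\": \"id\", \"type\": \"long\"}]}");
        Record record = new Record(schema);
        record.put("id", 1L);

        // Write the record to a temporary Parquet file, then read it back.
        File file = AvroTestUtil.write(temp, GenericData.get(), schema, record);
        List<Record> rows = AvroTestUtil.read(GenericData.get(), schema, file);
        Assert.assertEquals(record, rows.get(0));
      }
    }
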
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
index b393615..942e3b1 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
@@ -20,16 +20,37 @@ package org.apache.parquet.avro;
 
 import com.google.common.collect.Lists;
 import com.google.common.io.Resources;
-import java.util.Arrays;
-import java.util.Collections;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Types;
 import org.codehaus.jackson.node.NullNode;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.MessageTypeParser;
+import java.util.Arrays;
+import java.util.Collections;
 
+import static org.apache.avro.Schema.Type.INT;
+import static org.apache.avro.Schema.Type.LONG;
+import static org.apache.parquet.schema.OriginalType.DATE;
+import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MICROS;
+import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MILLIS;
+import static org.apache.parquet.schema.OriginalType.TIME_MICROS;
+import static org.apache.parquet.schema.OriginalType.TIME_MILLIS;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
 import static org.junit.Assert.assertEquals;
 
 public class TestAvroSchemaConverter {
@@ -131,7 +152,7 @@ public class TestAvroSchemaConverter {
 
   @Test(expected = IllegalArgumentException.class)
   public void testTopLevelMustBeARecord() {
-    new AvroSchemaConverter().convert(Schema.create(Schema.Type.INT));
+    new AvroSchemaConverter().convert(Schema.create(INT));
   }
 
   @Test
@@ -270,7 +291,7 @@ public class TestAvroSchemaConverter {
   @Test
   public void testOptionalFields() throws Exception {
     Schema schema = Schema.createRecord("record1", null, null, false);
-    Schema optionalInt = optional(Schema.create(Schema.Type.INT));
+    Schema optionalInt = optional(Schema.create(INT));
     schema.setFields(Arrays.asList(
         new Schema.Field("myint", optionalInt, null, NullNode.getInstance())
     ));
@@ -284,7 +305,7 @@ public class TestAvroSchemaConverter {
   @Test
   public void testOptionalMapValue() throws Exception {
     Schema schema = Schema.createRecord("record1", null, null, false);
-    Schema optionalIntMap = Schema.createMap(optional(Schema.create(Schema.Type.INT)));
+    Schema optionalIntMap = Schema.createMap(optional(Schema.create(INT)));
     schema.setFields(Arrays.asList(
         new Schema.Field("myintmap", optionalIntMap, null, null)
     ));
@@ -303,7 +324,7 @@ public class TestAvroSchemaConverter {
   @Test
   public void testOptionalArrayElement() throws Exception {
     Schema schema = Schema.createRecord("record1", null, null, false);
-    Schema optionalIntArray = Schema.createArray(optional(Schema.create(Schema.Type.INT)));
+    Schema optionalIntArray = Schema.createArray(optional(Schema.create(INT)));
     schema.setFields(Arrays.asList(
         new Schema.Field("myintarray", optionalIntArray, null, null)
     ));
@@ -323,7 +344,7 @@ public class TestAvroSchemaConverter {
     Schema schema = Schema.createRecord("record2", null, null, false);
     Schema multipleTypes = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type
             .NULL),
-        Schema.create(Schema.Type.INT),
+        Schema.create(INT),
         Schema.create(Schema.Type.FLOAT)));
     schema.setFields(Arrays.asList(
         new Schema.Field("myunion", multipleTypes, null, NullNode.getInstance())));
@@ -396,7 +417,7 @@ public class TestAvroSchemaConverter {
   @Test
   public void testOldAvroListOfLists() throws Exception {
     Schema listOfLists = optional(Schema.createArray(Schema.createArray(
-        Schema.create(Schema.Type.INT))));
+        Schema.create(INT))));
     Schema schema = Schema.createRecord("AvroCompatListInList", null, null, false);
     schema.setFields(Lists.newArrayList(
         new Schema.Field("listOfLists", listOfLists, null, NullNode.getInstance())
@@ -425,7 +446,7 @@ public class TestAvroSchemaConverter {
   @Test
   public void testOldThriftListOfLists() throws Exception {
     Schema listOfLists = optional(Schema.createArray(Schema.createArray(
-        Schema.create(Schema.Type.INT))));
+        Schema.create(INT))));
     Schema schema = Schema.createRecord("ThriftCompatListInList", null, null, false);
     schema.setFields(Lists.newArrayList(
         new Schema.Field("listOfLists", listOfLists, null, NullNode.getInstance())
@@ -458,7 +479,7 @@ public class TestAvroSchemaConverter {
     // group's name, but it must be 2-level because the repeated group doesn't
     // contain an optional or repeated element as required for 3-level lists
     Schema listOfLists = optional(Schema.createArray(Schema.createArray(
-        Schema.create(Schema.Type.INT))));
+        Schema.create(INT))));
     Schema schema = Schema.createRecord("UnknownTwoLevelListInList", null, null, false);
     schema.setFields(Lists.newArrayList(
         new Schema.Field("listOfLists", listOfLists, null, NullNode.getInstance())
@@ -488,7 +509,7 @@ public class TestAvroSchemaConverter {
   @Test
   public void testParquetMapWithoutMapKeyValueAnnotation() throws Exception {
     Schema schema = Schema.createRecord("myrecord", null, null, false);
-    Schema map = Schema.createMap(Schema.create(Schema.Type.INT));
+    Schema map = Schema.createMap(Schema.create(INT));
     schema.setFields(Collections.singletonList(new Schema.Field("mymap", map, null, null)));
     String parquetSchema =
         "message myrecord {\n" +
@@ -504,9 +525,240 @@ public class TestAvroSchemaConverter {
     testParquetToAvroConversion(NEW_BEHAVIOR, schema, parquetSchema);
   }
 
+  @Test
+  public void testDecimalBytesType() throws Exception {
+    Schema schema = Schema.createRecord("myrecord", null, null, false);
+    Schema decimal = LogicalTypes.decimal(9, 2).addToSchema(
+        Schema.create(Schema.Type.BYTES));
+    schema.setFields(Collections.singletonList(
+        new Schema.Field("dec", decimal, null, null)));
+
+    testRoundTripConversion(schema,
+        "message myrecord {\n" +
+            "  required binary dec (DECIMAL(9,2));\n" +
+            "}\n");
+  }
+
+  @Test
+  public void testDecimalFixedType() throws Exception {
+    Schema schema = Schema.createRecord("myrecord", null, null, false);
+    Schema decimal = LogicalTypes.decimal(9, 2).addToSchema(
+        Schema.createFixed("dec", null, null, 8));
+    schema.setFields(Collections.singletonList(
+        new Schema.Field("dec", decimal, null, null)));
+
+    testRoundTripConversion(schema,
+        "message myrecord {\n" +
+            "  required fixed_len_byte_array(8) dec (DECIMAL(9,2));\n" +
+            "}\n");
+  }
+
+  @Test
+  public void testDecimalIntegerType() throws Exception {
+    Schema expected = Schema.createRecord("myrecord", null, null, false,
+        Arrays.asList(new Schema.Field(
+            "dec", Schema.create(INT), null, null)));
+
+    // the decimal annotation is dropped because Avro decimals require bytes or fixed
+    testParquetToAvroConversion(expected,
+        "message myrecord {\n" +
+            "  required int32 dec (DECIMAL(9,2));\n" +
+            "}\n");
+  }
+
+  @Test
+  public void testDecimalLongType() throws Exception {
+    Schema expected = Schema.createRecord("myrecord", null, null, false,
+        Arrays.asList(new Schema.Field("dec", Schema.create(LONG), null, null)));
+
+    // the decimal annotation is dropped because Avro decimals require bytes or fixed
+    testParquetToAvroConversion(expected,
+        "message myrecord {\n" +
+            "  required int64 dec (DECIMAL(9,2));\n" +
+            "}\n");
+  }
+
+  @Test
+  public void testDateType() throws Exception {
+    Schema date = LogicalTypes.date().addToSchema(Schema.create(INT));
+    Schema expected = Schema.createRecord("myrecord", null, null, false,
+        Arrays.asList(new Schema.Field("date", date, null, null)));
+
+    testRoundTripConversion(expected,
+        "message myrecord {\n" +
+            "  required int32 date (DATE);\n" +
+            "}\n");
+
+    for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
+        {INT64, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) {
+      final PrimitiveType type;
+      if (primitive == FIXED_LEN_BYTE_ARRAY) {
+        type = new PrimitiveType(REQUIRED, primitive, 12, "test", DATE);
+      } else {
+        type = new PrimitiveType(REQUIRED, primitive, "test", DATE);
+      }
+
+      assertThrows("Should not allow TIME_MICROS with " + primitive,
+          IllegalArgumentException.class, new Runnable() {
+            @Override
+            public void run() {
+              new AvroSchemaConverter().convert(message(type));
+            }
+          });
+    }
+  }
+
+  @Test
+  public void testTimeMillisType() throws Exception {
+    Schema time = LogicalTypes.timeMillis().addToSchema(Schema.create(INT));
+    Schema expected = Schema.createRecord("myrecord", null, null, false,
+        Arrays.asList(new Schema.Field("time", time, null, null)));
+
+    testRoundTripConversion(expected,
+        "message myrecord {\n" +
+            "  required int32 time (TIME_MILLIS);\n" +
+            "}\n");
+
+    for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
+        {INT64, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) {
+      final PrimitiveType type;
+      if (primitive == FIXED_LEN_BYTE_ARRAY) {
+        type = new PrimitiveType(REQUIRED, primitive, 12, "test", TIME_MILLIS);
+      } else {
+        type = new PrimitiveType(REQUIRED, primitive, "test", TIME_MILLIS);
+      }
+
+      assertThrows("Should not allow TIME_MICROS with " + primitive,
+          IllegalArgumentException.class, new Runnable() {
+            @Override
+            public void run() {
+              new AvroSchemaConverter().convert(message(type));
+            }
+          });
+    }
+  }
+
+  @Test
+  public void testTimeMicrosType() throws Exception {
+    Schema time = LogicalTypes.timeMicros().addToSchema(Schema.create(LONG));
+    Schema expected = Schema.createRecord("myrecord", null, null, false,
+        Arrays.asList(new Schema.Field("time", time, null, null)));
+
+    testRoundTripConversion(expected,
+        "message myrecord {\n" +
+            "  required int64 time (TIME_MICROS);\n" +
+            "}\n");
+
+    for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
+        {INT32, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) {
+      final PrimitiveType type;
+      if (primitive == FIXED_LEN_BYTE_ARRAY) {
+        type = new PrimitiveType(REQUIRED, primitive, 12, "test", TIME_MICROS);
+      } else {
+        type = new PrimitiveType(REQUIRED, primitive, "test", TIME_MICROS);
+      }
+
+      assertThrows("Should not allow TIME_MICROS with " + primitive,
+          IllegalArgumentException.class, new Runnable() {
+            @Override
+            public void run() {
+              new AvroSchemaConverter().convert(message(type));
+            }
+          });
+    }
+  }
+
+  @Test
+  public void testTimestampMillisType() throws Exception {
+    Schema timestamp = LogicalTypes.timestampMillis().addToSchema(Schema.create(LONG));
+    Schema expected = Schema.createRecord("myrecord", null, null, false,
+        Arrays.asList(new Schema.Field("timestamp", timestamp, null, null)));
+
+    testRoundTripConversion(expected,
+        "message myrecord {\n" +
+            "  required int64 timestamp (TIMESTAMP_MILLIS);\n" +
+            "}\n");
+
+    for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
+        {INT32, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) {
+      final PrimitiveType type;
+      if (primitive == FIXED_LEN_BYTE_ARRAY) {
+        type = new PrimitiveType(REQUIRED, primitive, 12, "test", TIMESTAMP_MILLIS);
+      } else {
+        type = new PrimitiveType(REQUIRED, primitive, "test", TIMESTAMP_MILLIS);
+      }
+
+      assertThrows("Should not allow TIMESTAMP_MILLIS with " + primitive,
+          IllegalArgumentException.class, new Runnable() {
+            @Override
+            public void run() {
+              new AvroSchemaConverter().convert(message(type));
+            }
+          });
+    }
+  }
+
+  @Test
+  public void testTimestampMicrosType() throws Exception {
+    Schema timestamp = LogicalTypes.timestampMicros().addToSchema(Schema.create(LONG));
+    Schema expected = Schema.createRecord("myrecord", null, null, false,
+        Arrays.asList(new Schema.Field("timestamp", timestamp, null, null)));
+
+    testRoundTripConversion(expected,
+        "message myrecord {\n" +
+            "  required int64 timestamp (TIMESTAMP_MICROS);\n" +
+            "}\n");
+
+    for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
+        {INT32, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) {
+      final PrimitiveType type;
+      if (primitive == FIXED_LEN_BYTE_ARRAY) {
+        type = new PrimitiveType(REQUIRED, primitive, 12, "test", TIMESTAMP_MICROS);
+      } else {
+        type = new PrimitiveType(REQUIRED, primitive, "test", TIMESTAMP_MICROS);
+      }
+
+      assertThrows("Should not allow TIMESTAMP_MICROS with " + primitive,
+          IllegalArgumentException.class, new Runnable() {
+            @Override
+            public void run() {
+              new AvroSchemaConverter().convert(message(type));
+            }
+          });
+    }
+  }
+
   public static Schema optional(Schema original) {
     return Schema.createUnion(Lists.newArrayList(
         Schema.create(Schema.Type.NULL),
         original));
   }
+
+  public static MessageType message(PrimitiveType primitive) {
+    return Types.buildMessage()
+        .addField(primitive)
+        .named("myrecord");
+  }
+
+  /**
+   * A convenience method to avoid a large number of @Test(expected=...) tests
+   * @param message A String message to describe this assertion
+   * @param expected An Exception class that the Runnable should throw
+   * @param runnable A Runnable that is expected to throw the exception
+   */
+  public static void assertThrows(
+      String message, Class<? extends Exception> expected, Runnable runnable) {
+    try {
+      runnable.run();
+      Assert.fail("No exception was thrown (" + message + "), expected: " +
+          expected.getName());
+    } catch (Exception actual) {
+      try {
+        Assert.assertEquals(message, expected, actual.getClass());
+      } catch (AssertionError e) {
+        e.addSuppressed(actual);
+        throw e;
+      }
+    }
+  }
 }
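
The logical-type tests above all follow the same pattern: attach an Avro
logical type to a schema, convert it with AvroSchemaConverter, and check the
resulting Parquet annotation (or, for the rejected primitive/annotation
pairs, that convert throws). A minimal sketch of that round trip outside
JUnit, assuming parquet-avro and avro are on the classpath
(LogicalTypeConversionSketch is an illustrative name):

    import java.util.Collections;
    import org.apache.avro.LogicalTypes;
    import org.apache.avro.Schema;
    import org.apache.parquet.avro.AvroSchemaConverter;
    import org.apache.parquet.schema.MessageType;

    public class LogicalTypeConversionSketch {
      public static void main(String[] args) {
        Schema timestamp = LogicalTypes.timestampMillis()
            .addToSchema(Schema.create(Schema.Type.LONG));
        Schema record = Schema.createRecord("myrecord", null, null, false);
        record.setFields(Collections.singletonList(
            new Schema.Field("ts", timestamp, null, null)));

        AvroSchemaConverter converter = new AvroSchemaConverter();

        // Avro to Parquet: the logical type becomes a Parquet annotation,
        // printing something like "required int64 ts (TIMESTAMP_MILLIS)".
        MessageType parquet = converter.convert(record);
        System.out.println(parquet);

        // Parquet back to Avro: the timestamp-millis logical type survives.
        Schema back = converter.convert(parquet);
        System.out.println(back.toString(true));
      }
    }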