You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by bl...@apache.org on 2016/04/20 17:41:29 UTC
[1/2] parquet-mr git commit: PARQUET-358: Add support for Avro's
logical types API.
Repository: parquet-mr
Updated Branches:
refs/heads/master 82b8ecc32 -> 6b24a1d1b
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/test/java/org/apache/parquet/avro/TestCircularReferences.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestCircularReferences.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestCircularReferences.java
new file mode 100644
index 0000000..d2f80ed
--- /dev/null
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestCircularReferences.java
@@ -0,0 +1,383 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.parquet.avro;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Conversion;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.util.Utf8;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * This class is based on org.apache.avro.TestCircularReferences
+ *
+ * The main difference between this class and the Avro version is that this one
+ * uses a place-holder schema for the circular reference from Child to Parent.
+ * This avoids creating a schema for Parent that references itself and can't be
+ * converted to a Parquet schema. The place-holder schema must also have a
+ * referenceable logical type.
+ */
+public class TestCircularReferences {
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  /**
+   * Logical type for a record with one field that refers to another record.
+   * The name of that field is stored in the schema's {@code ref-field-name}
+   * property.
+   */
+  public static class Reference extends LogicalType {
+    private static final String REFERENCE = "reference";
+    private static final String REF_FIELD_NAME = "ref-field-name";
+
+    private final String refFieldName;
+
+    public Reference(String refFieldName) {
+      super(REFERENCE);
+      this.refFieldName = refFieldName;
+    }
+
+    /** Rebuilds the logical type from the properties stored on {@code schema}. */
+    public Reference(Schema schema) {
+      super(REFERENCE);
+      this.refFieldName = schema.getProp(REF_FIELD_NAME);
+    }
+
+    @Override
+    public Schema addToSchema(Schema schema) {
+      super.addToSchema(schema);
+      schema.addProp(REF_FIELD_NAME, refFieldName);
+      return schema;
+    }
+
+    @Override
+    public String getName() {
+      return REFERENCE;
+    }
+
+    public String getRefFieldName() {
+      return refFieldName;
+    }
+
+    /**
+     * @throws IllegalArgumentException if {@code schema} has no field with the
+     *         configured reference field name
+     */
+    @Override
+    public void validate(Schema schema) {
+      super.validate(schema);
+      if (schema.getField(refFieldName) == null) {
+        throw new IllegalArgumentException("Invalid field name for reference field: " + refFieldName);
+      }
+    }
+  }
+
+  /**
+   * Logical type for a record that can be the target of a {@link Reference}.
+   * The name of its long-valued ID field is stored in the schema's
+   * {@code id-field-name} property.
+   */
+  public static class Referenceable extends LogicalType {
+    private static final String REFERENCEABLE = "referenceable";
+    private static final String ID_FIELD_NAME = "id-field-name";
+
+    private final String idFieldName;
+
+    public Referenceable(String idFieldName) {
+      super(REFERENCEABLE);
+      this.idFieldName = idFieldName;
+    }
+
+    /** Rebuilds the logical type from the properties stored on {@code schema}. */
+    public Referenceable(Schema schema) {
+      super(REFERENCEABLE);
+      this.idFieldName = schema.getProp(ID_FIELD_NAME);
+    }
+
+    @Override
+    public Schema addToSchema(Schema schema) {
+      super.addToSchema(schema);
+      schema.addProp(ID_FIELD_NAME, idFieldName);
+      return schema;
+    }
+
+    @Override
+    public String getName() {
+      return REFERENCEABLE;
+    }
+
+    public String getIdFieldName() {
+      return idFieldName;
+    }
+
+    /**
+     * @throws IllegalArgumentException if the ID field is missing or is not a
+     *         LONG
+     */
+    @Override
+    public void validate(Schema schema) {
+      super.validate(schema);
+      Schema.Field idField = schema.getField(idFieldName);
+      if (idField == null || idField.schema().getType() != Schema.Type.LONG) {
+        throw new IllegalArgumentException("Invalid ID field: " + idFieldName + ": " + idField);
+      }
+    }
+  }
+
+  /** Registers factories so both logical types are recognized when schemas are parsed. */
+  @BeforeClass
+  public static void addReferenceTypes() {
+    LogicalTypes.register(Referenceable.REFERENCEABLE, new LogicalTypes.LogicalTypeFactory() {
+      @Override
+      public LogicalType fromSchema(Schema schema) {
+        return new Referenceable(schema);
+      }
+    });
+    LogicalTypes.register(Reference.REFERENCE, new LogicalTypes.LogicalTypeFactory() {
+      @Override
+      public LogicalType fromSchema(Schema schema) {
+        return new Reference(schema);
+      }
+    });
+  }
+
+  /**
+   * Tracks referenceable records by ID and resolves references to them.
+   *
+   * On the write side, {@link ReferenceHandler} replaces a referenced record
+   * with its ID. On the read side, references are resolved either immediately
+   * (target already read) or through callbacks once the target is read.
+   * Instances hold mutable state and are meant for a single write or read pass.
+   */
+  public static class ReferenceManager {
+    private interface Callback {
+      void set(Object referenceable);
+    }
+
+    private final Map<Long, Object> references = new HashMap<Long, Object>();
+    private final Map<Object, Long> ids = new IdentityHashMap<Object, Long>();
+    private final Map<Long, List<Callback>> callbacksById = new HashMap<Long, List<Callback>>();
+    private final ReferenceableTracker tracker = new ReferenceableTracker();
+    private final ReferenceHandler handler = new ReferenceHandler();
+
+    public ReferenceableTracker getTracker() {
+      return tracker;
+    }
+
+    public ReferenceHandler getHandler() {
+      return handler;
+    }
+
+    /** Conversion for {@code referenceable} records: records IDs on both sides. */
+    public class ReferenceableTracker extends Conversion<IndexedRecord> {
+      @Override
+      @SuppressWarnings("unchecked")
+      public Class<IndexedRecord> getConvertedType() {
+        return (Class) Record.class;
+      }
+
+      @Override
+      public String getLogicalTypeName() {
+        return Referenceable.REFERENCEABLE;
+      }
+
+      @Override
+      public IndexedRecord fromRecord(IndexedRecord value, Schema schema, LogicalType type) {
+        // read side
+        long id = getId(value, schema);
+
+        // keep track of this for later references
+        references.put(id, value);
+
+        // resolve references to this id that were read before it. There may be
+        // none, in which case the map has no entry and get/remove returns null.
+        // Once resolved, the callbacks are no longer needed: later references
+        // are satisfied directly from `references`.
+        List<Callback> callbacks = callbacksById.remove(id);
+        if (callbacks != null) {
+          for (Callback callback : callbacks) {
+            callback.set(value);
+          }
+        }
+
+        return value;
+      }
+
+      @Override
+      public IndexedRecord toRecord(IndexedRecord value, Schema schema, LogicalType type) {
+        // write side: remember the id so references to this exact object
+        // (identity, not equality) can be replaced by it
+        long id = getId(value, schema);
+        ids.put(value, id);
+
+        return value;
+      }
+
+      private long getId(IndexedRecord referenceable, Schema schema) {
+        Referenceable info = (Referenceable) schema.getLogicalType();
+        int idField = schema.getField(info.getIdFieldName()).pos();
+        return (Long) referenceable.get(idField);
+      }
+    }
+
+    /** Conversion for {@code reference} records: swaps a record for its ID and back. */
+    public class ReferenceHandler extends Conversion<IndexedRecord> {
+      @Override
+      @SuppressWarnings("unchecked")
+      public Class<IndexedRecord> getConvertedType() {
+        return (Class) Record.class;
+      }
+
+      @Override
+      public String getLogicalTypeName() {
+        return Reference.REFERENCE;
+      }
+
+      @Override
+      public IndexedRecord fromRecord(final IndexedRecord record, Schema schema, LogicalType type) {
+        // read side: resolve the record or save a callback
+        final Schema.Field refField = schema.getField(((Reference) type).getRefFieldName());
+
+        Long id = (Long) record.get(refField.pos());
+        if (id != null) {
+          if (references.containsKey(id)) {
+            record.put(refField.pos(), references.get(id));
+
+          } else {
+            List<Callback> callbacks = callbacksById.get(id);
+            if (callbacks == null) {
+              callbacks = new ArrayList<Callback>();
+              callbacksById.put(id, callbacks);
+            }
+            // add a callback to resolve this reference when the id is available
+            callbacks.add(new Callback() {
+              @Override
+              public void set(Object referenceable) {
+                record.put(refField.pos(), referenceable);
+              }
+            });
+          }
+        }
+
+        return record;
+      }
+
+      @Override
+      public IndexedRecord toRecord(IndexedRecord record, Schema schema, LogicalType type) {
+        // write side: replace a referenced field with its id
+        Schema.Field refField = schema.getField(((Reference) type).getRefFieldName());
+        IndexedRecord referenced = (IndexedRecord) record.get(refField.pos());
+        if (referenced == null) {
+          return record;
+        }
+
+        // hijack the field to return the id instead of the ref
+        return new HijackingIndexedRecord(record, refField.pos(), ids.get(referenced));
+      }
+    }
+
+    /** Read-only view of a record that returns {@code data} for one field index. */
+    private static class HijackingIndexedRecord implements IndexedRecord {
+      private final IndexedRecord wrapped;
+      private final int index;
+      private final Object data;
+
+      public HijackingIndexedRecord(IndexedRecord wrapped, int index, Object data) {
+        this.wrapped = wrapped;
+        this.index = index;
+        this.data = data;
+      }
+
+      @Override
+      public void put(int i, Object v) {
+        throw new RuntimeException("[BUG] This is a read-only class.");
+      }
+
+      @Override
+      public Object get(int i) {
+        if (i == index) {
+          return data;
+        }
+        return wrapped.get(i);
+      }
+
+      @Override
+      public Schema getSchema() {
+        return wrapped.getSchema();
+      }
+    }
+  }
+
+  /**
+   * Writes a Parent whose Child points back to it, reads it back, and checks
+   * that the Child's parent reference was resolved to a Parent record.
+   */
+  @Test
+  public void test() throws IOException {
+    ReferenceManager manager = new ReferenceManager();
+    GenericData model = new GenericData();
+    model.addLogicalTypeConversion(manager.getTracker());
+    model.addLogicalTypeConversion(manager.getHandler());
+
+    Schema parentSchema = Schema.createRecord("Parent", null, null, false);
+
+    // stand-in for Parent inside Child, so the Parent schema does not
+    // reference itself (that could not be converted to a Parquet schema)
+    Schema placeholderSchema = Schema.createRecord("Placeholder", null, null, false);
+    List<Schema.Field> placeholderFields = new ArrayList<Schema.Field>();
+    placeholderFields.add( // at least one field is needed to be a valid schema
+        new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null));
+    placeholderSchema.setFields(placeholderFields);
+
+    Referenceable idRef = new Referenceable("id");
+
+    Schema parentRefSchema = Schema.createUnion(
+        Schema.create(Schema.Type.NULL),
+        Schema.create(Schema.Type.LONG),
+        idRef.addToSchema(placeholderSchema));
+
+    Reference parentRef = new Reference("parent");
+
+    List<Schema.Field> childFields = new ArrayList<Schema.Field>();
+    childFields.add(new Schema.Field("c", Schema.create(Schema.Type.STRING), null, null));
+    childFields.add(new Schema.Field("parent", parentRefSchema, null, null));
+    Schema childSchema = parentRef.addToSchema(
+        Schema.createRecord("Child", null, null, false, childFields));
+
+    List<Schema.Field> parentFields = new ArrayList<Schema.Field>();
+    parentFields.add(new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null));
+    parentFields.add(new Schema.Field("p", Schema.create(Schema.Type.STRING), null, null));
+    parentFields.add(new Schema.Field("child", childSchema, null, null));
+    parentSchema.setFields(parentFields);
+
+    Schema schema = idRef.addToSchema(parentSchema);
+
+    Record parent = new Record(schema);
+    parent.put("id", 1L);
+    parent.put("p", "parent data!");
+
+    Record child = new Record(childSchema);
+    child.put("c", "child data!");
+    child.put("parent", parent);
+
+    parent.put("child", child);
+
+    // serialization round trip
+    File data = AvroTestUtil.write(temp, model, schema, parent);
+    List<Record> records = AvroTestUtil.read(model, schema, data);
+
+    Record actual = records.get(0);
+
+    // because the record is a recursive structure, equals won't work
+    Assert.assertEquals("Should correctly read back the parent id",
+        1L, actual.get("id"));
+    Assert.assertEquals("Should correctly read back the parent data",
+        new Utf8("parent data!"), actual.get("p"));
+
+    Record actualChild = (Record) actual.get("child");
+    Assert.assertEquals("Should correctly read back the child data",
+        new Utf8("child data!"), actualChild.get("c"));
+    Object childParent = actualChild.get("parent");
+    Assert.assertTrue("Should have a parent Record object",
+        childParent instanceof Record);
+
+    Record childParentRecord = (Record) childParent;
+    Assert.assertEquals("Should have the right parent id",
+        1L, childParentRecord.get("id"));
+    Assert.assertEquals("Should have the right parent data",
+        new Utf8("parent data!"), childParentRecord.get("p"));
+  }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/test/java/org/apache/parquet/avro/TestGenericLogicalTypes.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestGenericLogicalTypes.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestGenericLogicalTypes.java
new file mode 100644
index 0000000..6809fff
--- /dev/null
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestGenericLogicalTypes.java
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.avro.Conversion;
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.avro.Schema.Type.STRING;
+import static org.apache.parquet.avro.AvroTestUtil.field;
+import static org.apache.parquet.avro.AvroTestUtil.instance;
+import static org.apache.parquet.avro.AvroTestUtil.optionalField;
+import static org.apache.parquet.avro.AvroTestUtil.read;
+import static org.apache.parquet.avro.AvroTestUtil.record;
+
+/**
+ * This class is based on org.apache.avro.generic.TestGenericLogicalTypes
+ */
+public class TestGenericLogicalTypes {
+
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+
+ public static final GenericData GENERIC = new GenericData();
+ public static final LogicalType DECIMAL_9_2 = LogicalTypes.decimal(9, 2);
+ public static final BigDecimal D1 = new BigDecimal("-34.34");
+ public static final BigDecimal D2 = new BigDecimal("117230.00");
+
+
+ @BeforeClass
+ public static void addDecimalAndUUID() {
+ GENERIC.addLogicalTypeConversion(new Conversions.DecimalConversion());
+ GENERIC.addLogicalTypeConversion(new Conversions.UUIDConversion());
+ }
+
+ private <T> List<T> getFieldValues(Collection<GenericRecord> records, String field,
+ Class<T> expectedClass) {
+ List<T> values = new ArrayList<T>();
+ for (GenericRecord record : records) {
+ values.add(expectedClass.cast(record.get(field)));
+ }
+ return values;
+ }
+
+ @Test
+ public void testReadUUID() throws IOException {
+ Schema uuidSchema = record("R",
+ field("uuid", LogicalTypes.uuid().addToSchema(Schema.create(STRING))));
+ GenericRecord u1 = instance(uuidSchema, "uuid", UUID.randomUUID());
+ GenericRecord u2 = instance(uuidSchema, "uuid", UUID.randomUUID());
+
+ Schema stringSchema = record("R", field("uuid", Schema.create(STRING)));
+ GenericRecord s1 = instance(stringSchema, "uuid", u1.get("uuid").toString());
+ GenericRecord s2 = instance(stringSchema, "uuid", u2.get("uuid").toString());
+
+ File test = write(stringSchema, s1, s2);
+ Assert.assertEquals("Should convert Strings to UUIDs",
+ Arrays.asList(u1, u2), read(GENERIC, uuidSchema, test));
+ }
+
+ @Test
+ public void testWriteUUIDReadStringSchema() throws IOException {
+ Schema uuidSchema = record("R",
+ field("uuid", LogicalTypes.uuid().addToSchema(Schema.create(STRING))));
+ GenericRecord u1 = instance(uuidSchema, "uuid", UUID.randomUUID());
+ GenericRecord u2 = instance(uuidSchema, "uuid", UUID.randomUUID());
+
+ Schema stringUuidSchema = Schema.create(STRING);
+ stringUuidSchema.addProp(GenericData.STRING_PROP, "String");
+ Schema stringSchema = record("R", field("uuid", stringUuidSchema));
+ GenericRecord s1 = instance(stringSchema, "uuid", u1.get("uuid").toString());
+ GenericRecord s2 = instance(stringSchema, "uuid", u2.get("uuid").toString());
+
+ File test = write(GENERIC, uuidSchema, u1, u2);
+ Assert.assertEquals("Should read UUIDs as Strings",
+ Arrays.asList(s1, s2), read(GENERIC, stringSchema, test));
+ }
+
+ @Test
+ public void testWriteUUIDReadStringMissingLogicalType() throws IOException {
+ Schema uuidSchema = record("R",
+ field("uuid", LogicalTypes.uuid().addToSchema(Schema.create(STRING))));
+ GenericRecord u1 = instance(uuidSchema, "uuid", UUID.randomUUID());
+ GenericRecord u2 = instance(uuidSchema, "uuid", UUID.randomUUID());
+
+ GenericRecord s1 = instance(uuidSchema, "uuid", new Utf8(u1.get("uuid").toString()));
+ GenericRecord s2 = instance(uuidSchema, "uuid", new Utf8(u2.get("uuid").toString()));
+
+ File test = write(GENERIC, uuidSchema, u1, u2);
+ Assert.assertEquals("Should read UUIDs as Strings",
+ Arrays.asList(s1, s2), read(GenericData.get(), uuidSchema, test));
+ }
+
+ @Test
+ public void testWriteNullableUUID() throws IOException {
+ Schema nullableUuidSchema = record("R",
+ optionalField("uuid", LogicalTypes.uuid().addToSchema(Schema.create(STRING))));
+ GenericRecord u1 = instance(nullableUuidSchema, "uuid", UUID.randomUUID());
+ GenericRecord u2 = instance(nullableUuidSchema, "uuid", UUID.randomUUID());
+
+ Schema stringUuidSchema = Schema.create(STRING);
+ stringUuidSchema.addProp(GenericData.STRING_PROP, "String");
+ Schema nullableStringSchema = record("R", optionalField("uuid", stringUuidSchema));
+ GenericRecord s1 = instance(nullableStringSchema, "uuid", u1.get("uuid").toString());
+ GenericRecord s2 = instance(nullableStringSchema, "uuid", u2.get("uuid").toString());
+
+ File test = write(GENERIC, nullableUuidSchema, u1, u2);
+ Assert.assertEquals("Should read UUIDs as Strings",
+ Arrays.asList(s1, s2), read(GENERIC, nullableStringSchema, test));
+ }
+
+ @Test
+ public void testReadDecimalFixed() throws IOException {
+ Schema fixedSchema = Schema.createFixed("aFixed", null, null, 4);
+ Schema fixedRecord = record("R", field("dec", fixedSchema));
+ Schema decimalSchema = DECIMAL_9_2.addToSchema(
+ Schema.createFixed("aFixed", null, null, 4));
+ Schema decimalRecord = record("R", field("dec", decimalSchema));
+
+ GenericRecord r1 = instance(decimalRecord, "dec", D1);
+ GenericRecord r2 = instance(decimalRecord, "dec", D2);
+ List<GenericRecord> expected = Arrays.asList(r1, r2);
+
+ Conversion<BigDecimal> conversion = new Conversions.DecimalConversion();
+
+ // use the conversion directly instead of relying on the write side
+ GenericRecord r1fixed = instance(fixedRecord, "dec",
+ conversion.toFixed(D1, fixedSchema, DECIMAL_9_2));
+ GenericRecord r2fixed = instance(fixedRecord, "dec",
+ conversion.toFixed(D2, fixedSchema, DECIMAL_9_2));
+
+ File test = write(fixedRecord, r1fixed, r2fixed);
+ Assert.assertEquals("Should convert fixed to BigDecimals",
+ expected, read(GENERIC, decimalRecord, test));
+ }
+
+ @Test
+ public void testWriteDecimalFixed() throws IOException {
+ Schema fixedSchema = Schema.createFixed("aFixed", null, null, 4);
+ Schema fixedRecord = record("R", field("dec", fixedSchema));
+ Schema decimalSchema = DECIMAL_9_2.addToSchema(
+ Schema.createFixed("aFixed", null, null, 4));
+ Schema decimalRecord = record("R", field("dec", decimalSchema));
+
+ GenericRecord r1 = instance(decimalRecord, "dec", D1);
+ GenericRecord r2 = instance(decimalRecord, "dec", D2);
+
+ Conversion<BigDecimal> conversion = new Conversions.DecimalConversion();
+
+ // use the conversion directly instead of relying on the write side
+ GenericRecord r1fixed = instance(fixedRecord, "dec",
+ conversion.toFixed(D1, fixedSchema, DECIMAL_9_2));
+ GenericRecord r2fixed = instance(fixedRecord, "dec",
+ conversion.toFixed(D2, fixedSchema, DECIMAL_9_2));
+ List<GenericRecord> expected = Arrays.asList(r1fixed, r2fixed);
+
+ File test = write(GENERIC, decimalRecord, r1, r2);
+ Assert.assertEquals("Should read BigDecimals as fixed",
+ expected, read(GENERIC, fixedRecord, test));
+ }
+
+ @Test
+ public void testReadDecimalBytes() throws IOException {
+ Schema bytesSchema = Schema.create(Schema.Type.BYTES);
+ Schema bytesRecord = record("R", field("dec", bytesSchema));
+ Schema decimalSchema = DECIMAL_9_2.addToSchema(Schema.create(Schema.Type.BYTES));
+ Schema decimalRecord = record("R", field("dec", decimalSchema));
+
+ GenericRecord r1 = instance(decimalRecord, "dec", D1);
+ GenericRecord r2 = instance(decimalRecord, "dec", D2);
+ List<GenericRecord> expected = Arrays.asList(r1, r2);
+
+ Conversion<BigDecimal> conversion = new Conversions.DecimalConversion();
+
+ // use the conversion directly instead of relying on the write side
+ GenericRecord r1bytes = instance(bytesRecord, "dec",
+ conversion.toBytes(D1, bytesSchema, DECIMAL_9_2));
+ GenericRecord r2bytes = instance(bytesRecord, "dec",
+ conversion.toBytes(D2, bytesSchema, DECIMAL_9_2));
+
+ File test = write(bytesRecord, r1bytes, r2bytes);
+ Assert.assertEquals("Should convert bytes to BigDecimals",
+ expected, read(GENERIC, decimalRecord, test));
+ }
+
+ @Test
+ public void testWriteDecimalBytes() throws IOException {
+ Schema bytesSchema = Schema.create(Schema.Type.BYTES);
+ Schema bytesRecord = record("R", field("dec", bytesSchema));
+ Schema decimalSchema = DECIMAL_9_2.addToSchema(Schema.create(Schema.Type.BYTES));
+ Schema decimalRecord = record("R", field("dec", decimalSchema));
+
+ GenericRecord r1 = instance(decimalRecord, "dec", D1);
+ GenericRecord r2 = instance(decimalRecord, "dec", D2);
+
+ Conversion<BigDecimal> conversion = new Conversions.DecimalConversion();
+
+ // use the conversion directly instead of relying on the write side
+ GenericRecord r1bytes = instance(bytesRecord, "dec",
+ conversion.toBytes(D1, bytesSchema, DECIMAL_9_2));
+ GenericRecord r2bytes = instance(bytesRecord, "dec",
+ conversion.toBytes(D2, bytesSchema, DECIMAL_9_2));
+
+ List<GenericRecord> expected = Arrays.asList(r1bytes, r2bytes);
+
+ File test = write(GENERIC, decimalRecord, r1, r2);
+ Assert.assertEquals("Should read BigDecimals as bytes",
+ expected, read(GENERIC, bytesRecord, test));
+ }
+
+ private <D> File write(Schema schema, D... data) throws IOException {
+ return write(GenericData.get(), schema, data);
+ }
+
+ private <D> File write(GenericData model, Schema schema, D... data) throws IOException {
+ return AvroTestUtil.write(temp, model, schema, data);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
index 855a5b1..4fa71ea 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
@@ -19,18 +19,23 @@
package org.apache.parquet.avro;
import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.Resources;
import java.io.File;
+import java.math.BigDecimal;
+import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericFixed;
@@ -39,12 +44,16 @@ import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.MessageTypeParser;
+import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -225,6 +234,113 @@ public class TestReadWrite {
assertEquals(ImmutableMap.of(str("a"), 1, str("b"), 2), nextRecord.get("mymap"));
}
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  /** Round-trips decimal(9, 2) values stored as Avro BYTES. */
+  @Test
+  public void testDecimalValues() throws Exception {
+    Schema decimal = LogicalTypes.decimal(9, 2).addToSchema(
+        Schema.create(Schema.Type.BYTES));
+    assertDecimalRoundTrip(decimal);
+  }
+
+  /** Round-trips decimal(9, 2) values stored as a 4-byte Avro FIXED. */
+  @Test
+  public void testFixedDecimalValues() throws Exception {
+    Schema decimal = LogicalTypes.decimal(9, 2).addToSchema(
+        Schema.createFixed("dec", null, null, 4));
+    assertDecimalRoundTrip(decimal);
+  }
+
+  /**
+   * Writes 1000 records whose single field {@code dec} uses {@code decimal},
+   * then reads them back and checks the results.
+   *
+   * Both sides use a data model with {@link Conversions.DecimalConversion}
+   * registered, and the reader has compatibility disabled. Assertions: values
+   * come back as {@link BigDecimal}, and the records equal the ones written.
+   *
+   * @param decimal a decimal(9, 2) schema over BYTES or FIXED
+   */
+  private void assertDecimalRoundTrip(Schema decimal) throws Exception {
+    Schema decimalSchema = Schema.createRecord("myrecord", null, null, false);
+    decimalSchema.setFields(Collections.singletonList(
+        new Schema.Field("dec", decimal, null, null)));
+
+    // add the decimal conversion to a generic data model
+    GenericData decimalSupport = new GenericData();
+    decimalSupport.addLogicalTypeConversion(new Conversions.DecimalConversion());
+
+    // reserve a unique name, then delete it so the writer creates the file itself
+    File file = temp.newFile("decimal.parquet");
+    file.delete();
+    Path path = new Path(file.toString());
+
+    List<GenericRecord> expected = Lists.newArrayList();
+    ParquetWriter<GenericRecord> writer = AvroParquetWriter
+        .<GenericRecord>builder(path)
+        .withDataModel(decimalSupport)
+        .withSchema(decimalSchema)
+        .build();
+    try {
+      Random random = new Random(34L);
+      GenericRecordBuilder builder = new GenericRecordBuilder(decimalSchema);
+      for (int i = 0; i < 1000; i += 1) {
+        // decimal(9, 2) allows at most 9 digits. A 31-bit unscaled value can
+        // reach 2147483647 (10 digits); 29 bits caps it at 536870911.
+        BigDecimal dec = new BigDecimal(new BigInteger(29, random), 2);
+        builder.set("dec", dec);
+
+        GenericRecord rec = builder.build();
+        expected.add(rec);
+        writer.write(rec); // write the exact record that is asserted on
+      }
+    } finally {
+      writer.close();
+    }
+
+    List<GenericRecord> records = Lists.newArrayList();
+    ParquetReader<GenericRecord> reader = AvroParquetReader
+        .<GenericRecord>builder(path)
+        .withDataModel(decimalSupport)
+        .disableCompatibility()
+        .build();
+    try {
+      GenericRecord rec;
+      while ((rec = reader.read()) != null) {
+        records.add(rec);
+      }
+    } finally {
+      reader.close();
+    }
+
+    Assert.assertFalse("Should read back at least one record", records.isEmpty());
+    Assert.assertTrue("dec field should be a BigDecimal instance",
+        records.get(0).get("dec") instanceof BigDecimal);
+    Assert.assertEquals("Content should match", expected, records);
+  }
+
@Test
public void testAll() throws Exception {
Schema schema = new Schema.Parser().parse(
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java
index 64caacc..af6f938 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java
@@ -47,7 +47,6 @@ import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import sun.net.www.content.text.Generic;
import static org.apache.parquet.avro.AvroTestUtil.array;
import static org.apache.parquet.avro.AvroTestUtil.optional;
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectLogicalTypes.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectLogicalTypes.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectLogicalTypes.java
new file mode 100644
index 0000000..401e698
--- /dev/null
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectLogicalTypes.java
@@ -0,0 +1,705 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import org.apache.avro.Conversion;
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.reflect.AvroSchema;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.specific.SpecificData;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.parquet.avro.AvroTestUtil.read;
+
+/**
+ * This class is based on org.apache.avro.reflect.TestReflectLogicalTypes
+ *
+ * Tests various logical types
+ * * string => UUID
+ * * fixed and bytes => Decimal
+ * * record => Pair
+ */
+public class TestReflectLogicalTypes {
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+
+ public static final ReflectData REFLECT = new ReflectData();
+
+ @BeforeClass
+ public static void addUUID() {
+ REFLECT.addLogicalTypeConversion(new Conversions.UUIDConversion());
+ REFLECT.addLogicalTypeConversion(new Conversions.DecimalConversion());
+ }
+
+ @Test
+ public void testReflectedSchema() {
+ Schema expected = SchemaBuilder.record(RecordWithUUIDList.class.getName())
+ .fields()
+ .name("uuids").type().array().items().stringType().noDefault()
+ .endRecord();
+ expected.getField("uuids").schema().addProp(
+ SpecificData.CLASS_PROP, List.class.getName());
+ LogicalTypes.uuid().addToSchema(
+ expected.getField("uuids").schema().getElementType());
+
+ Schema actual = REFLECT.getSchema(RecordWithUUIDList.class);
+
+ Assert.assertEquals("Should use the UUID logical type", expected, actual);
+ }
+
+ // this can be static because the schema only comes from reflection
+ public static class DecimalRecordBytes {
+ // scale is required and will not be set by the conversion
+ @AvroSchema("{" +
+ "\"type\": \"bytes\"," +
+ "\"logicalType\": \"decimal\"," +
+ "\"precision\": 9," +
+ "\"scale\": 2" +
+ "}")
+ private BigDecimal decimal;
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+
+ if (other == null || getClass() != other.getClass()) {
+ return false;
+ }
+
+ DecimalRecordBytes that = (DecimalRecordBytes) other;
+ if (decimal == null) {
+ return (that.decimal == null);
+ }
+
+ return decimal.equals(that.decimal);
+ }
+
+ @Override
+ public int hashCode() {
+ return decimal != null ? decimal.hashCode() : 0;
+ }
+ }
+
+ @Test
+ public void testDecimalBytes() throws IOException {
+ Schema schema = REFLECT.getSchema(DecimalRecordBytes.class);
+ Assert.assertEquals("Should have the correct record name",
+ "org.apache.parquet.avro.TestReflectLogicalTypes$",
+ schema.getNamespace());
+ Assert.assertEquals("Should have the correct record name",
+ "DecimalRecordBytes",
+ schema.getName());
+ Assert.assertEquals("Should have the correct logical type",
+ LogicalTypes.decimal(9, 2),
+ LogicalTypes.fromSchema(schema.getField("decimal").schema()));
+
+ DecimalRecordBytes record = new DecimalRecordBytes();
+ record.decimal = new BigDecimal("3.14");
+
+ File test = write(REFLECT, schema, record);
+ Assert.assertEquals("Should match the decimal after round trip",
+ Arrays.asList(record),
+ read(REFLECT, schema, test));
+ }
+
+ // this can be static because the schema only comes from reflection
+ public static class DecimalRecordFixed {
+ // scale is required and will not be set by the conversion
+ @AvroSchema("{" +
+ "\"name\": \"decimal_9\"," +
+ "\"type\": \"fixed\"," +
+ "\"size\": 4," +
+ "\"logicalType\": \"decimal\"," +
+ "\"precision\": 9," +
+ "\"scale\": 2" +
+ "}")
+ private BigDecimal decimal;
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+
+ if (other == null || getClass() != other.getClass()) {
+ return false;
+ }
+
+ DecimalRecordFixed that = (DecimalRecordFixed) other;
+ if (decimal == null) {
+ return (that.decimal == null);
+ }
+
+ return decimal.equals(that.decimal);
+ }
+
+ @Override
+ public int hashCode() {
+ return decimal != null ? decimal.hashCode() : 0;
+ }
+ }
+
+ @Test
+ public void testDecimalFixed() throws IOException {
+ Schema schema = REFLECT.getSchema(DecimalRecordFixed.class);
+ Assert.assertEquals("Should have the correct record name",
+ "org.apache.parquet.avro.TestReflectLogicalTypes$",
+ schema.getNamespace());
+ Assert.assertEquals("Should have the correct record name",
+ "DecimalRecordFixed",
+ schema.getName());
+ Assert.assertEquals("Should have the correct logical type",
+ LogicalTypes.decimal(9, 2),
+ LogicalTypes.fromSchema(schema.getField("decimal").schema()));
+
+ DecimalRecordFixed record = new DecimalRecordFixed();
+ record.decimal = new BigDecimal("3.14");
+
+ File test = write(REFLECT, schema, record);
+ Assert.assertEquals("Should match the decimal after round trip",
+ Arrays.asList(record),
+ read(REFLECT, schema, test));
+ }
+
+ public static class Pair<X, Y> {
+ private final X first;
+ private final Y second;
+
+ private Pair(X first, Y second) {
+ this.first = first;
+ this.second = second;
+ }
+
+ /**
+ * Two pairs are equal when both components are equal (null-safe).
+ */
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+
+ if (other == null || getClass() != other.getClass()) {
+ return false;
+ }
+
+ Pair<?, ?> that = (Pair<?, ?>) other;
+ if (first == null) {
+ if (that.first != null) {
+ return false;
+ }
+ } else if (!first.equals(that.first)) {
+ // components differ -> pairs are not equal
+ return false;
+ }
+
+ if (second == null) {
+ if (that.second != null) {
+ return false;
+ }
+ } else if (!second.equals(that.second)) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return Arrays.hashCode(new Object[] {first, second});
+ }
+
+ public static <X, Y> Pair<X, Y> of(X first, Y second) {
+ return new Pair<X, Y>(first, second);
+ }
+ }
+
+ public static class PairRecord {
+ @AvroSchema("{" +
+ "\"name\": \"Pair\"," +
+ "\"type\": \"record\"," +
+ "\"fields\": [" +
+ " {\"name\": \"x\", \"type\": \"long\"}," +
+ " {\"name\": \"y\", \"type\": \"long\"}" +
+ " ]," +
+ "\"logicalType\": \"pair\"" +
+ "}")
+ Pair<Long, Long> pair;
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testPairRecord() throws IOException {
+ ReflectData model = new ReflectData();
+ model.addLogicalTypeConversion(new Conversion<Pair>() {
+ @Override
+ public Class<Pair> getConvertedType() {
+ return Pair.class;
+ }
+
+ @Override
+ public String getLogicalTypeName() {
+ return "pair";
+ }
+
+ @Override
+ public Pair fromRecord(IndexedRecord value, Schema schema, LogicalType type) {
+ return Pair.of(value.get(0), value.get(1));
+ }
+
+ @Override
+ public IndexedRecord toRecord(Pair value, Schema schema, LogicalType type) {
+ GenericData.Record record = new GenericData.Record(schema);
+ record.put(0, value.first);
+ record.put(1, value.second);
+ return record;
+ }
+ });
+
+ LogicalTypes.register("pair", new LogicalTypes.LogicalTypeFactory() {
+ private final LogicalType PAIR = new LogicalType("pair");
+ @Override
+ public LogicalType fromSchema(Schema schema) {
+ return PAIR;
+ }
+ });
+
+ Schema schema = model.getSchema(PairRecord.class);
+ Assert.assertEquals("Should have the correct record name",
+ "org.apache.parquet.avro.TestReflectLogicalTypes$",
+ schema.getNamespace());
+ Assert.assertEquals("Should have the correct record name",
+ "PairRecord",
+ schema.getName());
+ Assert.assertEquals("Should have the correct logical type",
+ "pair",
+ LogicalTypes.fromSchema(schema.getField("pair").schema()).getName());
+
+ PairRecord record = new PairRecord();
+ record.pair = Pair.of(34L, 35L);
+ List<PairRecord> expected = new ArrayList<PairRecord>();
+ expected.add(record);
+
+ File test = write(model, schema, record);
+ Pair<Long, Long> actual = AvroTestUtil
+ .<PairRecord>read(model, schema, test)
+ .get(0).pair;
+ Assert.assertEquals("Data should match after serialization round-trip",
+ 34L, (long) actual.first);
+ Assert.assertEquals("Data should match after serialization round-trip",
+ 35L, (long) actual.second);
+ }
+
+ @Test
+ public void testReadUUID() throws IOException {
+ Schema uuidSchema = SchemaBuilder.record(RecordWithUUID.class.getName())
+ .fields().requiredString("uuid").endRecord();
+ LogicalTypes.uuid().addToSchema(uuidSchema.getField("uuid").schema());
+
+ UUID u1 = UUID.randomUUID();
+ UUID u2 = UUID.randomUUID();
+
+ RecordWithStringUUID r1 = new RecordWithStringUUID();
+ r1.uuid = u1.toString();
+ RecordWithStringUUID r2 = new RecordWithStringUUID();
+ r2.uuid = u2.toString();
+
+ List<RecordWithUUID> expected = Arrays.asList(
+ new RecordWithUUID(), new RecordWithUUID());
+ expected.get(0).uuid = u1;
+ expected.get(1).uuid = u2;
+
+ File test = write(
+ ReflectData.get().getSchema(RecordWithStringUUID.class), r1, r2);
+
+ Assert.assertEquals("Should convert Strings to UUIDs",
+ expected, read(REFLECT, uuidSchema, test));
+
+ // verify that the field's type overrides the logical type
+ Schema uuidStringSchema = SchemaBuilder
+ .record(RecordWithStringUUID.class.getName())
+ .fields().requiredString("uuid").endRecord();
+ LogicalTypes.uuid().addToSchema(uuidStringSchema.getField("uuid").schema());
+
+ Assert.assertEquals("Should not convert to UUID if accessor is String",
+ Arrays.asList(r1, r2),
+ read(REFLECT, uuidStringSchema, test));
+ }
+
+ @Test
+ public void testWriteUUID() throws IOException {
+ Schema uuidSchema = SchemaBuilder.record(RecordWithUUID.class.getName())
+ .fields().requiredString("uuid").endRecord();
+ LogicalTypes.uuid().addToSchema(uuidSchema.getField("uuid").schema());
+
+ UUID u1 = UUID.randomUUID();
+ UUID u2 = UUID.randomUUID();
+
+ RecordWithUUID r1 = new RecordWithUUID();
+ r1.uuid = u1;
+ RecordWithUUID r2 = new RecordWithUUID();
+ r2.uuid = u2;
+
+ List<RecordWithStringUUID> expected = Arrays.asList(
+ new RecordWithStringUUID(), new RecordWithStringUUID());
+ expected.get(0).uuid = u1.toString();
+ expected.get(1).uuid = u2.toString();
+
+ File test = write(REFLECT, uuidSchema, r1, r2);
+
+ // verify that the field's type overrides the logical type
+ Schema uuidStringSchema = SchemaBuilder
+ .record(RecordWithStringUUID.class.getName())
+ .fields().requiredString("uuid").endRecord();
+
+ Assert.assertEquals("Should read uuid as String without UUID conversion",
+ expected,
+ read(REFLECT, uuidStringSchema, test));
+
+ LogicalTypes.uuid().addToSchema(uuidStringSchema.getField("uuid").schema());
+ Assert.assertEquals("Should read uuid as String without UUID logical type",
+ expected,
+ read(ReflectData.get(), uuidStringSchema, test));
+ }
+
+ @Test
+ public void testWriteNullableUUID() throws IOException {
+ Schema nullableUuidSchema = SchemaBuilder.record(RecordWithUUID.class.getName())
+ .fields().optionalString("uuid").endRecord();
+ LogicalTypes.uuid().addToSchema(
+ nullableUuidSchema.getField("uuid").schema().getTypes().get(1));
+
+ UUID u1 = UUID.randomUUID();
+ UUID u2 = UUID.randomUUID();
+
+ RecordWithUUID r1 = new RecordWithUUID();
+ r1.uuid = u1;
+ RecordWithUUID r2 = new RecordWithUUID();
+ r2.uuid = u2;
+
+ List<RecordWithStringUUID> expected = Arrays.asList(
+ new RecordWithStringUUID(), new RecordWithStringUUID());
+ expected.get(0).uuid = u1.toString();
+ expected.get(1).uuid = u2.toString();
+
+ File test = write(REFLECT, nullableUuidSchema, r1, r2);
+
+ // verify that the field's type overrides the logical type
+ Schema nullableUuidStringSchema = SchemaBuilder
+ .record(RecordWithStringUUID.class.getName())
+ .fields().optionalString("uuid").endRecord();
+
+ Assert.assertEquals("Should read uuid as String without UUID conversion",
+ expected,
+ read(REFLECT, nullableUuidStringSchema, test));
+ }
+
+ @Test(expected = ClassCastException.class)
+ public void testWriteUUIDMissingLogicalType() throws IOException {
+ Schema uuidSchema = SchemaBuilder.record(RecordWithUUID.class.getName())
+ .fields().requiredString("uuid").endRecord();
+ LogicalTypes.uuid().addToSchema(uuidSchema.getField("uuid").schema());
+
+ UUID u1 = UUID.randomUUID();
+ UUID u2 = UUID.randomUUID();
+
+ RecordWithUUID r1 = new RecordWithUUID();
+ r1.uuid = u1;
+ RecordWithUUID r2 = new RecordWithUUID();
+ r2.uuid = u2;
+
+ // write without using REFLECT, which has the logical type
+ // NOTE(review): ReflectData.get() has no UUID conversion registered, so the
+ // ClassCastException is presumably raised by this write; confirm whether the
+ // read below is ever reached
+ File test = write(uuidSchema, r1, r2);
+
+ // verify that the field's type overrides the logical type
+ Schema uuidStringSchema = SchemaBuilder
+ .record(RecordWithStringUUID.class.getName())
+ .fields().requiredString("uuid").endRecord();
+
+ // this fails with an AppendWriteException wrapping ClassCastException
+ // because the UUID isn't converted to a CharSequence expected internally
+ read(ReflectData.get(), uuidStringSchema, test);
+ }
+
+ @Test
+ public void testReadUUIDGenericRecord() throws IOException {
+ Schema uuidSchema = SchemaBuilder.record("RecordWithUUID")
+ .fields().requiredString("uuid").endRecord();
+ LogicalTypes.uuid().addToSchema(uuidSchema.getField("uuid").schema());
+
+ UUID u1 = UUID.randomUUID();
+ UUID u2 = UUID.randomUUID();
+
+ RecordWithStringUUID r1 = new RecordWithStringUUID();
+ r1.uuid = u1.toString();
+ RecordWithStringUUID r2 = new RecordWithStringUUID();
+ r2.uuid = u2.toString();
+
+ List<GenericData.Record> expected = Arrays.asList(
+ new GenericData.Record(uuidSchema), new GenericData.Record(uuidSchema));
+ expected.get(0).put("uuid", u1);
+ expected.get(1).put("uuid", u2);
+
+ File test = write(
+ ReflectData.get().getSchema(RecordWithStringUUID.class), r1, r2);
+
+ Assert.assertEquals("Should convert Strings to UUIDs",
+ expected, read(REFLECT, uuidSchema, test));
+
+ // verify that the field's type overrides the logical type
+ Schema uuidStringSchema = SchemaBuilder
+ .record(RecordWithStringUUID.class.getName())
+ .fields().requiredString("uuid").endRecord();
+ // the logical type must be on the String schema for this check to be meaningful
+ LogicalTypes.uuid().addToSchema(uuidStringSchema.getField("uuid").schema());
+
+ Assert.assertEquals("Should not convert to UUID if accessor is String",
+ Arrays.asList(r1, r2),
+ read(REFLECT, uuidStringSchema, test));
+ }
+
+ @Test
+ public void testReadUUIDArray() throws IOException {
+ Schema uuidArraySchema = SchemaBuilder.record(RecordWithUUIDArray.class.getName())
+ .fields()
+ .name("uuids").type().array().items().stringType().noDefault()
+ .endRecord();
+ LogicalTypes.uuid().addToSchema(
+ uuidArraySchema.getField("uuids").schema().getElementType());
+
+ UUID u1 = UUID.randomUUID();
+ UUID u2 = UUID.randomUUID();
+
+ GenericRecord r = new GenericData.Record(uuidArraySchema);
+ r.put("uuids", Arrays.asList(u1.toString(), u2.toString()));
+
+ RecordWithUUIDArray expected = new RecordWithUUIDArray();
+ expected.uuids = new UUID[] {u1, u2};
+
+ File test = write(uuidArraySchema, r);
+
+ Assert.assertEquals("Should convert Strings to UUIDs",
+ expected,
+ read(REFLECT, uuidArraySchema, test).get(0));
+ }
+
+ @Test
+ public void testWriteUUIDArray() throws IOException {
+ Schema uuidArraySchema = SchemaBuilder.record(RecordWithUUIDArray.class.getName())
+ .fields()
+ .name("uuids").type().array().items().stringType().noDefault()
+ .endRecord();
+ LogicalTypes.uuid().addToSchema(
+ uuidArraySchema.getField("uuids").schema().getElementType());
+
+ Schema stringArraySchema = SchemaBuilder.record("RecordWithUUIDArray")
+ .fields()
+ .name("uuids").type().array().items().stringType().noDefault()
+ .endRecord();
+ stringArraySchema.getField("uuids").schema()
+ .addProp(SpecificData.CLASS_PROP, List.class.getName());
+
+ UUID u1 = UUID.randomUUID();
+ UUID u2 = UUID.randomUUID();
+
+ GenericRecord expected = new GenericData.Record(stringArraySchema);
+ List<String> uuids = new ArrayList<String>();
+ uuids.add(u1.toString());
+ uuids.add(u2.toString());
+ expected.put("uuids", uuids);
+
+ RecordWithUUIDArray r = new RecordWithUUIDArray();
+ r.uuids = new UUID[] {u1, u2};
+
+ File test = write(REFLECT, uuidArraySchema, r);
+
+ Assert.assertEquals("Should read UUIDs as Strings",
+ expected,
+ read(ReflectData.get(), stringArraySchema, test).get(0));
+ }
+
+ @Test
+ public void testReadUUIDList() throws IOException {
+ Schema uuidListSchema = SchemaBuilder.record(RecordWithUUIDList.class.getName())
+ .fields()
+ .name("uuids").type().array().items().stringType().noDefault()
+ .endRecord();
+ uuidListSchema.getField("uuids").schema().addProp(
+ SpecificData.CLASS_PROP, List.class.getName());
+ LogicalTypes.uuid().addToSchema(
+ uuidListSchema.getField("uuids").schema().getElementType());
+
+ UUID u1 = UUID.randomUUID();
+ UUID u2 = UUID.randomUUID();
+
+ GenericRecord r = new GenericData.Record(uuidListSchema);
+ r.put("uuids", Arrays.asList(u1.toString(), u2.toString()));
+
+ RecordWithUUIDList expected = new RecordWithUUIDList();
+ expected.uuids = Arrays.asList(u1, u2);
+
+ File test = write(uuidListSchema, r);
+
+ Assert.assertEquals("Should convert Strings to UUIDs",
+ expected, read(REFLECT, uuidListSchema, test).get(0));
+ }
+
+ @Test
+ public void testWriteUUIDList() throws IOException {
+ Schema uuidListSchema = SchemaBuilder.record(RecordWithUUIDList.class.getName())
+ .fields()
+ .name("uuids").type().array().items().stringType().noDefault()
+ .endRecord();
+ uuidListSchema.getField("uuids").schema().addProp(
+ SpecificData.CLASS_PROP, List.class.getName());
+ LogicalTypes.uuid().addToSchema(
+ uuidListSchema.getField("uuids").schema().getElementType());
+
+ Schema stringArraySchema = SchemaBuilder.record("RecordWithUUIDArray")
+ .fields()
+ .name("uuids").type().array().items().stringType().noDefault()
+ .endRecord();
+ stringArraySchema.getField("uuids").schema()
+ .addProp(SpecificData.CLASS_PROP, List.class.getName());
+
+ UUID u1 = UUID.randomUUID();
+ UUID u2 = UUID.randomUUID();
+
+ GenericRecord expected = new GenericData.Record(stringArraySchema);
+ expected.put("uuids", Arrays.asList(u1.toString(), u2.toString()));
+
+ RecordWithUUIDList r = new RecordWithUUIDList();
+ r.uuids = Arrays.asList(u1, u2);
+
+ File test = write(REFLECT, uuidListSchema, r);
+
+ Assert.assertEquals("Should read UUIDs as Strings",
+ expected,
+ read(REFLECT, stringArraySchema, test).get(0));
+ }
+
+ private <D> File write(Schema schema, D... data) throws IOException {
+ return write(ReflectData.get(), schema, data);
+ }
+
+ @SuppressWarnings("unchecked")
+ private <D> File write(GenericData model, Schema schema, D... data) throws IOException {
+ return AvroTestUtil.write(temp, model, schema, data);
+ }
+}
+
+/**
+ * Reflect-mapped record holding a UUID; equality and hashing are null-safe.
+ */
+class RecordWithUUID {
+ UUID uuid;
+
+ @Override
+ public int hashCode() {
+ // an unset uuid must not make the record unhashable
+ return uuid == null ? 0 : uuid.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ // instanceof is false for null, so this also rejects null
+ if (!(obj instanceof RecordWithUUID)) {
+ return false;
+ }
+ RecordWithUUID that = (RecordWithUUID) obj;
+ return this.uuid == null ? that.uuid == null : this.uuid.equals(that.uuid);
+ }
+}
+
+/**
+ * Reflect-mapped record holding a UUID as a String; equality and hashing are null-safe.
+ */
+class RecordWithStringUUID {
+ String uuid;
+
+ @Override
+ public int hashCode() {
+ // an unset uuid must not make the record unhashable
+ return uuid == null ? 0 : uuid.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ // instanceof is false for null, so this also rejects null
+ if (!(obj instanceof RecordWithStringUUID)) {
+ return false;
+ }
+ RecordWithStringUUID that = (RecordWithStringUUID) obj;
+ return this.uuid == null ? that.uuid == null : this.uuid.equals(that.uuid);
+ }
+}
+
+class RecordWithUUIDArray {
+ UUID[] uuids;
+
+ @Override
+ public int hashCode() {
+ return Arrays.hashCode(uuids);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null) {
+ return false;
+ }
+ if (!(obj instanceof RecordWithUUIDArray)) {
+ return false;
+ }
+ RecordWithUUIDArray that = (RecordWithUUIDArray) obj;
+ return Arrays.equals(this.uuids, that.uuids);
+ }
+}
+
+/**
+ * Reflect-mapped record holding a list of UUIDs; equality and hashing are null-safe.
+ */
+class RecordWithUUIDList {
+ List<UUID> uuids;
+
+ @Override
+ public int hashCode() {
+ // an unset list must not make the record unhashable
+ return uuids == null ? 0 : uuids.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ // instanceof is false for null, so this also rejects null
+ if (!(obj instanceof RecordWithUUIDList)) {
+ return false;
+ }
+ RecordWithUUIDList that = (RecordWithUUIDList) obj;
+ return this.uuids == null ? that.uuids == null : this.uuids.equals(that.uuids);
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java b/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
index 4a167bd..30787f0 100644
--- a/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
+++ b/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
@@ -25,6 +25,10 @@ import java.io.OutputStream;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.CharsetEncoder;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.apache.parquet.io.ParquetEncodingException;
@@ -209,26 +213,33 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
}
- private static class FromStringBinary extends ByteArrayBackedBinary {
- public FromStringBinary(String value) {
- // reused is false, because we do not
- // hold on to the underlying bytes,
- // and nobody else has a handle to them
+ private static class FromStringBinary extends ByteBufferBackedBinary {
+ public FromStringBinary(CharSequence value) {
+ // reused is false, because we do not hold on to the buffer after
+ // conversion, and nobody else has a handle to it
super(encodeUTF8(value), false);
}
- private static byte[] encodeUTF8(String value) {
- try {
- return value.getBytes("UTF-8");
- } catch (UnsupportedEncodingException e) {
- throw new ParquetEncodingException("UTF-8 not supported.", e);
- }
- }
-
@Override
public String toString() {
return "Binary{\"" + toStringUsingUTF8() + "\"}";
}
+
+ private static final ThreadLocal<CharsetEncoder> ENCODER =
+ new ThreadLocal<CharsetEncoder>() {
+ @Override
+ protected CharsetEncoder initialValue() {
+ return StandardCharsets.UTF_8.newEncoder();
+ }
+ };
+
+ // CharsetEncoder.encode reports malformed input (e.g. an unpaired
+ // surrogate) by throwing, whereas the removed String.getBytes replaced it.
+ // The exception therefore signals bad input, not a missing charset.
+ private static ByteBuffer encodeUTF8(CharSequence value) {
+ try {
+ return ENCODER.get().encode(CharBuffer.wrap(value));
+ } catch (CharacterCodingException e) {
+ throw new ParquetEncodingException("Cannot encode value as UTF-8: malformed input (e.g. an unpaired surrogate).", e);
+ }
+ }
}
public static Binary fromReusedByteArray(final byte[] value, final int offset, final int length) {
@@ -359,6 +370,13 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
private int offset;
private int length;
+ // Wraps value without copying it: offset and length are captured from the
+ // buffer's current position() and remaining() at construction time.
+ public ByteBufferBackedBinary(ByteBuffer value, boolean isBackingBytesReused) {
+ this.value = value;
+ this.offset = value.position();
+ this.length = value.remaining();
+ this.isBackingBytesReused = isBackingBytesReused;
+ }
+
public ByteBufferBackedBinary(ByteBuffer value, int offset, int length, boolean isBackingBytesReused) {
this.value = value;
this.offset = offset;
@@ -521,11 +539,11 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
}
public static Binary fromReusedByteBuffer(final ByteBuffer value) {
- return new ByteBufferBackedBinary(value, value.position(), value.remaining(), true);
+ // the two-arg constructor reads value.position()/remaining() itself
+ return new ByteBufferBackedBinary(value, true);
}
public static Binary fromConstantByteBuffer(final ByteBuffer value) {
- return new ByteBufferBackedBinary(value, value.position(), value.remaining(), false);
+ // the two-arg constructor reads value.position()/remaining() itself
+ return new ByteBufferBackedBinary(value, false);
}
@Deprecated
@@ -536,7 +554,12 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
return fromReusedByteBuffer(value); // Assume producer intends to reuse byte[]
}
- public static Binary fromString(final String value) {
+ public static Binary fromString(String value) {
+ // this method is for binary backward-compatibility
+ return fromString((CharSequence) value);
+ }
+
+ // Encodes the characters of value as UTF-8 eagerly, when the Binary is created.
+ public static Binary fromString(CharSequence value) {
return new FromStringBinary(value);
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-column/src/main/java/org/apache/parquet/schema/Types.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/Types.java b/parquet-column/src/main/java/org/apache/parquet/schema/Types.java
index d3cc97b..9af71af 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/Types.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/Types.java
@@ -336,9 +336,9 @@ public class Types {
* @param length an int length
* @return this builder for method chaining
*/
- public BasePrimitiveBuilder<P, THIS> length(int length) {
+ public THIS length(int length) {
this.length = length;
- return this;
+ // return self() (typed THIS) so chained calls keep the concrete builder type
+ return self();
}
/**
@@ -351,9 +351,9 @@ public class Types {
* @param precision an int precision value for the DECIMAL
* @return this builder for method chaining
*/
- public BasePrimitiveBuilder<P, THIS> precision(int precision) {
+ public THIS precision(int precision) {
this.precision = precision;
- return this;
+ // return self() (typed THIS) so chained calls keep the concrete builder type
+ return self();
}
/**
@@ -369,9 +369,9 @@ public class Types {
* @param scale an int scale value for the DECIMAL
* @return this builder for method chaining
*/
- public BasePrimitiveBuilder<P, THIS> scale(int scale) {
+ public THIS scale(int scale) {
this.scale = scale;
- return this;
+ // return self() (typed THIS) so chained calls keep the concrete builder type
+ return self();
}
@Override
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java b/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java
index 88aa4b2..a541e1b 100644
--- a/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java
+++ b/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java
@@ -198,16 +198,6 @@ public class TestBinary {
Binary copy = bao.binary.copy();
assertSame(copy, bao.binary);
-
- mutate(bao.original);
-
- byte[] expected = testString.getBytes(UTF8);
- mutate(expected);
-
- assertArrayEquals(expected, copy.getBytes());
- assertArrayEquals(expected, copy.getBytesUnsafe());
- assertArrayEquals(expected, copy.copy().getBytesUnsafe());
- assertArrayEquals(expected, copy.copy().getBytes());
}
private void testReusedCopy(BinaryFactory bf) throws Exception {
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f606faa..9314a53 100644
--- a/pom.xml
+++ b/pom.xml
@@ -93,6 +93,7 @@
<fastutil.version>6.5.7</fastutil.version>
<semver.api.version>0.9.33</semver.api.version>
<slf4j.version>1.7.5</slf4j.version>
+ <avro.version>1.8.0</avro.version>
</properties>
<modules>
[2/2] parquet-mr git commit: PARQUET-358: Add support for Avro's
logical types API.
Posted by bl...@apache.org.
PARQUET-358: Add support for Avro's logical types API.
This adds support for Avro's logical types API to parquet-avro.
* The logical types API was introduced in Avro 1.8.0, so this bumps the Avro dependency version to 1.8.0.
* Types supported are: decimal, date, time-millis, time-micros, timestamp-millis, and timestamp-micros
* Tests have been copied from Avro and ported to the parquet-avro API
Author: Ryan Blue <bl...@apache.org>
Closes #318 from rdblue/PARQUET-358-add-avro-logical-types-api and squashes the following commits:
bd81f9c [Ryan Blue] PARQUET-358: Fix review items.
0a882ee [Ryan Blue] PARQUET-358: Add logical types circular reference test.
5124618 [Ryan Blue] PARQUET-358: Add license documentation for code from Avro.
dcb14be [Ryan Blue] PARQUET-358: Add support for Avro's logical types API.
Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/6b24a1d1
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/6b24a1d1
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/6b24a1d1
Branch: refs/heads/master
Commit: 6b24a1d1b5e2792a7821ad172a45e38d2b04f9b8
Parents: 82b8ecc
Author: Ryan Blue <bl...@apache.org>
Authored: Wed Apr 20 08:41:22 2016 -0700
Committer: Ryan Blue <bl...@apache.org>
Committed: Wed Apr 20 08:41:22 2016 -0700
----------------------------------------------------------------------
LICENSE | 8 +
NOTICE | 11 +
parquet-avro/pom.xml | 4 -
.../avro/AvroIndexedRecordConverter.java | 18 +-
.../apache/parquet/avro/AvroReadSupport.java | 4 +-
.../parquet/avro/AvroRecordConverter.java | 121 +++-
.../parquet/avro/AvroSchemaConverter.java | 147 ++--
.../apache/parquet/avro/AvroWriteSupport.java | 167 +++--
.../parquet/avro/ParentValueContainer.java | 175 +++++
.../src/main/resources/META-INF/LICENSE | 186 +++++
parquet-avro/src/main/resources/META-INF/NOTICE | 18 +
.../org/apache/parquet/avro/AvroTestUtil.java | 53 ++
.../parquet/avro/TestAvroSchemaConverter.java | 278 +++++++-
.../parquet/avro/TestCircularReferences.java | 383 ++++++++++
.../parquet/avro/TestGenericLogicalTypes.java | 271 +++++++
.../org/apache/parquet/avro/TestReadWrite.java | 118 +++-
.../avro/TestReadWriteOldListBehavior.java | 1 -
.../parquet/avro/TestReflectLogicalTypes.java | 705 +++++++++++++++++++
.../java/org/apache/parquet/io/api/Binary.java | 55 +-
.../java/org/apache/parquet/schema/Types.java | 12 +-
.../org/apache/parquet/io/api/TestBinary.java | 10 -
pom.xml | 1 +
22 files changed, 2593 insertions(+), 153 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/LICENSE
----------------------------------------------------------------------
diff --git a/LICENSE b/LICENSE
index b759148..b006581 100644
--- a/LICENSE
+++ b/LICENSE
@@ -178,6 +178,14 @@
--------------------------------------------------------------------------------
+This product includes code from Apache Avro.
+
+Copyright: 2014 The Apache Software Foundation.
+Home page: https://avro.apache.org/
+License: http://www.apache.org/licenses/LICENSE-2.0
+
+--------------------------------------------------------------------------------
+
This project includes code from Daniel Lemire's JavaFastPFOR project. The
"Lemire" bit packing source code produced by parquet-generator is derived from
the JavaFastPFOR project.
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/NOTICE
----------------------------------------------------------------------
diff --git a/NOTICE b/NOTICE
index c6e3bf2..a9b6c56 100644
--- a/NOTICE
+++ b/NOTICE
@@ -43,3 +43,14 @@ with the following copyright notice:
See the License for the specific language governing permissions and
limitations under the License.
+--------------------------------------------------------------------------------
+
+This product includes code from Apache Avro, which includes the following in
+its NOTICE file:
+
+ Apache Avro
+ Copyright 2010-2015 The Apache Software Foundation
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-avro/pom.xml b/parquet-avro/pom.xml
index aad197d..50c37db 100644
--- a/parquet-avro/pom.xml
+++ b/parquet-avro/pom.xml
@@ -32,10 +32,6 @@
<name>Apache Parquet Avro</name>
<url>https://parquet.apache.org</url>
- <properties>
- <avro.version>1.7.6</avro.version>
- </properties>
-
<dependencies>
<dependency>
<groupId>org.apache.parquet</groupId>
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
index 06c66d6..48eab4d 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
@@ -21,6 +21,8 @@ package org.apache.parquet.avro;
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;
+import org.apache.avro.Conversion;
+import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericData;
@@ -111,6 +113,11 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
@SuppressWarnings("unchecked")
private static <T> Class<T> getDatumClass(GenericData model, Schema schema) {
+ if (model.getConversionFor(schema.getLogicalType()) != null) {
+ // use generic classes to pass data to conversions
+ return null;
+ }
+
if (model instanceof SpecificData) {
return (Class<T>) ((SpecificData) model).getClass(schema);
}
@@ -133,7 +140,16 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
}
private static Converter newConverter(Schema schema, Type type,
- GenericData model, ParentValueContainer parent) {
+ GenericData model, ParentValueContainer setter) {
+
+ LogicalType logicalType = schema.getLogicalType();
+ // the expected type is always null because it is determined by the parent
+ // datum class, which never helps for generic. when logical types are added
+ // to specific, this should pass the expected type here.
+ Conversion<?> conversion = model.getConversionFor(logicalType);
+ ParentValueContainer parent = ParentValueContainer
+ .getConversionContainer(setter, conversion, schema);
+
if (schema.getType().equals(Schema.Type.BOOLEAN)) {
return new AvroConverters.FieldBooleanConverter(parent);
} else if (schema.getType().equals(Schema.Type.INT)) {
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
index e73e8af..7d55bf5 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
@@ -110,9 +110,9 @@ public class AvroReadSupport<T> extends ReadSupport<T> {
MessageType parquetSchema = readContext.getRequestedSchema();
Schema avroSchema;
- if (readContext.getReadSupportMetadata().get(AVRO_READ_SCHEMA_METADATA_KEY) != null) {
+ if (metadata.get(AVRO_READ_SCHEMA_METADATA_KEY) != null) {
// use the Avro read schema provided by the user
- avroSchema = new Schema.Parser().parse(readContext.getReadSupportMetadata().get(AVRO_READ_SCHEMA_METADATA_KEY));
+ avroSchema = new Schema.Parser().parse(metadata.get(AVRO_READ_SCHEMA_METADATA_KEY));
} else if (keyValueMetaData.get(AVRO_SCHEMA_METADATA_KEY) != null) {
// use the Avro schema from the file metadata if present
avroSchema = new Schema.Parser().parse(keyValueMetaData.get(AVRO_SCHEMA_METADATA_KEY));
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
index 38a761c..10bb29b 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
@@ -29,6 +29,7 @@ import it.unimi.dsi.fastutil.shorts.ShortArrayList;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -36,8 +37,14 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.LinkedHashMap;
+import org.apache.avro.AvroTypeException;
+import org.apache.avro.Conversion;
+import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
+import org.apache.avro.reflect.AvroIgnore;
+import org.apache.avro.reflect.AvroName;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.Stringable;
import org.apache.avro.specific.SpecificData;
@@ -69,7 +76,8 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
private static final String JAVA_CLASS_PROP = "java-class";
private static final String JAVA_KEY_CLASS_PROP = "java-key-class";
- protected T currentRecord;
+ protected T currentRecord = null;
+ private ParentValueContainer rootContainer = null;
private final Converter[] converters;
private final Schema avroSchema;
@@ -80,6 +88,15 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
public AvroRecordConverter(MessageType parquetSchema, Schema avroSchema,
GenericData baseModel) {
this(null, parquetSchema, avroSchema, baseModel);
+ LogicalType logicalType = avroSchema.getLogicalType();
+ Conversion<?> conversion = baseModel.getConversionFor(logicalType);
+ this.rootContainer = ParentValueContainer.getConversionContainer(new ParentValueContainer() {
+ @Override
+ @SuppressWarnings("unchecked")
+ public void add(Object value) {
+ AvroRecordConverter.this.currentRecord = (T) value;
+ }
+ }, conversion, avroSchema);
}
public AvroRecordConverter(ParentValueContainer parent,
@@ -101,6 +118,8 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
recordClass = getDatumClass(avroSchema, model);
}
+ Map<String, Class<?>> fields = getFieldsByName(recordClass, false);
+
int parquetFieldIndex = 0;
for (Type parquetField: parquetSchema.getFields()) {
final Schema.Field avroField = getAvroField(parquetField.getName());
@@ -112,8 +131,10 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
AvroRecordConverter.this.set(avroField.name(), finalAvroIndex, value);
}
};
+
+ Class<?> fieldClass = fields.get(avroField.name());
converters[parquetFieldIndex] = newConverter(
- nonNullSchema, parquetField, this.model, container);
+ nonNullSchema, parquetField, this.model, fieldClass, container);
// @Stringable doesn't affect the reflected schema; must be enforced here
if (recordClass != null &&
@@ -147,6 +168,43 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
}
}
+ /*
+  * Collects the declared field types of recordClass and of each of its
+  * superclasses, keyed by Avro field name: the @AvroName value when present,
+  * otherwise the Java field name. Entries are kept in insertion order, with
+  * recordClass's own fields first and then each superclass in turn.
+  * Fields annotated @AvroIgnore and transient or static fields are skipped.
+  * Returns an empty map when recordClass is null. When excludeJava is true,
+  * the walk stops at the first class whose package name starts with "java.".
+  * Throws AvroTypeException if two collected fields map to the same name.
+  *
+  * This was taken from Avro's ReflectData.
+  */
+ private static Map<String, Class<?>> getFieldsByName(Class<?> recordClass,
+ boolean excludeJava) {
+ Map<String, Class<?>> fields = new LinkedHashMap<String, Class<?>>(); // preserves field discovery order
+
+ if (recordClass != null) {
+ Class<?> current = recordClass;
+ do {
+ if (excludeJava && current.getPackage() != null
+ && current.getPackage().getName().startsWith("java.")) {
+ break; // skip java built-in classes
+ }
+ for (Field field : current.getDeclaredFields()) {
+ if (field.isAnnotationPresent(AvroIgnore.class) ||
+ isTransientOrStatic(field)) {
+ continue; // not part of the Avro record
+ }
+ AvroName altName = field.getAnnotation(AvroName.class);
+ Class<?> existing = fields.put(
+ altName != null ? altName.value() : field.getName(), // @AvroName overrides the Java field name
+ field.getType());
+ if (existing != null) { // the name was already claimed by a field seen earlier in the walk
+ throw new AvroTypeException( // NOTE(review): message reports the Java field name, not the (possibly @AvroName) key that collided
+ current + " contains two fields named: " + field.getName());
+ }
+ }
+ current = current.getSuperclass(); // continue with the superclass's declared fields
+ } while (current != null);
+ }
+
+ return fields;
+ }
+ /** Returns true if the field is declared transient or static; getFieldsByName skips such fields. */
+ private static boolean isTransientOrStatic(Field field) {
+ return (field.getModifiers() & (Modifier.TRANSIENT | Modifier.STATIC)) != 0; // test both modifier bits at once
+ }
+
private Schema.Field getAvroField(String parquetFieldName) {
Schema.Field avroField = avroSchema.getField(parquetFieldName);
if (avroField != null) {
@@ -164,12 +222,28 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
parquetFieldName));
}
+ private static Converter newConverter( // convenience overload for callers with no known Java class for the value
+ Schema schema, Type type, GenericData model, ParentValueContainer setter) {
+ return newConverter(schema, type, model, null, setter); // null knownClass: the conversion is looked up by logical type only
+ }
+
private static Converter newConverter(Schema schema, Type type,
- GenericData model, ParentValueContainer parent) {
+ GenericData model, Class<?> knownClass, ParentValueContainer setter) {
+ LogicalType logicalType = schema.getLogicalType();
+ Conversion<?> conversion;
+ if (knownClass != null) {
+ conversion = model.getConversionByClass(knownClass, logicalType);
+ } else {
+ conversion = model.getConversionFor(logicalType);
+ }
+
+ ParentValueContainer parent = ParentValueContainer
+ .getConversionContainer(setter, conversion, schema);
+
if (schema.getType().equals(Schema.Type.BOOLEAN)) {
return new AvroConverters.FieldBooleanConverter(parent);
} else if (schema.getType().equals(Schema.Type.INT)) {
- Class<?> datumClass = getDatumClass(schema, model);
+ Class<?> datumClass = getDatumClass(conversion, knownClass, schema, model);
if (datumClass == null) {
return new AvroConverters.FieldIntegerConverter(parent);
} else if (datumClass == byte.class || datumClass == Byte.class) {
@@ -187,7 +261,7 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
} else if (schema.getType().equals(Schema.Type.DOUBLE)) {
return new AvroConverters.FieldDoubleConverter(parent);
} else if (schema.getType().equals(Schema.Type.BYTES)) {
- Class<?> datumClass = getDatumClass(schema, model);
+ Class<?> datumClass = getDatumClass(conversion, knownClass, schema, model);
if (datumClass == null) {
return new AvroConverters.FieldByteBufferConverter(parent);
} else if (datumClass.isArray() && datumClass.getComponentType() == byte.class) {
@@ -201,7 +275,7 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
} else if (schema.getType().equals(Schema.Type.ENUM)) {
return new AvroConverters.FieldEnumConverter(parent, schema, model);
} else if (schema.getType().equals(Schema.Type.ARRAY)) {
- Class<?> datumClass = getDatumClass(schema, model);
+ Class<?> datumClass = getDatumClass(conversion, knownClass, schema, model);
if (datumClass != null && datumClass.isArray()) {
return new AvroArrayConverter(
parent, type.asGroupType(), schema, model, datumClass);
@@ -265,8 +339,24 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
}
}
- @SuppressWarnings("unchecked")
private static <T> Class<T> getDatumClass(Schema schema, GenericData model) {
+ return getDatumClass(null, null, schema, model);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T> Class<T> getDatumClass(Conversion<?> conversion,
+ Class<T> knownClass,
+ Schema schema, GenericData model) {
+ if (conversion != null) {
+ // use generic classes to pass data to conversions
+ return null;
+ }
+
+ // known class can be set when using reflect
+ if (knownClass != null) {
+ return knownClass;
+ }
+
if (model instanceof SpecificData) {
// this works for reflect as well
return ((SpecificData) model).getClass(schema);
@@ -314,6 +404,9 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
fillInDefaults();
if (parent != null) {
parent.add(currentRecord);
+ } else {
+ // this applies any converters needed for the root value
+ rootContainer.add(currentRecord);
}
}
@@ -502,10 +595,10 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
// matching it against the element schema.
if (isElementType(repeatedType, elementSchema)) {
// the element type is the repeated type (and required)
- converter = newConverter(elementSchema, repeatedType, model, setter);
+ converter = newConverter(elementSchema, repeatedType, model, elementClass, setter);
} else {
// the element is wrapped in a synthetic group and may be optional
- converter = new PrimitiveElementConverter(
+ converter = new ArrayElementConverter(
repeatedType.asGroupType(), elementSchema, model, setter);
}
}
@@ -643,20 +736,20 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
* }
* </pre>
*/
- final class PrimitiveElementConverter extends GroupConverter {
+ final class ArrayElementConverter extends GroupConverter {
private boolean isSet;
private final Converter elementConverter;
- public PrimitiveElementConverter(GroupType repeatedType,
- Schema elementSchema, GenericData model,
- final ParentValueContainer setter) {
+ public ArrayElementConverter(GroupType repeatedType,
+ Schema elementSchema, GenericData model,
+ final ParentValueContainer setter) {
Type elementType = repeatedType.getType(0);
Preconditions.checkArgument(
!elementClass.isPrimitive() || elementType.isRepetition(REQUIRED),
"Cannot convert list of optional elements to primitive array");
Schema nonNullElementSchema = AvroSchemaConverter.getNonNull(elementSchema);
this.elementConverter = newConverter(
- nonNullElementSchema, elementType, model, new ParentValueContainer() {
+ nonNullElementSchema, elementType, model, elementClass, new ParentValueContainer() {
@Override
public void add(Object value) {
isSet = true;
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
index 6cfa8d1..6b9b94c 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
@@ -18,20 +18,26 @@
*/
package org.apache.parquet.avro;
-import java.util.*;
-
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
-import org.codehaus.jackson.node.NullNode;
import org.apache.parquet.schema.ConversionPatterns;
+import org.apache.parquet.schema.DecimalMetadata;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
-import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import static org.apache.avro.JsonProperties.NULL_VALUE;
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE_DEFAULT;
import static org.apache.parquet.schema.OriginalType.*;
@@ -113,26 +119,28 @@ public class AvroSchemaConverter {
return convertField(fieldName, schema, Type.Repetition.REQUIRED);
}
+ @SuppressWarnings("deprecation")
private Type convertField(String fieldName, Schema schema, Type.Repetition repetition) {
+ Types.PrimitiveBuilder<PrimitiveType> builder;
Schema.Type type = schema.getType();
if (type.equals(Schema.Type.BOOLEAN)) {
- return primitive(fieldName, BOOLEAN, repetition);
+ builder = Types.primitive(BOOLEAN, repetition);
} else if (type.equals(Schema.Type.INT)) {
- return primitive(fieldName, INT32, repetition);
+ builder = Types.primitive(INT32, repetition);
} else if (type.equals(Schema.Type.LONG)) {
- return primitive(fieldName, INT64, repetition);
+ builder = Types.primitive(INT64, repetition);
} else if (type.equals(Schema.Type.FLOAT)) {
- return primitive(fieldName, FLOAT, repetition);
+ builder = Types.primitive(FLOAT, repetition);
} else if (type.equals(Schema.Type.DOUBLE)) {
- return primitive(fieldName, DOUBLE, repetition);
+ builder = Types.primitive(DOUBLE, repetition);
} else if (type.equals(Schema.Type.BYTES)) {
- return primitive(fieldName, BINARY, repetition);
+ builder = Types.primitive(BINARY, repetition);
} else if (type.equals(Schema.Type.STRING)) {
- return primitive(fieldName, BINARY, repetition, UTF8);
+ builder = Types.primitive(BINARY, repetition).as(UTF8);
} else if (type.equals(Schema.Type.RECORD)) {
return new GroupType(repetition, fieldName, convertFields(schema.getFields()));
} else if (type.equals(Schema.Type.ENUM)) {
- return primitive(fieldName, BINARY, repetition, ENUM);
+ builder = Types.primitive(BINARY, repetition).as(ENUM);
} else if (type.equals(Schema.Type.ARRAY)) {
if (writeOldListStructure) {
return ConversionPatterns.listType(repetition, fieldName,
@@ -146,16 +154,36 @@ public class AvroSchemaConverter {
// avro map key type is always string
return ConversionPatterns.stringKeyMapType(repetition, fieldName, valType);
} else if (type.equals(Schema.Type.FIXED)) {
- return primitive(fieldName, FIXED_LEN_BYTE_ARRAY, repetition,
- schema.getFixedSize(), null);
+ builder = Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition)
+ .length(schema.getFixedSize());
} else if (type.equals(Schema.Type.UNION)) {
return convertUnion(fieldName, schema, repetition);
+ } else {
+ throw new UnsupportedOperationException("Cannot convert Avro type " + type);
}
- throw new UnsupportedOperationException("Cannot convert Avro type " + type);
+
+ // schema translation can only be done for known logical types because this
+ // creates an equivalence
+ LogicalType logicalType = schema.getLogicalType();
+ if (logicalType != null) {
+ if (logicalType instanceof LogicalTypes.Decimal) {
+ builder = builder.as(DECIMAL)
+ .precision(((LogicalTypes.Decimal) logicalType).getPrecision())
+ .scale(((LogicalTypes.Decimal) logicalType).getScale());
+
+ } else {
+ OriginalType annotation = convertLogicalType(logicalType);
+ if (annotation != null) {
+ builder.as(annotation);
+ }
+ }
+ }
+
+ return builder.named(fieldName);
}
private Type convertUnion(String fieldName, Schema schema, Type.Repetition repetition) {
- List<Schema> nonNullSchemas = new ArrayList(schema.getTypes().size());
+ List<Schema> nonNullSchemas = new ArrayList<Schema>(schema.getTypes().size());
for (Schema childSchema : schema.getTypes()) {
if (childSchema.getType().equals(Schema.Type.NULL)) {
if (Type.Repetition.REQUIRED == repetition) {
@@ -175,7 +203,7 @@ public class AvroSchemaConverter {
return convertField(fieldName, nonNullSchemas.get(0), repetition);
default: // complex union type
- List<Type> unionTypes = new ArrayList(nonNullSchemas.size());
+ List<Type> unionTypes = new ArrayList<Type>(nonNullSchemas.size());
int index = 0;
for (Schema childSchema : nonNullSchemas) {
unionTypes.add( convertField("member" + index++, childSchema, Type.Repetition.OPTIONAL));
@@ -188,24 +216,6 @@ public class AvroSchemaConverter {
return convertField(field.name(), field.schema());
}
- private PrimitiveType primitive(String name,
- PrimitiveType.PrimitiveTypeName primitive, Type.Repetition repetition,
- int typeLength, OriginalType originalType) {
- return new PrimitiveType(repetition, primitive, typeLength, name,
- originalType);
- }
-
- private PrimitiveType primitive(String name,
- PrimitiveType.PrimitiveTypeName primitive, Type.Repetition repetition,
- OriginalType originalType) {
- return new PrimitiveType(repetition, primitive, name, originalType);
- }
-
- private PrimitiveType primitive(String name,
- PrimitiveType.PrimitiveTypeName primitive, Type.Repetition repetition) {
- return new PrimitiveType(repetition, primitive, name, null);
- }
-
public Schema convert(MessageType parquetSchema) {
return convertFields(parquetSchema.getName(), parquetSchema.getFields());
}
@@ -217,10 +227,11 @@ public class AvroSchemaConverter {
if (parquetType.isRepetition(REPEATED)) {
throw new UnsupportedOperationException("REPEATED not supported outside LIST or MAP. Type: " + parquetType);
} else if (parquetType.isRepetition(Type.Repetition.OPTIONAL)) {
- fields.add(new Schema.Field(parquetType.getName(), optional(fieldSchema), null,
- NullNode.getInstance()));
+ fields.add(new Schema.Field(
+ parquetType.getName(), optional(fieldSchema), null, NULL_VALUE));
} else { // REQUIRED
- fields.add(new Schema.Field(parquetType.getName(), fieldSchema, null, null));
+ fields.add(new Schema.Field(
+ parquetType.getName(), fieldSchema, null, (Object) null));
}
}
Schema schema = Schema.createRecord(name, null, null, false);
@@ -230,10 +241,11 @@ public class AvroSchemaConverter {
private Schema convertField(final Type parquetType) {
if (parquetType.isPrimitive()) {
+ final PrimitiveType asPrimitive = parquetType.asPrimitiveType();
final PrimitiveTypeName parquetPrimitiveTypeName =
- parquetType.asPrimitiveType().getPrimitiveTypeName();
- final OriginalType originalType = parquetType.getOriginalType();
- return parquetPrimitiveTypeName.convert(
+ asPrimitive.getPrimitiveTypeName();
+ final OriginalType annotation = parquetType.getOriginalType();
+ Schema schema = parquetPrimitiveTypeName.convert(
new PrimitiveType.PrimitiveTypeNameConverter<Schema, RuntimeException>() {
@Override
public Schema convertBOOLEAN(PrimitiveTypeName primitiveTypeName) {
@@ -266,13 +278,24 @@ public class AvroSchemaConverter {
}
@Override
public Schema convertBINARY(PrimitiveTypeName primitiveTypeName) {
- if (originalType == OriginalType.UTF8 || originalType == OriginalType.ENUM) {
+ if (annotation == OriginalType.UTF8 || annotation == OriginalType.ENUM) {
return Schema.create(Schema.Type.STRING);
} else {
return Schema.create(Schema.Type.BYTES);
}
}
});
+
+ LogicalType logicalType = convertOriginalType(
+ annotation, asPrimitive.getDecimalMetadata());
+ if (logicalType != null && (annotation != DECIMAL ||
+ parquetPrimitiveTypeName == BINARY ||
+ parquetPrimitiveTypeName == FIXED_LEN_BYTE_ARRAY)) {
+ schema = logicalType.addToSchema(schema);
+ }
+
+ return schema;
+
} else {
GroupType parquetGroupType = parquetType.asGroupType();
OriginalType originalType = parquetGroupType.getOriginalType();
@@ -335,6 +358,46 @@ public class AvroSchemaConverter {
}
}
+ private OriginalType convertLogicalType(LogicalType logicalType) { // maps a recognized Avro logical type to its Parquet OriginalType annotation, else null
+ if (logicalType == null) {
+ return null;
+ } else if (logicalType instanceof LogicalTypes.Decimal) { // convertField handles Decimal itself (to carry precision/scale), so it does not reach this branch
+ return OriginalType.DECIMAL;
+ } else if (logicalType instanceof LogicalTypes.Date) {
+ return OriginalType.DATE;
+ } else if (logicalType instanceof LogicalTypes.TimeMillis) {
+ return OriginalType.TIME_MILLIS;
+ } else if (logicalType instanceof LogicalTypes.TimeMicros) {
+ return OriginalType.TIME_MICROS;
+ } else if (logicalType instanceof LogicalTypes.TimestampMillis) {
+ return OriginalType.TIMESTAMP_MILLIS;
+ } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
+ return OriginalType.TIMESTAMP_MICROS;
+ }
+ return null; // unrecognized logical types get no Parquet annotation
+ }
+ /**
+  * Maps a Parquet OriginalType annotation to the equivalent Avro logical type.
+  * Returns null when annotation is null or has no Avro logical type here.
+  * meta is read only for DECIMAL (precision and scale), and in that case it is
+  * dereferenced without a null check. The caller in convertField attaches a
+  * DECIMAL result only to BINARY or FIXED_LEN_BYTE_ARRAY columns.
+  */
+ private LogicalType convertOriginalType(OriginalType annotation, DecimalMetadata meta) {
+ if (annotation == null) {
+ return null;
+ }
+ switch (annotation) {
+ case DECIMAL:
+ return LogicalTypes.decimal(meta.getPrecision(), meta.getScale());
+ case DATE:
+ return LogicalTypes.date();
+ case TIME_MILLIS:
+ return LogicalTypes.timeMillis();
+ case TIME_MICROS:
+ return LogicalTypes.timeMicros();
+ case TIMESTAMP_MILLIS:
+ return LogicalTypes.timestampMillis();
+ case TIMESTAMP_MICROS:
+ return LogicalTypes.timestampMicros();
+ }
+ return null; // annotations such as UTF8 or ENUM have no Avro logical type
+ }
+
/**
* Implements the rules for interpreting existing data from the logical type
* spec for the LIST annotation. This is used to produce the expected schema.
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
index c75bb03..7fcd88e 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
@@ -23,6 +23,8 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.avro.Conversion;
+import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericFixed;
@@ -69,6 +71,8 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
private RecordConsumer recordConsumer;
private MessageType rootSchema;
private Schema rootAvroSchema;
+ private LogicalType rootLogicalType;
+ private Conversion<?> rootConversion;
private GenericData model;
private ListWriter listWriter;
@@ -82,6 +86,7 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
public AvroWriteSupport(MessageType schema, Schema avroSchema) {
this.rootSchema = schema;
this.rootAvroSchema = avroSchema;
+ this.rootLogicalType = rootAvroSchema.getLogicalType();
this.model = null;
}
@@ -89,6 +94,7 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
GenericData model) {
this.rootSchema = schema;
this.rootAvroSchema = avroSchema;
+ this.rootLogicalType = rootAvroSchema.getLogicalType();
this.model = model;
}
@@ -136,16 +142,25 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
// overloaded version for backward compatibility
@SuppressWarnings("unchecked")
public void write(IndexedRecord record) {
- recordConsumer.startMessage();
- writeRecordFields(rootSchema, rootAvroSchema, record);
- recordConsumer.endMessage();
+ write((T) record);
}
@Override
public void write(T record) {
- recordConsumer.startMessage();
- writeRecordFields(rootSchema, rootAvroSchema, record);
- recordConsumer.endMessage();
+ if (rootLogicalType != null) {
+ Conversion<?> conversion = model.getConversionByClass(
+ record.getClass(), rootLogicalType);
+
+ recordConsumer.startMessage();
+ writeRecordFields(rootSchema, rootAvroSchema,
+ convert(rootAvroSchema, rootLogicalType, conversion, record));
+ recordConsumer.endMessage();
+
+ } else {
+ recordConsumer.startMessage();
+ writeRecordFields(rootSchema, rootAvroSchema, record);
+ recordConsumer.endMessage();
+ }
}
private void writeRecord(GroupType schema, Schema avroSchema,
@@ -226,6 +241,8 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
}
}
+ // TODO: what if the value is null?
+
// Sparsely populated method of encoding unions, each member has its own
// set of columns.
String memberName = "member" + parquetIndex;
@@ -237,44 +254,108 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
recordConsumer.endGroup();
}
- @SuppressWarnings("unchecked")
+ /**
+ * Calls an appropriate write method based on the value.
+ * Value MUST not be null.
+ *
+ * @param type the Parquet type
+ * @param avroSchema the Avro schema
+ * @param value a non-null value to write
+ */
private void writeValue(Type type, Schema avroSchema, Object value) {
Schema nonNullAvroSchema = AvroSchemaConverter.getNonNull(avroSchema);
- Schema.Type avroType = nonNullAvroSchema.getType();
- if (avroType.equals(Schema.Type.BOOLEAN)) {
- recordConsumer.addBoolean((Boolean) value);
- } else if (avroType.equals(Schema.Type.INT)) {
- if (value instanceof Character) {
- recordConsumer.addInteger((Character) value);
- } else {
- recordConsumer.addInteger(((Number) value).intValue());
- }
- } else if (avroType.equals(Schema.Type.LONG)) {
- recordConsumer.addLong(((Number) value).longValue());
- } else if (avroType.equals(Schema.Type.FLOAT)) {
- recordConsumer.addFloat(((Number) value).floatValue());
- } else if (avroType.equals(Schema.Type.DOUBLE)) {
- recordConsumer.addDouble(((Number) value).doubleValue());
- } else if (avroType.equals(Schema.Type.BYTES)) {
- if (value instanceof byte[]) {
- recordConsumer.addBinary(Binary.fromReusedByteArray((byte[]) value));
- } else {
- recordConsumer.addBinary(Binary.fromReusedByteBuffer((ByteBuffer) value));
- }
- } else if (avroType.equals(Schema.Type.STRING)) {
- recordConsumer.addBinary(fromAvroString(value));
- } else if (avroType.equals(Schema.Type.RECORD)) {
- writeRecord(type.asGroupType(), nonNullAvroSchema, value);
- } else if (avroType.equals(Schema.Type.ENUM)) {
- recordConsumer.addBinary(Binary.fromString(value.toString()));
- } else if (avroType.equals(Schema.Type.ARRAY)) {
- listWriter.writeList(type.asGroupType(), nonNullAvroSchema, value);
- } else if (avroType.equals(Schema.Type.MAP)) {
- writeMap(type.asGroupType(), nonNullAvroSchema, (Map<CharSequence, ?>) value);
- } else if (avroType.equals(Schema.Type.UNION)) {
- writeUnion(type.asGroupType(), nonNullAvroSchema, value);
- } else if (avroType.equals(Schema.Type.FIXED)) {
- recordConsumer.addBinary(Binary.fromReusedByteArray(((GenericFixed) value).bytes()));
+ LogicalType logicalType = nonNullAvroSchema.getLogicalType();
+ if (logicalType != null) {
+ Conversion<?> conversion = model.getConversionByClass(
+ value.getClass(), logicalType);
+ writeValueWithoutConversion(type, nonNullAvroSchema,
+ convert(nonNullAvroSchema, logicalType, conversion, value));
+ } else {
+ writeValueWithoutConversion(type, nonNullAvroSchema, value);
+ }
+ }
+ /**
+  * Converts a datum to Avro's generic representation for the schema's type,
+  * using the to* method of the given logical-type Conversion. The datum is
+  * first cast with Class.cast to the conversion's converted type, so a datum
+  * of an incompatible class raises ClassCastException. Returns datum unchanged
+  * when conversion is null or the schema type has no case below (e.g. UNION, NULL).
+  */
+ private <D> Object convert(Schema schema, LogicalType logicalType,
+ Conversion<D> conversion, Object datum) {
+ if (conversion == null) {
+ return datum; // no conversion registered for this logical type: pass through
+ }
+ Class<D> fromClass = conversion.getConvertedType();
+ switch (schema.getType()) {
+ case RECORD: return conversion.toRecord(fromClass.cast(datum), schema, logicalType);
+ case ENUM: return conversion.toEnumSymbol(fromClass.cast(datum), schema, logicalType);
+ case ARRAY: return conversion.toArray(fromClass.cast(datum), schema, logicalType);
+ case MAP: return conversion.toMap(fromClass.cast(datum), schema, logicalType);
+ case FIXED: return conversion.toFixed(fromClass.cast(datum), schema, logicalType);
+ case STRING: return conversion.toCharSequence(fromClass.cast(datum), schema, logicalType);
+ case BYTES: return conversion.toBytes(fromClass.cast(datum), schema, logicalType);
+ case INT: return conversion.toInt(fromClass.cast(datum), schema, logicalType);
+ case LONG: return conversion.toLong(fromClass.cast(datum), schema, logicalType);
+ case FLOAT: return conversion.toFloat(fromClass.cast(datum), schema, logicalType);
+ case DOUBLE: return conversion.toDouble(fromClass.cast(datum), schema, logicalType);
+ case BOOLEAN: return conversion.toBoolean(fromClass.cast(datum), schema, logicalType);
+ }
+ return datum; // schema types without a case above are passed through unconverted
+ }
+
+ /**
+ * Writes a value by calling the RecordConsumer method (or nested writer)
+ * selected by the Avro schema's type. No logical-type conversion is applied.
+ * Value must not be null and the schema must not be nullable.
+ * A schema type with no case here (e.g. NULL) writes nothing.
+ *
+ * @param type a Parquet type; must be a group type for RECORD, ARRAY, MAP and UNION
+ * @param avroSchema a non-nullable Avro schema
+ * @param value a non-null value to write
+ */
+ @SuppressWarnings("unchecked")
+ private void writeValueWithoutConversion(Type type, Schema avroSchema, Object value) {
+ switch (avroSchema.getType()) {
+ case BOOLEAN:
+ recordConsumer.addBoolean((Boolean) value);
+ break;
+ case INT:
+ if (value instanceof Character) { // a Character is written as its numeric char value
+ recordConsumer.addInteger((Character) value);
+ } else {
+ recordConsumer.addInteger(((Number) value).intValue());
+ }
+ break;
+ case LONG:
+ recordConsumer.addLong(((Number) value).longValue());
+ break;
+ case FLOAT:
+ recordConsumer.addFloat(((Number) value).floatValue());
+ break;
+ case DOUBLE:
+ recordConsumer.addDouble(((Number) value).doubleValue());
+ break;
+ case FIXED:
+ recordConsumer.addBinary(Binary.fromReusedByteArray(((GenericFixed) value).bytes()));
+ break;
+ case BYTES:
+ if (value instanceof byte[]) { // reflect/byte-array representation; otherwise a ByteBuffer is expected
+ recordConsumer.addBinary(Binary.fromReusedByteArray((byte[]) value));
+ } else {
+ recordConsumer.addBinary(Binary.fromReusedByteBuffer((ByteBuffer) value));
+ }
+ break;
+ case STRING:
+ recordConsumer.addBinary(fromAvroString(value));
+ break;
+ case RECORD:
+ writeRecord(type.asGroupType(), avroSchema, value);
+ break;
+ case ENUM:
+ recordConsumer.addBinary(Binary.fromString(value.toString())); // enum symbols are stored by name
+ break;
+ case ARRAY:
+ listWriter.writeList(type.asGroupType(), avroSchema, value);
+ break;
+ case MAP:
+ writeMap(type.asGroupType(), avroSchema, (Map<CharSequence, ?>) value);
+ break;
+ case UNION:
+ writeUnion(type.asGroupType(), avroSchema, value);
+ break;
}
}
@@ -283,7 +364,7 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
Utf8 utf8 = (Utf8) value;
return Binary.fromReusedByteArray(utf8.getBytes(), 0, utf8.getByteLength());
}
- return Binary.fromString(value.toString());
+ return Binary.fromString((CharSequence) value);
}
private static GenericData getDataModel(Configuration conf) {
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/main/java/org/apache/parquet/avro/ParentValueContainer.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/ParentValueContainer.java b/parquet-avro/src/main/java/org/apache/parquet/avro/ParentValueContainer.java
index 67b710d..f36f5fc 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/ParentValueContainer.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/ParentValueContainer.java
@@ -18,6 +18,16 @@
*/
package org.apache.parquet.avro;
+import org.apache.avro.Conversion;
+import org.apache.avro.LogicalType;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericEnumSymbol;
+import org.apache.avro.generic.IndexedRecord;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Map;
+
abstract class ParentValueContainer {
/**
@@ -60,4 +70,169 @@ abstract class ParentValueContainer {
add(value);
}
+ static class LogicalTypePrimitiveContainer extends ParentValueContainer {
+ private final ParentValueContainer wrapped;
+ private final Schema schema;
+ private final LogicalType logicalType;
+ private final Conversion conversion;
+
+ public LogicalTypePrimitiveContainer(ParentValueContainer wrapped,
+ Schema schema, Conversion conversion) {
+ this.wrapped = wrapped;
+ this.schema = schema;
+ this.logicalType = schema.getLogicalType();
+ this.conversion = conversion;
+ }
+
+ @Override
+ public void addDouble(double value) {
+ wrapped.add(conversion.fromDouble(value, schema, logicalType));
+ }
+
+ @Override
+ public void addFloat(float value) {
+ wrapped.add(conversion.fromFloat(value, schema, logicalType));
+ }
+
+ @Override
+ public void addLong(long value) {
+ wrapped.add(conversion.fromLong(value, schema, logicalType));
+ }
+
+ @Override
+ public void addInt(int value) {
+ wrapped.add(conversion.fromInt(value, schema, logicalType));
+ }
+
+ @Override
+ public void addShort(short value) {
+ wrapped.add(conversion.fromInt((int) value, schema, logicalType));
+ }
+
+ @Override
+ public void addChar(char value) {
+ wrapped.add(conversion.fromInt((int) value, schema, logicalType));
+ }
+
+ @Override
+ public void addByte(byte value) {
+ wrapped.add(conversion.fromInt((int) value, schema, logicalType));
+ }
+
+ @Override
+ public void addBoolean(boolean value) {
+ wrapped.add(conversion.fromBoolean(value, schema, logicalType));
+ }
+ }
+
+ static ParentValueContainer getConversionContainer(
+ final ParentValueContainer parent, final Conversion<?> conversion,
+ final Schema schema) {
+ if (conversion == null) {
+ return parent;
+ }
+
+ final LogicalType logicalType = schema.getLogicalType();
+
+ switch (schema.getType()) {
+ case STRING:
+ return new ParentValueContainer() {
+ @Override
+ public void add(Object value) {
+ parent.add(conversion.fromCharSequence(
+ (CharSequence) value, schema, logicalType));
+ }
+ };
+ case BOOLEAN:
+ return new LogicalTypePrimitiveContainer(parent, schema, conversion) {
+ @Override
+ public void add(Object value) {
+ parent.add(conversion.fromBoolean(
+ (Boolean) value, schema, logicalType));
+ }
+ };
+ case INT:
+ return new LogicalTypePrimitiveContainer(parent, schema, conversion) {
+ @Override
+ public void add(Object value) {
+ parent.add(conversion.fromInt(
+ (Integer) value, schema, logicalType));
+ }
+ };
+ case LONG:
+ return new LogicalTypePrimitiveContainer(parent, schema, conversion) {
+ @Override
+ public void add(Object value) {
+ parent.add(conversion.fromLong(
+ (Long) value, schema, logicalType));
+ }
+ };
+ case FLOAT:
+ return new LogicalTypePrimitiveContainer(parent, schema, conversion) {
+ @Override
+ public void add(Object value) {
+ parent.add(conversion.fromFloat(
+ (Float) value, schema, logicalType));
+ }
+ };
+ case DOUBLE:
+ return new LogicalTypePrimitiveContainer(parent, schema, conversion) {
+ @Override
+ public void add(Object value) {
+ parent.add(conversion.fromDouble(
+ (Double) value, schema, logicalType));
+ }
+ };
+ case BYTES:
+ return new ParentValueContainer() {
+ @Override
+ public void add(Object value) {
+ parent.add(conversion.fromBytes(
+ (ByteBuffer) value, schema, logicalType));
+ }
+ };
+ case FIXED:
+ return new ParentValueContainer() {
+ @Override
+ public void add(Object value) {
+ parent.add(conversion.fromFixed(
+ (GenericData.Fixed) value, schema, logicalType));
+ }
+ };
+ case RECORD:
+ return new ParentValueContainer() {
+ @Override
+ public void add(Object value) {
+ parent.add(conversion.fromRecord(
+ (IndexedRecord) value, schema, logicalType));
+ }
+ };
+ case ARRAY:
+ return new ParentValueContainer() {
+ @Override
+ public void add(Object value) {
+ parent.add(conversion.fromArray(
+ (Collection<?>) value, schema, logicalType));
+ }
+ };
+ case MAP:
+ return new ParentValueContainer() {
+ @Override
+ public void add(Object value) {
+ parent.add(conversion.fromMap(
+ (Map<?, ?>) value, schema, logicalType));
+ }
+ };
+ case ENUM:
+ return new ParentValueContainer() {
+ @Override
+ public void add(Object value) {
+ parent.add(conversion.fromEnumSymbol(
+ (GenericEnumSymbol) value, schema, logicalType));
+ }
+ };
+ default:
+ return new LogicalTypePrimitiveContainer(parent, schema, conversion);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/main/resources/META-INF/LICENSE
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/resources/META-INF/LICENSE b/parquet-avro/src/main/resources/META-INF/LICENSE
new file mode 100644
index 0000000..20b23c9
--- /dev/null
+++ b/parquet-avro/src/main/resources/META-INF/LICENSE
@@ -0,0 +1,186 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+--------------------------------------------------------------------------------
+
+This product includes code from Apache Avro.
+
+Copyright: 2014 The Apache Software Foundation.
+Home page: https://avro.apache.org/
+License: http://www.apache.org/licenses/LICENSE-2.0
+
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/resources/META-INF/NOTICE b/parquet-avro/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..7b5682c
--- /dev/null
+++ b/parquet-avro/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,18 @@
+
+Apache Parquet MR (Incubating)
+Copyright 2014-2015 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+--------------------------------------------------------------------------------
+
+This product includes code from Apache Avro, which includes the following in
+its NOTICE file:
+
+ Apache Avro
+ Copyright 2010-2015 The Apache Software Foundation
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java b/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java
index d5fe11a..f4682d6 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java
@@ -19,11 +19,21 @@
package org.apache.parquet.avro;
import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
import org.codehaus.jackson.node.NullNode;
+import org.junit.Assert;
+import org.junit.rules.TemporaryFolder;
public class AvroTestUtil {
@@ -66,4 +76,47 @@ public class AvroTestUtil {
return record;
}
+  /**
+   * Reads every record from a Parquet file. {@code schema} is set as both
+   * the requested projection and the Avro read schema.
+   *
+   * @param model the Avro data model handed to the reader
+   * @param schema the Avro schema to project and read with
+   * @param file the Parquet file to read
+   * @return the records in the order the reader returned them
+   * @throws IOException if reading fails
+   */
+  public static <D> List<D> read(GenericData model, Schema schema, File file) throws IOException {
+    Configuration readConf = new Configuration(false);
+    AvroReadSupport.setRequestedProjection(readConf, schema);
+    AvroReadSupport.setAvroReadSchema(readConf, schema);
+
+    ParquetReader<D> reader = AvroParquetReader.<D>builder(new Path(file.toString()))
+        .withDataModel(model) // reflect disables compatibility
+        .withConf(readConf)
+        .build();
+
+    List<D> records = new ArrayList<D>();
+    try {
+      // read() returns null once the file is exhausted
+      for (D record = reader.read(); record != null; record = reader.read()) {
+        records.add(record);
+      }
+    } finally {
+      reader.close();
+    }
+    return records;
+  }
+
+  /**
+   * Writes the given records, in order, to a new Parquet file created in
+   * {@code temp}.
+   *
+   * @param temp the temporary folder that supplies the output location
+   * @param model the Avro data model handed to the writer
+   * @param schema the Avro schema of the records
+   * @param data the records to write
+   * @return the written file
+   * @throws IOException if writing fails
+   */
+  @SuppressWarnings("unchecked")
+  public static <D> File write(TemporaryFolder temp, GenericData model, Schema schema, D... data) throws IOException {
+    File target = temp.newFile();
+    // newFile() leaves an empty file behind; delete it (asserting it existed),
+    // presumably so the writer can create the path itself
+    Assert.assertTrue(target.delete());
+
+    ParquetWriter<D> writer = AvroParquetWriter.<D>builder(new Path(target.toString()))
+        .withDataModel(model)
+        .withSchema(schema)
+        .build();
+    try {
+      for (int i = 0; i < data.length; i += 1) {
+        writer.write(data[i]);
+      }
+    } finally {
+      writer.close();
+    }
+    return target;
+  }
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
index b393615..942e3b1 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
@@ -20,16 +20,37 @@ package org.apache.parquet.avro;
import com.google.common.collect.Lists;
import com.google.common.io.Resources;
-import java.util.Arrays;
-import java.util.Collections;
+import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Types;
import org.codehaus.jackson.node.NullNode;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.MessageTypeParser;
+import java.util.Arrays;
+import java.util.Collections;
+import static org.apache.avro.Schema.Type.INT;
+import static org.apache.avro.Schema.Type.LONG;
+import static org.apache.parquet.schema.OriginalType.DATE;
+import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MICROS;
+import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MILLIS;
+import static org.apache.parquet.schema.OriginalType.TIME_MICROS;
+import static org.apache.parquet.schema.OriginalType.TIME_MILLIS;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
import static org.junit.Assert.assertEquals;
public class TestAvroSchemaConverter {
@@ -131,7 +152,7 @@ public class TestAvroSchemaConverter {
+  /** Converting a non-record top-level Avro schema (here a bare int) must throw IllegalArgumentException. */
@Test(expected = IllegalArgumentException.class)
public void testTopLevelMustBeARecord() {
- new AvroSchemaConverter().convert(Schema.create(Schema.Type.INT));
+ new AvroSchemaConverter().convert(Schema.create(INT));
}
@Test
@@ -270,7 +291,7 @@ public class TestAvroSchemaConverter {
@Test
public void testOptionalFields() throws Exception {
Schema schema = Schema.createRecord("record1", null, null, false);
- Schema optionalInt = optional(Schema.create(Schema.Type.INT));
+ Schema optionalInt = optional(Schema.create(INT));
schema.setFields(Arrays.asList(
new Schema.Field("myint", optionalInt, null, NullNode.getInstance())
));
@@ -284,7 +305,7 @@ public class TestAvroSchemaConverter {
@Test
public void testOptionalMapValue() throws Exception {
Schema schema = Schema.createRecord("record1", null, null, false);
- Schema optionalIntMap = Schema.createMap(optional(Schema.create(Schema.Type.INT)));
+ Schema optionalIntMap = Schema.createMap(optional(Schema.create(INT)));
schema.setFields(Arrays.asList(
new Schema.Field("myintmap", optionalIntMap, null, null)
));
@@ -303,7 +324,7 @@ public class TestAvroSchemaConverter {
@Test
public void testOptionalArrayElement() throws Exception {
Schema schema = Schema.createRecord("record1", null, null, false);
- Schema optionalIntArray = Schema.createArray(optional(Schema.create(Schema.Type.INT)));
+ Schema optionalIntArray = Schema.createArray(optional(Schema.create(INT)));
schema.setFields(Arrays.asList(
new Schema.Field("myintarray", optionalIntArray, null, null)
));
@@ -323,7 +344,7 @@ public class TestAvroSchemaConverter {
Schema schema = Schema.createRecord("record2", null, null, false);
Schema multipleTypes = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type
.NULL),
- Schema.create(Schema.Type.INT),
+ Schema.create(INT),
Schema.create(Schema.Type.FLOAT)));
schema.setFields(Arrays.asList(
new Schema.Field("myunion", multipleTypes, null, NullNode.getInstance())));
@@ -396,7 +417,7 @@ public class TestAvroSchemaConverter {
@Test
public void testOldAvroListOfLists() throws Exception {
Schema listOfLists = optional(Schema.createArray(Schema.createArray(
- Schema.create(Schema.Type.INT))));
+ Schema.create(INT))));
Schema schema = Schema.createRecord("AvroCompatListInList", null, null, false);
schema.setFields(Lists.newArrayList(
new Schema.Field("listOfLists", listOfLists, null, NullNode.getInstance())
@@ -425,7 +446,7 @@ public class TestAvroSchemaConverter {
@Test
public void testOldThriftListOfLists() throws Exception {
Schema listOfLists = optional(Schema.createArray(Schema.createArray(
- Schema.create(Schema.Type.INT))));
+ Schema.create(INT))));
Schema schema = Schema.createRecord("ThriftCompatListInList", null, null, false);
schema.setFields(Lists.newArrayList(
new Schema.Field("listOfLists", listOfLists, null, NullNode.getInstance())
@@ -458,7 +479,7 @@ public class TestAvroSchemaConverter {
// group's name, but it must be 2-level because the repeated group doesn't
// contain an optional or repeated element as required for 3-level lists
Schema listOfLists = optional(Schema.createArray(Schema.createArray(
- Schema.create(Schema.Type.INT))));
+ Schema.create(INT))));
Schema schema = Schema.createRecord("UnknownTwoLevelListInList", null, null, false);
schema.setFields(Lists.newArrayList(
new Schema.Field("listOfLists", listOfLists, null, NullNode.getInstance())
@@ -488,7 +509,7 @@ public class TestAvroSchemaConverter {
@Test
public void testParquetMapWithoutMapKeyValueAnnotation() throws Exception {
Schema schema = Schema.createRecord("myrecord", null, null, false);
- Schema map = Schema.createMap(Schema.create(Schema.Type.INT));
+ Schema map = Schema.createMap(Schema.create(INT));
schema.setFields(Collections.singletonList(new Schema.Field("mymap", map, null, null)));
String parquetSchema =
"message myrecord {\n" +
@@ -504,9 +525,240 @@ public class TestAvroSchemaConverter {
testParquetToAvroConversion(NEW_BEHAVIOR, schema, parquetSchema);
}
+  @Test
+  public void testDecimalBytesType() throws Exception {
+    // Avro bytes annotated with decimal(9,2) round-trips to a Parquet
+    // binary DECIMAL(9,2) column
+    Schema bytesDecimal = LogicalTypes.decimal(9, 2)
+        .addToSchema(Schema.create(Schema.Type.BYTES));
+    Schema record = Schema.createRecord("myrecord", null, null, false);
+    record.setFields(Collections.singletonList(
+        new Schema.Field("dec", bytesDecimal, null, null)));
+
+    String parquetSchema =
+        "message myrecord {\n" +
+        " required binary dec (DECIMAL(9,2));\n" +
+        "}\n";
+    testRoundTripConversion(record, parquetSchema);
+  }
+
+  @Test
+  public void testDecimalFixedType() throws Exception {
+    // an 8-byte Avro fixed annotated with decimal(9,2) round-trips to a
+    // Parquet fixed_len_byte_array(8) DECIMAL(9,2) column
+    Schema fixedDecimal = LogicalTypes.decimal(9, 2)
+        .addToSchema(Schema.createFixed("dec", null, null, 8));
+    Schema record = Schema.createRecord("myrecord", null, null, false);
+    record.setFields(Collections.singletonList(
+        new Schema.Field("dec", fixedDecimal, null, null)));
+
+    String parquetSchema =
+        "message myrecord {\n" +
+        " required fixed_len_byte_array(8) dec (DECIMAL(9,2));\n" +
+        "}\n";
+    testRoundTripConversion(record, parquetSchema);
+  }
+
+  @Test
+  public void testDecimalIntegerType() throws Exception {
+    // the decimal portion is lost because it isn't valid in Avro:
+    // an int32 DECIMAL column converts to a plain Avro int
+    Schema.Field decField = new Schema.Field("dec", Schema.create(INT), null, null);
+    Schema expected = Schema.createRecord("myrecord", null, null, false,
+        Arrays.asList(decField));
+
+    testParquetToAvroConversion(expected,
+        "message myrecord {\n" +
+        " required int32 dec (DECIMAL(9,2));\n" +
+        "}\n");
+  }
+
+  @Test
+  public void testDecimalLongType() throws Exception {
+    // the decimal portion is lost because it isn't valid in Avro:
+    // an int64 DECIMAL column converts to a plain Avro long
+    Schema.Field decField = new Schema.Field("dec", Schema.create(LONG), null, null);
+    Schema expected = Schema.createRecord("myrecord", null, null, false,
+        Arrays.asList(decField));
+
+    testParquetToAvroConversion(expected,
+        "message myrecord {\n" +
+        " required int64 dec (DECIMAL(9,2));\n" +
+        "}\n");
+  }
+
+  @Test
+  public void testDateType() throws Exception {
+    // an Avro int with the date logical type round-trips to int32 (DATE)
+    Schema date = LogicalTypes.date().addToSchema(Schema.create(INT));
+    Schema expected = Schema.createRecord("myrecord", null, null, false,
+        Arrays.asList(new Schema.Field("date", date, null, null)));
+
+    testRoundTripConversion(expected,
+        "message myrecord {\n" +
+        " required int32 date (DATE);\n" +
+        "}\n");
+
+    // DATE on every other physical type must be rejected
+    for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
+        {INT64, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) {
+      final PrimitiveType type;
+      if (primitive == FIXED_LEN_BYTE_ARRAY) {
+        type = new PrimitiveType(REQUIRED, primitive, 12, "test", DATE);
+      } else {
+        type = new PrimitiveType(REQUIRED, primitive, "test", DATE);
+      }
+
+      assertThrows("Should not allow DATE with " + primitive,
+          IllegalArgumentException.class, new Runnable() {
+            @Override
+            public void run() {
+              new AvroSchemaConverter().convert(message(type));
+            }
+          });
+    }
+  }
+
+  @Test
+  public void testTimeMillisType() throws Exception {
+    // an Avro int with the time-millis logical type round-trips to int32 (TIME_MILLIS)
+    Schema timeMillis = LogicalTypes.timeMillis().addToSchema(Schema.create(INT));
+    Schema expected = Schema.createRecord("myrecord", null, null, false,
+        Arrays.asList(new Schema.Field("time", timeMillis, null, null)));
+
+    testRoundTripConversion(expected,
+        "message myrecord {\n" +
+        " required int32 time (TIME_MILLIS);\n" +
+        "}\n");
+
+    // TIME_MILLIS on every other physical type must be rejected
+    for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
+        {INT64, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) {
+      final PrimitiveType type;
+      if (primitive == FIXED_LEN_BYTE_ARRAY) {
+        type = new PrimitiveType(REQUIRED, primitive, 12, "test", TIME_MILLIS);
+      } else {
+        type = new PrimitiveType(REQUIRED, primitive, "test", TIME_MILLIS);
+      }
+
+      assertThrows("Should not allow TIME_MILLIS with " + primitive,
+          IllegalArgumentException.class, new Runnable() {
+            @Override
+            public void run() {
+              new AvroSchemaConverter().convert(message(type));
+            }
+          });
+    }
+  }
+
+ @Test
+ public void testTimeMicrosType() throws Exception {
+ Schema date = LogicalTypes.timeMicros().addToSchema(Schema.create(LONG));
+ Schema expected = Schema.createRecord("myrecord", null, null, false,
+ Arrays.asList(new Schema.Field("time", date, null, null)));
+
+ testRoundTripConversion(expected,
+ "message myrecord {\n" +
+ " required int64 time (TIME_MICROS);\n" +
+ "}\n");
+
+ for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
+ {INT32, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) {
+ final PrimitiveType type;
+ if (primitive == FIXED_LEN_BYTE_ARRAY) {
+ type = new PrimitiveType(REQUIRED, primitive, 12, "test", TIME_MICROS);
+ } else {
+ type = new PrimitiveType(REQUIRED, primitive, "test", TIME_MICROS);
+ }
+
+ assertThrows("Should not allow TIME_MICROS with " + primitive,
+ IllegalArgumentException.class, new Runnable() {
+ @Override
+ public void run() {
+ new AvroSchemaConverter().convert(message(type));
+ }
+ });
+ }
+ }
+
+ @Test
+ public void testTimestampMillisType() throws Exception {
+ Schema date = LogicalTypes.timestampMillis().addToSchema(Schema.create(LONG));
+ Schema expected = Schema.createRecord("myrecord", null, null, false,
+ Arrays.asList(new Schema.Field("timestamp", date, null, null)));
+
+ testRoundTripConversion(expected,
+ "message myrecord {\n" +
+ " required int64 timestamp (TIMESTAMP_MILLIS);\n" +
+ "}\n");
+
+ for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
+ {INT32, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) {
+ final PrimitiveType type;
+ if (primitive == FIXED_LEN_BYTE_ARRAY) {
+ type = new PrimitiveType(REQUIRED, primitive, 12, "test", TIMESTAMP_MILLIS);
+ } else {
+ type = new PrimitiveType(REQUIRED, primitive, "test", TIMESTAMP_MILLIS);
+ }
+
+ assertThrows("Should not allow TIMESTAMP_MILLIS with " + primitive,
+ IllegalArgumentException.class, new Runnable() {
+ @Override
+ public void run() {
+ new AvroSchemaConverter().convert(message(type));
+ }
+ });
+ }
+ }
+
+ @Test
+ public void testTimestampMicrosType() throws Exception {
+ Schema date = LogicalTypes.timestampMicros().addToSchema(Schema.create(LONG));
+ Schema expected = Schema.createRecord("myrecord", null, null, false,
+ Arrays.asList(new Schema.Field("timestamp", date, null, null)));
+
+ testRoundTripConversion(expected,
+ "message myrecord {\n" +
+ " required int64 timestamp (TIMESTAMP_MICROS);\n" +
+ "}\n");
+
+ for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
+ {INT32, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) {
+ final PrimitiveType type;
+ if (primitive == FIXED_LEN_BYTE_ARRAY) {
+ type = new PrimitiveType(REQUIRED, primitive, 12, "test", TIMESTAMP_MICROS);
+ } else {
+ type = new PrimitiveType(REQUIRED, primitive, "test", TIMESTAMP_MICROS);
+ }
+
+ assertThrows("Should not allow TIMESTAMP_MICROS with " + primitive,
+ IllegalArgumentException.class, new Runnable() {
+ @Override
+ public void run() {
+ new AvroSchemaConverter().convert(message(type));
+ }
+ });
+ }
+ }
+
+  /**
+   * Makes a schema nullable by wrapping it in a union whose first branch is
+   * the null schema and whose second branch is {@code original}.
+   */
public static Schema optional(Schema original) {
return Schema.createUnion(Lists.newArrayList(
Schema.create(Schema.Type.NULL),
original));
}
+
+  /**
+   * Builds a message type named "myrecord" whose only field is {@code primitive}.
+   *
+   * @param primitive the single field of the message
+   * @return the built message type
+   */
+  public static MessageType message(PrimitiveType primitive) {
+    final String recordName = "myrecord";
+    return Types.buildMessage().addField(primitive).named(recordName);
+  }
+
+  /**
+   * Asserts that running {@code runnable} throws an exception whose class is
+   * exactly {@code expected}. This avoids writing many separate
+   * {@code @Test(expected=...)} methods.
+   *
+   * @param message describes this assertion; included in failure messages
+   * @param expected the exact Exception class the runnable must throw
+   * @param runnable the code expected to throw
+   */
+  public static void assertThrows(
+      String message, Class<? extends Exception> expected, Runnable runnable) {
+    Exception thrown = null;
+    try {
+      runnable.run();
+    } catch (Exception e) {
+      thrown = e;
+    }
+
+    if (thrown == null) {
+      Assert.fail("No exception was thrown (" + message + "), expected: " +
+          expected.getName());
+    }
+
+    try {
+      Assert.assertEquals(message, expected, thrown.getClass());
+    } catch (AssertionError mismatch) {
+      // attach the unexpected exception so its stack trace shows in the failure
+      mismatch.addSuppressed(thrown);
+      throw mismatch;
+    }
+  }
}