You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by bl...@apache.org on 2016/04/20 17:41:30 UTC
[2/2] parquet-mr git commit: PARQUET-358: Add support for Avro's logical types API.
PARQUET-358: Add support for Avro's logical types API.
This adds support for Avro's logical types API to parquet-avro.
* The logical types API was introduced in Avro 1.8.0, so this bumps the Avro dependency version to 1.8.0.
* Types supported are: decimal, date, time-millis, time-micros, timestamp-millis, and timestamp-micros.
* Tests have been copied from Avro and ported to the parquet-avro API.
Author: Ryan Blue <bl...@apache.org>
Closes #318 from rdblue/PARQUET-358-add-avro-logical-types-api and squashes the following commits:
bd81f9c [Ryan Blue] PARQUET-358: Fix review items.
0a882ee [Ryan Blue] PARQUET-358: Add logical types circular reference test.
5124618 [Ryan Blue] PARQUET-358: Add license documentation for code from Avro.
dcb14be [Ryan Blue] PARQUET-358: Add support for Avro's logical types API.
Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/6b24a1d1
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/6b24a1d1
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/6b24a1d1
Branch: refs/heads/master
Commit: 6b24a1d1b5e2792a7821ad172a45e38d2b04f9b8
Parents: 82b8ecc
Author: Ryan Blue <bl...@apache.org>
Authored: Wed Apr 20 08:41:22 2016 -0700
Committer: Ryan Blue <bl...@apache.org>
Committed: Wed Apr 20 08:41:22 2016 -0700
----------------------------------------------------------------------
LICENSE | 8 +
NOTICE | 11 +
parquet-avro/pom.xml | 4 -
.../avro/AvroIndexedRecordConverter.java | 18 +-
.../apache/parquet/avro/AvroReadSupport.java | 4 +-
.../parquet/avro/AvroRecordConverter.java | 121 +++-
.../parquet/avro/AvroSchemaConverter.java | 147 ++--
.../apache/parquet/avro/AvroWriteSupport.java | 167 +++--
.../parquet/avro/ParentValueContainer.java | 175 +++++
.../src/main/resources/META-INF/LICENSE | 186 +++++
parquet-avro/src/main/resources/META-INF/NOTICE | 18 +
.../org/apache/parquet/avro/AvroTestUtil.java | 53 ++
.../parquet/avro/TestAvroSchemaConverter.java | 278 +++++++-
.../parquet/avro/TestCircularReferences.java | 383 ++++++++++
.../parquet/avro/TestGenericLogicalTypes.java | 271 +++++++
.../org/apache/parquet/avro/TestReadWrite.java | 118 +++-
.../avro/TestReadWriteOldListBehavior.java | 1 -
.../parquet/avro/TestReflectLogicalTypes.java | 705 +++++++++++++++++++
.../java/org/apache/parquet/io/api/Binary.java | 55 +-
.../java/org/apache/parquet/schema/Types.java | 12 +-
.../org/apache/parquet/io/api/TestBinary.java | 10 -
pom.xml | 1 +
22 files changed, 2593 insertions(+), 153 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/LICENSE
----------------------------------------------------------------------
diff --git a/LICENSE b/LICENSE
index b759148..b006581 100644
--- a/LICENSE
+++ b/LICENSE
@@ -178,6 +178,14 @@
--------------------------------------------------------------------------------
+This product includes code from Apache Avro.
+
+Copyright: 2014 The Apache Software Foundation.
+Home page: https://avro.apache.org/
+License: http://www.apache.org/licenses/LICENSE-2.0
+
+--------------------------------------------------------------------------------
+
This project includes code from Daniel Lemire's JavaFastPFOR project. The
"Lemire" bit packing source code produced by parquet-generator is derived from
the JavaFastPFOR project.
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/NOTICE
----------------------------------------------------------------------
diff --git a/NOTICE b/NOTICE
index c6e3bf2..a9b6c56 100644
--- a/NOTICE
+++ b/NOTICE
@@ -43,3 +43,14 @@ with the following copyright notice:
See the License for the specific language governing permissions and
limitations under the License.
+--------------------------------------------------------------------------------
+
+This product includes code from Apache Avro, which includes the following in
+its NOTICE file:
+
+ Apache Avro
+ Copyright 2010-2015 The Apache Software Foundation
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-avro/pom.xml b/parquet-avro/pom.xml
index aad197d..50c37db 100644
--- a/parquet-avro/pom.xml
+++ b/parquet-avro/pom.xml
@@ -32,10 +32,6 @@
<name>Apache Parquet Avro</name>
<url>https://parquet.apache.org</url>
- <properties>
- <avro.version>1.7.6</avro.version>
- </properties>
-
<dependencies>
<dependency>
<groupId>org.apache.parquet</groupId>
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
index 06c66d6..48eab4d 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
@@ -21,6 +21,8 @@ package org.apache.parquet.avro;
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;
+import org.apache.avro.Conversion;
+import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericData;
@@ -111,6 +113,11 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
@SuppressWarnings("unchecked")
private static <T> Class<T> getDatumClass(GenericData model, Schema schema) {
+ if (model.getConversionFor(schema.getLogicalType()) != null) {
+ // use generic classes to pass data to conversions
+ return null;
+ }
+
if (model instanceof SpecificData) {
return (Class<T>) ((SpecificData) model).getClass(schema);
}
@@ -133,7 +140,16 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
}
private static Converter newConverter(Schema schema, Type type,
- GenericData model, ParentValueContainer parent) {
+ GenericData model, ParentValueContainer setter) {
+
+ LogicalType logicalType = schema.getLogicalType();
+ // the expected type is always null because it is determined by the parent
+ // datum class, which never helps for generic. when logical types are added
+ // to specific, this should pass the expected type here.
+ Conversion<?> conversion = model.getConversionFor(logicalType);
+ ParentValueContainer parent = ParentValueContainer
+ .getConversionContainer(setter, conversion, schema);
+
if (schema.getType().equals(Schema.Type.BOOLEAN)) {
return new AvroConverters.FieldBooleanConverter(parent);
} else if (schema.getType().equals(Schema.Type.INT)) {
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
index e73e8af..7d55bf5 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
@@ -110,9 +110,9 @@ public class AvroReadSupport<T> extends ReadSupport<T> {
MessageType parquetSchema = readContext.getRequestedSchema();
Schema avroSchema;
- if (readContext.getReadSupportMetadata().get(AVRO_READ_SCHEMA_METADATA_KEY) != null) {
+ if (metadata.get(AVRO_READ_SCHEMA_METADATA_KEY) != null) {
// use the Avro read schema provided by the user
- avroSchema = new Schema.Parser().parse(readContext.getReadSupportMetadata().get(AVRO_READ_SCHEMA_METADATA_KEY));
+ avroSchema = new Schema.Parser().parse(metadata.get(AVRO_READ_SCHEMA_METADATA_KEY));
} else if (keyValueMetaData.get(AVRO_SCHEMA_METADATA_KEY) != null) {
// use the Avro schema from the file metadata if present
avroSchema = new Schema.Parser().parse(keyValueMetaData.get(AVRO_SCHEMA_METADATA_KEY));
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
index 38a761c..10bb29b 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
@@ -29,6 +29,7 @@ import it.unimi.dsi.fastutil.shorts.ShortArrayList;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -36,8 +37,14 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.LinkedHashMap;
+import org.apache.avro.AvroTypeException;
+import org.apache.avro.Conversion;
+import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
+import org.apache.avro.reflect.AvroIgnore;
+import org.apache.avro.reflect.AvroName;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.Stringable;
import org.apache.avro.specific.SpecificData;
@@ -69,7 +76,8 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
private static final String JAVA_CLASS_PROP = "java-class";
private static final String JAVA_KEY_CLASS_PROP = "java-key-class";
- protected T currentRecord;
+ protected T currentRecord = null;
+ private ParentValueContainer rootContainer = null;
private final Converter[] converters;
private final Schema avroSchema;
@@ -80,6 +88,15 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
public AvroRecordConverter(MessageType parquetSchema, Schema avroSchema,
GenericData baseModel) {
this(null, parquetSchema, avroSchema, baseModel);
+ LogicalType logicalType = avroSchema.getLogicalType();
+ Conversion<?> conversion = baseModel.getConversionFor(logicalType);
+ this.rootContainer = ParentValueContainer.getConversionContainer(new ParentValueContainer() {
+ @Override
+ @SuppressWarnings("unchecked")
+ public void add(Object value) {
+ AvroRecordConverter.this.currentRecord = (T) value;
+ }
+ }, conversion, avroSchema);
}
public AvroRecordConverter(ParentValueContainer parent,
@@ -101,6 +118,8 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
recordClass = getDatumClass(avroSchema, model);
}
+ Map<String, Class<?>> fields = getFieldsByName(recordClass, false);
+
int parquetFieldIndex = 0;
for (Type parquetField: parquetSchema.getFields()) {
final Schema.Field avroField = getAvroField(parquetField.getName());
@@ -112,8 +131,10 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
AvroRecordConverter.this.set(avroField.name(), finalAvroIndex, value);
}
};
+
+ Class<?> fieldClass = fields.get(avroField.name());
converters[parquetFieldIndex] = newConverter(
- nonNullSchema, parquetField, this.model, container);
+ nonNullSchema, parquetField, this.model, fieldClass, container);
// @Stringable doesn't affect the reflected schema; must be enforced here
if (recordClass != null &&
@@ -147,6 +168,43 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
}
}
+ // this was taken from Avro's ReflectData
+ private static Map<String, Class<?>> getFieldsByName(Class<?> recordClass,
+ boolean excludeJava) {
+ Map<String, Class<?>> fields = new LinkedHashMap<String, Class<?>>();
+
+ if (recordClass != null) {
+ Class<?> current = recordClass;
+ do {
+ if (excludeJava && current.getPackage() != null
+ && current.getPackage().getName().startsWith("java.")) {
+ break; // skip java built-in classes
+ }
+ for (Field field : current.getDeclaredFields()) {
+ if (field.isAnnotationPresent(AvroIgnore.class) ||
+ isTransientOrStatic(field)) {
+ continue;
+ }
+ AvroName altName = field.getAnnotation(AvroName.class);
+ Class<?> existing = fields.put(
+ altName != null ? altName.value() : field.getName(),
+ field.getType());
+ if (existing != null) {
+ throw new AvroTypeException(
+ current + " contains two fields named: " + field.getName());
+ }
+ }
+ current = current.getSuperclass();
+ } while (current != null);
+ }
+
+ return fields;
+ }
+
+ private static boolean isTransientOrStatic(Field field) {
+ return (field.getModifiers() & (Modifier.TRANSIENT | Modifier.STATIC)) != 0;
+ }
+
private Schema.Field getAvroField(String parquetFieldName) {
Schema.Field avroField = avroSchema.getField(parquetFieldName);
if (avroField != null) {
@@ -164,12 +222,28 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
parquetFieldName));
}
+ private static Converter newConverter(
+ Schema schema, Type type, GenericData model, ParentValueContainer setter) {
+ return newConverter(schema, type, model, null, setter);
+ }
+
private static Converter newConverter(Schema schema, Type type,
- GenericData model, ParentValueContainer parent) {
+ GenericData model, Class<?> knownClass, ParentValueContainer setter) {
+ LogicalType logicalType = schema.getLogicalType();
+ Conversion<?> conversion;
+ if (knownClass != null) {
+ conversion = model.getConversionByClass(knownClass, logicalType);
+ } else {
+ conversion = model.getConversionFor(logicalType);
+ }
+
+ ParentValueContainer parent = ParentValueContainer
+ .getConversionContainer(setter, conversion, schema);
+
if (schema.getType().equals(Schema.Type.BOOLEAN)) {
return new AvroConverters.FieldBooleanConverter(parent);
} else if (schema.getType().equals(Schema.Type.INT)) {
- Class<?> datumClass = getDatumClass(schema, model);
+ Class<?> datumClass = getDatumClass(conversion, knownClass, schema, model);
if (datumClass == null) {
return new AvroConverters.FieldIntegerConverter(parent);
} else if (datumClass == byte.class || datumClass == Byte.class) {
@@ -187,7 +261,7 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
} else if (schema.getType().equals(Schema.Type.DOUBLE)) {
return new AvroConverters.FieldDoubleConverter(parent);
} else if (schema.getType().equals(Schema.Type.BYTES)) {
- Class<?> datumClass = getDatumClass(schema, model);
+ Class<?> datumClass = getDatumClass(conversion, knownClass, schema, model);
if (datumClass == null) {
return new AvroConverters.FieldByteBufferConverter(parent);
} else if (datumClass.isArray() && datumClass.getComponentType() == byte.class) {
@@ -201,7 +275,7 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
} else if (schema.getType().equals(Schema.Type.ENUM)) {
return new AvroConverters.FieldEnumConverter(parent, schema, model);
} else if (schema.getType().equals(Schema.Type.ARRAY)) {
- Class<?> datumClass = getDatumClass(schema, model);
+ Class<?> datumClass = getDatumClass(conversion, knownClass, schema, model);
if (datumClass != null && datumClass.isArray()) {
return new AvroArrayConverter(
parent, type.asGroupType(), schema, model, datumClass);
@@ -265,8 +339,24 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
}
}
- @SuppressWarnings("unchecked")
private static <T> Class<T> getDatumClass(Schema schema, GenericData model) {
+ return getDatumClass(null, null, schema, model);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T> Class<T> getDatumClass(Conversion<?> conversion,
+ Class<T> knownClass,
+ Schema schema, GenericData model) {
+ if (conversion != null) {
+ // use generic classes to pass data to conversions
+ return null;
+ }
+
+ // known class can be set when using reflect
+ if (knownClass != null) {
+ return knownClass;
+ }
+
if (model instanceof SpecificData) {
// this works for reflect as well
return ((SpecificData) model).getClass(schema);
@@ -314,6 +404,9 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
fillInDefaults();
if (parent != null) {
parent.add(currentRecord);
+ } else {
+ // this applies any converters needed for the root value
+ rootContainer.add(currentRecord);
}
}
@@ -502,10 +595,10 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
// matching it against the element schema.
if (isElementType(repeatedType, elementSchema)) {
// the element type is the repeated type (and required)
- converter = newConverter(elementSchema, repeatedType, model, setter);
+ converter = newConverter(elementSchema, repeatedType, model, elementClass, setter);
} else {
// the element is wrapped in a synthetic group and may be optional
- converter = new PrimitiveElementConverter(
+ converter = new ArrayElementConverter(
repeatedType.asGroupType(), elementSchema, model, setter);
}
}
@@ -643,20 +736,20 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
* }
* </pre>
*/
- final class PrimitiveElementConverter extends GroupConverter {
+ final class ArrayElementConverter extends GroupConverter {
private boolean isSet;
private final Converter elementConverter;
- public PrimitiveElementConverter(GroupType repeatedType,
- Schema elementSchema, GenericData model,
- final ParentValueContainer setter) {
+ public ArrayElementConverter(GroupType repeatedType,
+ Schema elementSchema, GenericData model,
+ final ParentValueContainer setter) {
Type elementType = repeatedType.getType(0);
Preconditions.checkArgument(
!elementClass.isPrimitive() || elementType.isRepetition(REQUIRED),
"Cannot convert list of optional elements to primitive array");
Schema nonNullElementSchema = AvroSchemaConverter.getNonNull(elementSchema);
this.elementConverter = newConverter(
- nonNullElementSchema, elementType, model, new ParentValueContainer() {
+ nonNullElementSchema, elementType, model, elementClass, new ParentValueContainer() {
@Override
public void add(Object value) {
isSet = true;
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
index 6cfa8d1..6b9b94c 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
@@ -18,20 +18,26 @@
*/
package org.apache.parquet.avro;
-import java.util.*;
-
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
-import org.codehaus.jackson.node.NullNode;
import org.apache.parquet.schema.ConversionPatterns;
+import org.apache.parquet.schema.DecimalMetadata;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
-import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import static org.apache.avro.JsonProperties.NULL_VALUE;
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE_DEFAULT;
import static org.apache.parquet.schema.OriginalType.*;
@@ -113,26 +119,28 @@ public class AvroSchemaConverter {
return convertField(fieldName, schema, Type.Repetition.REQUIRED);
}
+ @SuppressWarnings("deprecation")
private Type convertField(String fieldName, Schema schema, Type.Repetition repetition) {
+ Types.PrimitiveBuilder<PrimitiveType> builder;
Schema.Type type = schema.getType();
if (type.equals(Schema.Type.BOOLEAN)) {
- return primitive(fieldName, BOOLEAN, repetition);
+ builder = Types.primitive(BOOLEAN, repetition);
} else if (type.equals(Schema.Type.INT)) {
- return primitive(fieldName, INT32, repetition);
+ builder = Types.primitive(INT32, repetition);
} else if (type.equals(Schema.Type.LONG)) {
- return primitive(fieldName, INT64, repetition);
+ builder = Types.primitive(INT64, repetition);
} else if (type.equals(Schema.Type.FLOAT)) {
- return primitive(fieldName, FLOAT, repetition);
+ builder = Types.primitive(FLOAT, repetition);
} else if (type.equals(Schema.Type.DOUBLE)) {
- return primitive(fieldName, DOUBLE, repetition);
+ builder = Types.primitive(DOUBLE, repetition);
} else if (type.equals(Schema.Type.BYTES)) {
- return primitive(fieldName, BINARY, repetition);
+ builder = Types.primitive(BINARY, repetition);
} else if (type.equals(Schema.Type.STRING)) {
- return primitive(fieldName, BINARY, repetition, UTF8);
+ builder = Types.primitive(BINARY, repetition).as(UTF8);
} else if (type.equals(Schema.Type.RECORD)) {
return new GroupType(repetition, fieldName, convertFields(schema.getFields()));
} else if (type.equals(Schema.Type.ENUM)) {
- return primitive(fieldName, BINARY, repetition, ENUM);
+ builder = Types.primitive(BINARY, repetition).as(ENUM);
} else if (type.equals(Schema.Type.ARRAY)) {
if (writeOldListStructure) {
return ConversionPatterns.listType(repetition, fieldName,
@@ -146,16 +154,36 @@ public class AvroSchemaConverter {
// avro map key type is always string
return ConversionPatterns.stringKeyMapType(repetition, fieldName, valType);
} else if (type.equals(Schema.Type.FIXED)) {
- return primitive(fieldName, FIXED_LEN_BYTE_ARRAY, repetition,
- schema.getFixedSize(), null);
+ builder = Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition)
+ .length(schema.getFixedSize());
} else if (type.equals(Schema.Type.UNION)) {
return convertUnion(fieldName, schema, repetition);
+ } else {
+ throw new UnsupportedOperationException("Cannot convert Avro type " + type);
}
- throw new UnsupportedOperationException("Cannot convert Avro type " + type);
+
+ // schema translation can only be done for known logical types because this
+ // creates an equivalence
+ LogicalType logicalType = schema.getLogicalType();
+ if (logicalType != null) {
+ if (logicalType instanceof LogicalTypes.Decimal) {
+ builder = builder.as(DECIMAL)
+ .precision(((LogicalTypes.Decimal) logicalType).getPrecision())
+ .scale(((LogicalTypes.Decimal) logicalType).getScale());
+
+ } else {
+ OriginalType annotation = convertLogicalType(logicalType);
+ if (annotation != null) {
+ builder.as(annotation);
+ }
+ }
+ }
+
+ return builder.named(fieldName);
}
private Type convertUnion(String fieldName, Schema schema, Type.Repetition repetition) {
- List<Schema> nonNullSchemas = new ArrayList(schema.getTypes().size());
+ List<Schema> nonNullSchemas = new ArrayList<Schema>(schema.getTypes().size());
for (Schema childSchema : schema.getTypes()) {
if (childSchema.getType().equals(Schema.Type.NULL)) {
if (Type.Repetition.REQUIRED == repetition) {
@@ -175,7 +203,7 @@ public class AvroSchemaConverter {
return convertField(fieldName, nonNullSchemas.get(0), repetition);
default: // complex union type
- List<Type> unionTypes = new ArrayList(nonNullSchemas.size());
+ List<Type> unionTypes = new ArrayList<Type>(nonNullSchemas.size());
int index = 0;
for (Schema childSchema : nonNullSchemas) {
unionTypes.add( convertField("member" + index++, childSchema, Type.Repetition.OPTIONAL));
@@ -188,24 +216,6 @@ public class AvroSchemaConverter {
return convertField(field.name(), field.schema());
}
- private PrimitiveType primitive(String name,
- PrimitiveType.PrimitiveTypeName primitive, Type.Repetition repetition,
- int typeLength, OriginalType originalType) {
- return new PrimitiveType(repetition, primitive, typeLength, name,
- originalType);
- }
-
- private PrimitiveType primitive(String name,
- PrimitiveType.PrimitiveTypeName primitive, Type.Repetition repetition,
- OriginalType originalType) {
- return new PrimitiveType(repetition, primitive, name, originalType);
- }
-
- private PrimitiveType primitive(String name,
- PrimitiveType.PrimitiveTypeName primitive, Type.Repetition repetition) {
- return new PrimitiveType(repetition, primitive, name, null);
- }
-
public Schema convert(MessageType parquetSchema) {
return convertFields(parquetSchema.getName(), parquetSchema.getFields());
}
@@ -217,10 +227,11 @@ public class AvroSchemaConverter {
if (parquetType.isRepetition(REPEATED)) {
throw new UnsupportedOperationException("REPEATED not supported outside LIST or MAP. Type: " + parquetType);
} else if (parquetType.isRepetition(Type.Repetition.OPTIONAL)) {
- fields.add(new Schema.Field(parquetType.getName(), optional(fieldSchema), null,
- NullNode.getInstance()));
+ fields.add(new Schema.Field(
+ parquetType.getName(), optional(fieldSchema), null, NULL_VALUE));
} else { // REQUIRED
- fields.add(new Schema.Field(parquetType.getName(), fieldSchema, null, null));
+ fields.add(new Schema.Field(
+ parquetType.getName(), fieldSchema, null, (Object) null));
}
}
Schema schema = Schema.createRecord(name, null, null, false);
@@ -230,10 +241,11 @@ public class AvroSchemaConverter {
private Schema convertField(final Type parquetType) {
if (parquetType.isPrimitive()) {
+ final PrimitiveType asPrimitive = parquetType.asPrimitiveType();
final PrimitiveTypeName parquetPrimitiveTypeName =
- parquetType.asPrimitiveType().getPrimitiveTypeName();
- final OriginalType originalType = parquetType.getOriginalType();
- return parquetPrimitiveTypeName.convert(
+ asPrimitive.getPrimitiveTypeName();
+ final OriginalType annotation = parquetType.getOriginalType();
+ Schema schema = parquetPrimitiveTypeName.convert(
new PrimitiveType.PrimitiveTypeNameConverter<Schema, RuntimeException>() {
@Override
public Schema convertBOOLEAN(PrimitiveTypeName primitiveTypeName) {
@@ -266,13 +278,24 @@ public class AvroSchemaConverter {
}
@Override
public Schema convertBINARY(PrimitiveTypeName primitiveTypeName) {
- if (originalType == OriginalType.UTF8 || originalType == OriginalType.ENUM) {
+ if (annotation == OriginalType.UTF8 || annotation == OriginalType.ENUM) {
return Schema.create(Schema.Type.STRING);
} else {
return Schema.create(Schema.Type.BYTES);
}
}
});
+
+ LogicalType logicalType = convertOriginalType(
+ annotation, asPrimitive.getDecimalMetadata());
+ if (logicalType != null && (annotation != DECIMAL ||
+ parquetPrimitiveTypeName == BINARY ||
+ parquetPrimitiveTypeName == FIXED_LEN_BYTE_ARRAY)) {
+ schema = logicalType.addToSchema(schema);
+ }
+
+ return schema;
+
} else {
GroupType parquetGroupType = parquetType.asGroupType();
OriginalType originalType = parquetGroupType.getOriginalType();
@@ -335,6 +358,46 @@ public class AvroSchemaConverter {
}
}
+ private OriginalType convertLogicalType(LogicalType logicalType) {
+ if (logicalType == null) {
+ return null;
+ } else if (logicalType instanceof LogicalTypes.Decimal) {
+ return OriginalType.DECIMAL;
+ } else if (logicalType instanceof LogicalTypes.Date) {
+ return OriginalType.DATE;
+ } else if (logicalType instanceof LogicalTypes.TimeMillis) {
+ return OriginalType.TIME_MILLIS;
+ } else if (logicalType instanceof LogicalTypes.TimeMicros) {
+ return OriginalType.TIME_MICROS;
+ } else if (logicalType instanceof LogicalTypes.TimestampMillis) {
+ return OriginalType.TIMESTAMP_MILLIS;
+ } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
+ return OriginalType.TIMESTAMP_MICROS;
+ }
+ return null;
+ }
+
+ private LogicalType convertOriginalType(OriginalType annotation, DecimalMetadata meta) {
+ if (annotation == null) {
+ return null;
+ }
+ switch (annotation) {
+ case DECIMAL:
+ return LogicalTypes.decimal(meta.getPrecision(), meta.getScale());
+ case DATE:
+ return LogicalTypes.date();
+ case TIME_MILLIS:
+ return LogicalTypes.timeMillis();
+ case TIME_MICROS:
+ return LogicalTypes.timeMicros();
+ case TIMESTAMP_MILLIS:
+ return LogicalTypes.timestampMillis();
+ case TIMESTAMP_MICROS:
+ return LogicalTypes.timestampMicros();
+ }
+ return null;
+ }
+
/**
* Implements the rules for interpreting existing data from the logical type
* spec for the LIST annotation. This is used to produce the expected schema.
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
index c75bb03..7fcd88e 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
@@ -23,6 +23,8 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.avro.Conversion;
+import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericFixed;
@@ -69,6 +71,8 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
private RecordConsumer recordConsumer;
private MessageType rootSchema;
private Schema rootAvroSchema;
+ private LogicalType rootLogicalType;
+ private Conversion<?> rootConversion;
private GenericData model;
private ListWriter listWriter;
@@ -82,6 +86,7 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
public AvroWriteSupport(MessageType schema, Schema avroSchema) {
this.rootSchema = schema;
this.rootAvroSchema = avroSchema;
+ this.rootLogicalType = rootAvroSchema.getLogicalType();
this.model = null;
}
@@ -89,6 +94,7 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
GenericData model) {
this.rootSchema = schema;
this.rootAvroSchema = avroSchema;
+ this.rootLogicalType = rootAvroSchema.getLogicalType();
this.model = model;
}
@@ -136,16 +142,25 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
// overloaded version for backward compatibility
@SuppressWarnings("unchecked")
public void write(IndexedRecord record) {
- recordConsumer.startMessage();
- writeRecordFields(rootSchema, rootAvroSchema, record);
- recordConsumer.endMessage();
+ write((T) record);
}
@Override
public void write(T record) {
- recordConsumer.startMessage();
- writeRecordFields(rootSchema, rootAvroSchema, record);
- recordConsumer.endMessage();
+ if (rootLogicalType != null) {
+ Conversion<?> conversion = model.getConversionByClass(
+ record.getClass(), rootLogicalType);
+
+ recordConsumer.startMessage();
+ writeRecordFields(rootSchema, rootAvroSchema,
+ convert(rootAvroSchema, rootLogicalType, conversion, record));
+ recordConsumer.endMessage();
+
+ } else {
+ recordConsumer.startMessage();
+ writeRecordFields(rootSchema, rootAvroSchema, record);
+ recordConsumer.endMessage();
+ }
}
private void writeRecord(GroupType schema, Schema avroSchema,
@@ -226,6 +241,8 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
}
}
+ // TODO: what if the value is null?
+
// Sparsely populated method of encoding unions, each member has its own
// set of columns.
String memberName = "member" + parquetIndex;
@@ -237,44 +254,108 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
recordConsumer.endGroup();
}
- @SuppressWarnings("unchecked")
+ /**
+ * Calls an appropriate write method based on the value.
+ * Value MUST not be null.
+ *
+ * @param type the Parquet type
+ * @param avroSchema the Avro schema
+ * @param value a non-null value to write
+ */
private void writeValue(Type type, Schema avroSchema, Object value) {
Schema nonNullAvroSchema = AvroSchemaConverter.getNonNull(avroSchema);
- Schema.Type avroType = nonNullAvroSchema.getType();
- if (avroType.equals(Schema.Type.BOOLEAN)) {
- recordConsumer.addBoolean((Boolean) value);
- } else if (avroType.equals(Schema.Type.INT)) {
- if (value instanceof Character) {
- recordConsumer.addInteger((Character) value);
- } else {
- recordConsumer.addInteger(((Number) value).intValue());
- }
- } else if (avroType.equals(Schema.Type.LONG)) {
- recordConsumer.addLong(((Number) value).longValue());
- } else if (avroType.equals(Schema.Type.FLOAT)) {
- recordConsumer.addFloat(((Number) value).floatValue());
- } else if (avroType.equals(Schema.Type.DOUBLE)) {
- recordConsumer.addDouble(((Number) value).doubleValue());
- } else if (avroType.equals(Schema.Type.BYTES)) {
- if (value instanceof byte[]) {
- recordConsumer.addBinary(Binary.fromReusedByteArray((byte[]) value));
- } else {
- recordConsumer.addBinary(Binary.fromReusedByteBuffer((ByteBuffer) value));
- }
- } else if (avroType.equals(Schema.Type.STRING)) {
- recordConsumer.addBinary(fromAvroString(value));
- } else if (avroType.equals(Schema.Type.RECORD)) {
- writeRecord(type.asGroupType(), nonNullAvroSchema, value);
- } else if (avroType.equals(Schema.Type.ENUM)) {
- recordConsumer.addBinary(Binary.fromString(value.toString()));
- } else if (avroType.equals(Schema.Type.ARRAY)) {
- listWriter.writeList(type.asGroupType(), nonNullAvroSchema, value);
- } else if (avroType.equals(Schema.Type.MAP)) {
- writeMap(type.asGroupType(), nonNullAvroSchema, (Map<CharSequence, ?>) value);
- } else if (avroType.equals(Schema.Type.UNION)) {
- writeUnion(type.asGroupType(), nonNullAvroSchema, value);
- } else if (avroType.equals(Schema.Type.FIXED)) {
- recordConsumer.addBinary(Binary.fromReusedByteArray(((GenericFixed) value).bytes()));
+ LogicalType logicalType = nonNullAvroSchema.getLogicalType();
+ if (logicalType != null) {
+ Conversion<?> conversion = model.getConversionByClass(
+ value.getClass(), logicalType);
+ writeValueWithoutConversion(type, nonNullAvroSchema,
+ convert(nonNullAvroSchema, logicalType, conversion, value));
+ } else {
+ writeValueWithoutConversion(type, nonNullAvroSchema, value);
+ }
+ }
+
+ private <D> Object convert(Schema schema, LogicalType logicalType,
+ Conversion<D> conversion, Object datum) {
+ if (conversion == null) {
+ return datum;
+ }
+ Class<D> fromClass = conversion.getConvertedType();
+ switch (schema.getType()) {
+ case RECORD: return conversion.toRecord(fromClass.cast(datum), schema, logicalType);
+ case ENUM: return conversion.toEnumSymbol(fromClass.cast(datum), schema, logicalType);
+ case ARRAY: return conversion.toArray(fromClass.cast(datum), schema, logicalType);
+ case MAP: return conversion.toMap(fromClass.cast(datum), schema, logicalType);
+ case FIXED: return conversion.toFixed(fromClass.cast(datum), schema, logicalType);
+ case STRING: return conversion.toCharSequence(fromClass.cast(datum), schema, logicalType);
+ case BYTES: return conversion.toBytes(fromClass.cast(datum), schema, logicalType);
+ case INT: return conversion.toInt(fromClass.cast(datum), schema, logicalType);
+ case LONG: return conversion.toLong(fromClass.cast(datum), schema, logicalType);
+ case FLOAT: return conversion.toFloat(fromClass.cast(datum), schema, logicalType);
+ case DOUBLE: return conversion.toDouble(fromClass.cast(datum), schema, logicalType);
+ case BOOLEAN: return conversion.toBoolean(fromClass.cast(datum), schema, logicalType);
+ }
+ return datum;
+ }
+
+ /**
+ * Calls an appropriate write method based on the value.
+ * Value must not be null and the schema must not be nullable.
+ *
+ * @param type a Parquet type
+ * @param avroSchema a non-nullable Avro schema
+ * @param value a non-null value to write
+ */
+ @SuppressWarnings("unchecked")
+ private void writeValueWithoutConversion(Type type, Schema avroSchema, Object value) {
+ switch (avroSchema.getType()) {
+ case BOOLEAN:
+ recordConsumer.addBoolean((Boolean) value);
+ break;
+ case INT:
+ if (value instanceof Character) {
+ recordConsumer.addInteger((Character) value);
+ } else {
+ recordConsumer.addInteger(((Number) value).intValue());
+ }
+ break;
+ case LONG:
+ recordConsumer.addLong(((Number) value).longValue());
+ break;
+ case FLOAT:
+ recordConsumer.addFloat(((Number) value).floatValue());
+ break;
+ case DOUBLE:
+ recordConsumer.addDouble(((Number) value).doubleValue());
+ break;
+ case FIXED:
+ recordConsumer.addBinary(Binary.fromReusedByteArray(((GenericFixed) value).bytes()));
+ break;
+ case BYTES:
+ if (value instanceof byte[]) {
+ recordConsumer.addBinary(Binary.fromReusedByteArray((byte[]) value));
+ } else {
+ recordConsumer.addBinary(Binary.fromReusedByteBuffer((ByteBuffer) value));
+ }
+ break;
+ case STRING:
+ recordConsumer.addBinary(fromAvroString(value));
+ break;
+ case RECORD:
+ writeRecord(type.asGroupType(), avroSchema, value);
+ break;
+ case ENUM:
+ recordConsumer.addBinary(Binary.fromString(value.toString()));
+ break;
+ case ARRAY:
+ listWriter.writeList(type.asGroupType(), avroSchema, value);
+ break;
+ case MAP:
+ writeMap(type.asGroupType(), avroSchema, (Map<CharSequence, ?>) value);
+ break;
+ case UNION:
+ writeUnion(type.asGroupType(), avroSchema, value);
+ break;
}
}
@@ -283,7 +364,7 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
Utf8 utf8 = (Utf8) value;
return Binary.fromReusedByteArray(utf8.getBytes(), 0, utf8.getByteLength());
}
- return Binary.fromString(value.toString());
+ return Binary.fromString((CharSequence) value);
}
private static GenericData getDataModel(Configuration conf) {
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/main/java/org/apache/parquet/avro/ParentValueContainer.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/ParentValueContainer.java b/parquet-avro/src/main/java/org/apache/parquet/avro/ParentValueContainer.java
index 67b710d..f36f5fc 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/ParentValueContainer.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/ParentValueContainer.java
@@ -18,6 +18,16 @@
*/
package org.apache.parquet.avro;
+import org.apache.avro.Conversion;
+import org.apache.avro.LogicalType;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericEnumSymbol;
+import org.apache.avro.generic.IndexedRecord;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Map;
+
abstract class ParentValueContainer {
/**
@@ -60,4 +70,169 @@ abstract class ParentValueContainer {
add(value);
}
+ static class LogicalTypePrimitiveContainer extends ParentValueContainer {
+ private final ParentValueContainer wrapped;
+ private final Schema schema;
+ private final LogicalType logicalType;
+ private final Conversion conversion;
+
+ public LogicalTypePrimitiveContainer(ParentValueContainer wrapped,
+ Schema schema, Conversion conversion) {
+ this.wrapped = wrapped;
+ this.schema = schema;
+ this.logicalType = schema.getLogicalType();
+ this.conversion = conversion;
+ }
+
+ @Override
+ public void addDouble(double value) {
+ wrapped.add(conversion.fromDouble(value, schema, logicalType));
+ }
+
+ @Override
+ public void addFloat(float value) {
+ wrapped.add(conversion.fromFloat(value, schema, logicalType));
+ }
+
+ @Override
+ public void addLong(long value) {
+ wrapped.add(conversion.fromLong(value, schema, logicalType));
+ }
+
+ @Override
+ public void addInt(int value) {
+ wrapped.add(conversion.fromInt(value, schema, logicalType));
+ }
+
+ @Override
+ public void addShort(short value) {
+ wrapped.add(conversion.fromInt((int) value, schema, logicalType));
+ }
+
+ @Override
+ public void addChar(char value) {
+ wrapped.add(conversion.fromInt((int) value, schema, logicalType));
+ }
+
+ @Override
+ public void addByte(byte value) {
+ wrapped.add(conversion.fromInt((int) value, schema, logicalType));
+ }
+
+ @Override
+ public void addBoolean(boolean value) {
+ wrapped.add(conversion.fromBoolean(value, schema, logicalType));
+ }
+ }
+
+ static ParentValueContainer getConversionContainer(
+ final ParentValueContainer parent, final Conversion<?> conversion,
+ final Schema schema) {
+ if (conversion == null) {
+ return parent;
+ }
+
+ final LogicalType logicalType = schema.getLogicalType();
+
+ switch (schema.getType()) {
+ case STRING:
+ return new ParentValueContainer() {
+ @Override
+ public void add(Object value) {
+ parent.add(conversion.fromCharSequence(
+ (CharSequence) value, schema, logicalType));
+ }
+ };
+ case BOOLEAN:
+ return new LogicalTypePrimitiveContainer(parent, schema, conversion) {
+ @Override
+ public void add(Object value) {
+ parent.add(conversion.fromBoolean(
+ (Boolean) value, schema, logicalType));
+ }
+ };
+ case INT:
+ return new LogicalTypePrimitiveContainer(parent, schema, conversion) {
+ @Override
+ public void add(Object value) {
+ parent.add(conversion.fromInt(
+ (Integer) value, schema, logicalType));
+ }
+ };
+ case LONG:
+ return new LogicalTypePrimitiveContainer(parent, schema, conversion) {
+ @Override
+ public void add(Object value) {
+ parent.add(conversion.fromLong(
+ (Long) value, schema, logicalType));
+ }
+ };
+ case FLOAT:
+ return new LogicalTypePrimitiveContainer(parent, schema, conversion) {
+ @Override
+ public void add(Object value) {
+ parent.add(conversion.fromFloat(
+ (Float) value, schema, logicalType));
+ }
+ };
+ case DOUBLE:
+ return new LogicalTypePrimitiveContainer(parent, schema, conversion) {
+ @Override
+ public void add(Object value) {
+ parent.add(conversion.fromDouble(
+ (Double) value, schema, logicalType));
+ }
+ };
+ case BYTES:
+ return new ParentValueContainer() {
+ @Override
+ public void add(Object value) {
+ parent.add(conversion.fromBytes(
+ (ByteBuffer) value, schema, logicalType));
+ }
+ };
+ case FIXED:
+ return new ParentValueContainer() {
+ @Override
+ public void add(Object value) {
+ parent.add(conversion.fromFixed(
+ (GenericData.Fixed) value, schema, logicalType));
+ }
+ };
+ case RECORD:
+ return new ParentValueContainer() {
+ @Override
+ public void add(Object value) {
+ parent.add(conversion.fromRecord(
+ (IndexedRecord) value, schema, logicalType));
+ }
+ };
+ case ARRAY:
+ return new ParentValueContainer() {
+ @Override
+ public void add(Object value) {
+ parent.add(conversion.fromArray(
+ (Collection<?>) value, schema, logicalType));
+ }
+ };
+ case MAP:
+ return new ParentValueContainer() {
+ @Override
+ public void add(Object value) {
+ parent.add(conversion.fromMap(
+ (Map<?, ?>) value, schema, logicalType));
+ }
+ };
+ case ENUM:
+ return new ParentValueContainer() {
+ @Override
+ public void add(Object value) {
+ parent.add(conversion.fromEnumSymbol(
+ (GenericEnumSymbol) value, schema, logicalType));
+ }
+ };
+ default:
+ return new LogicalTypePrimitiveContainer(parent, schema, conversion);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/main/resources/META-INF/LICENSE
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/resources/META-INF/LICENSE b/parquet-avro/src/main/resources/META-INF/LICENSE
new file mode 100644
index 0000000..20b23c9
--- /dev/null
+++ b/parquet-avro/src/main/resources/META-INF/LICENSE
@@ -0,0 +1,186 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+--------------------------------------------------------------------------------
+
+This product includes code from Apache Avro.
+
+Copyright: 2014 The Apache Software Foundation.
+Home page: https://avro.apache.org/
+License: http://www.apache.org/licenses/LICENSE-2.0
+
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/resources/META-INF/NOTICE b/parquet-avro/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..7b5682c
--- /dev/null
+++ b/parquet-avro/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,18 @@
+
+Apache Parquet MR (Incubating)
+Copyright 2014-2015 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+--------------------------------------------------------------------------------
+
+This product includes code from Apache Avro, which includes the following in
+its NOTICE file:
+
+ Apache Avro
+ Copyright 2010-2015 The Apache Software Foundation
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java b/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java
index d5fe11a..f4682d6 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/AvroTestUtil.java
@@ -19,11 +19,21 @@
package org.apache.parquet.avro;
import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
import org.codehaus.jackson.node.NullNode;
+import org.junit.Assert;
+import org.junit.rules.TemporaryFolder;
public class AvroTestUtil {
@@ -66,4 +76,47 @@ public class AvroTestUtil {
return record;
}
+ public static <D> List<D> read(GenericData model, Schema schema, File file) throws IOException {
+ List<D> data = new ArrayList<D>();
+ Configuration conf = new Configuration(false);
+ AvroReadSupport.setRequestedProjection(conf, schema);
+ AvroReadSupport.setAvroReadSchema(conf, schema);
+ ParquetReader<D> fileReader = AvroParquetReader
+ .<D>builder(new Path(file.toString()))
+ .withDataModel(model) // reflect disables compatibility
+ .withConf(conf)
+ .build();
+
+ try {
+ D datum;
+ while ((datum = fileReader.read()) != null) {
+ data.add(datum);
+ }
+ } finally {
+ fileReader.close();
+ }
+
+ return data;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static <D> File write(TemporaryFolder temp, GenericData model, Schema schema, D... data) throws IOException {
+ File file = temp.newFile();
+ Assert.assertTrue(file.delete());
+ ParquetWriter<D> writer = AvroParquetWriter
+ .<D>builder(new Path(file.toString()))
+ .withDataModel(model)
+ .withSchema(schema)
+ .build();
+
+ try {
+ for (D datum : data) {
+ writer.write(datum);
+ }
+ } finally {
+ writer.close();
+ }
+
+ return file;
+ }
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
index b393615..942e3b1 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
@@ -20,16 +20,37 @@ package org.apache.parquet.avro;
import com.google.common.collect.Lists;
import com.google.common.io.Resources;
-import java.util.Arrays;
-import java.util.Collections;
+import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Types;
import org.codehaus.jackson.node.NullNode;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.MessageTypeParser;
+import java.util.Arrays;
+import java.util.Collections;
+import static org.apache.avro.Schema.Type.INT;
+import static org.apache.avro.Schema.Type.LONG;
+import static org.apache.parquet.schema.OriginalType.DATE;
+import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MICROS;
+import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MILLIS;
+import static org.apache.parquet.schema.OriginalType.TIME_MICROS;
+import static org.apache.parquet.schema.OriginalType.TIME_MILLIS;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
import static org.junit.Assert.assertEquals;
public class TestAvroSchemaConverter {
@@ -131,7 +152,7 @@ public class TestAvroSchemaConverter {
@Test(expected = IllegalArgumentException.class)
public void testTopLevelMustBeARecord() {
- new AvroSchemaConverter().convert(Schema.create(Schema.Type.INT));
+ new AvroSchemaConverter().convert(Schema.create(INT));
}
@Test
@@ -270,7 +291,7 @@ public class TestAvroSchemaConverter {
@Test
public void testOptionalFields() throws Exception {
Schema schema = Schema.createRecord("record1", null, null, false);
- Schema optionalInt = optional(Schema.create(Schema.Type.INT));
+ Schema optionalInt = optional(Schema.create(INT));
schema.setFields(Arrays.asList(
new Schema.Field("myint", optionalInt, null, NullNode.getInstance())
));
@@ -284,7 +305,7 @@ public class TestAvroSchemaConverter {
@Test
public void testOptionalMapValue() throws Exception {
Schema schema = Schema.createRecord("record1", null, null, false);
- Schema optionalIntMap = Schema.createMap(optional(Schema.create(Schema.Type.INT)));
+ Schema optionalIntMap = Schema.createMap(optional(Schema.create(INT)));
schema.setFields(Arrays.asList(
new Schema.Field("myintmap", optionalIntMap, null, null)
));
@@ -303,7 +324,7 @@ public class TestAvroSchemaConverter {
@Test
public void testOptionalArrayElement() throws Exception {
Schema schema = Schema.createRecord("record1", null, null, false);
- Schema optionalIntArray = Schema.createArray(optional(Schema.create(Schema.Type.INT)));
+ Schema optionalIntArray = Schema.createArray(optional(Schema.create(INT)));
schema.setFields(Arrays.asList(
new Schema.Field("myintarray", optionalIntArray, null, null)
));
@@ -323,7 +344,7 @@ public class TestAvroSchemaConverter {
Schema schema = Schema.createRecord("record2", null, null, false);
Schema multipleTypes = Schema.createUnion(Arrays.asList(Schema.create(Schema.Type
.NULL),
- Schema.create(Schema.Type.INT),
+ Schema.create(INT),
Schema.create(Schema.Type.FLOAT)));
schema.setFields(Arrays.asList(
new Schema.Field("myunion", multipleTypes, null, NullNode.getInstance())));
@@ -396,7 +417,7 @@ public class TestAvroSchemaConverter {
@Test
public void testOldAvroListOfLists() throws Exception {
Schema listOfLists = optional(Schema.createArray(Schema.createArray(
- Schema.create(Schema.Type.INT))));
+ Schema.create(INT))));
Schema schema = Schema.createRecord("AvroCompatListInList", null, null, false);
schema.setFields(Lists.newArrayList(
new Schema.Field("listOfLists", listOfLists, null, NullNode.getInstance())
@@ -425,7 +446,7 @@ public class TestAvroSchemaConverter {
@Test
public void testOldThriftListOfLists() throws Exception {
Schema listOfLists = optional(Schema.createArray(Schema.createArray(
- Schema.create(Schema.Type.INT))));
+ Schema.create(INT))));
Schema schema = Schema.createRecord("ThriftCompatListInList", null, null, false);
schema.setFields(Lists.newArrayList(
new Schema.Field("listOfLists", listOfLists, null, NullNode.getInstance())
@@ -458,7 +479,7 @@ public class TestAvroSchemaConverter {
// group's name, but it must be 2-level because the repeated group doesn't
// contain an optional or repeated element as required for 3-level lists
Schema listOfLists = optional(Schema.createArray(Schema.createArray(
- Schema.create(Schema.Type.INT))));
+ Schema.create(INT))));
Schema schema = Schema.createRecord("UnknownTwoLevelListInList", null, null, false);
schema.setFields(Lists.newArrayList(
new Schema.Field("listOfLists", listOfLists, null, NullNode.getInstance())
@@ -488,7 +509,7 @@ public class TestAvroSchemaConverter {
@Test
public void testParquetMapWithoutMapKeyValueAnnotation() throws Exception {
Schema schema = Schema.createRecord("myrecord", null, null, false);
- Schema map = Schema.createMap(Schema.create(Schema.Type.INT));
+ Schema map = Schema.createMap(Schema.create(INT));
schema.setFields(Collections.singletonList(new Schema.Field("mymap", map, null, null)));
String parquetSchema =
"message myrecord {\n" +
@@ -504,9 +525,240 @@ public class TestAvroSchemaConverter {
testParquetToAvroConversion(NEW_BEHAVIOR, schema, parquetSchema);
}
+  /**
+   * Round-trips a record whose single field is an Avro {@code bytes} schema
+   * carrying the decimal(9, 2) logical type against a Parquet {@code binary}
+   * column annotated with {@code DECIMAL(9,2)}.
+   */
+  @Test
+  public void testDecimalBytesType() throws Exception {
+    Schema bytesDecimal = LogicalTypes.decimal(9, 2).addToSchema(
+        Schema.create(Schema.Type.BYTES));
+    Schema.Field decField = new Schema.Field("dec", bytesDecimal, null, null);
+
+    Schema record = Schema.createRecord("myrecord", null, null, false);
+    record.setFields(Collections.singletonList(decField));
+
+    String parquetSchema =
+        "message myrecord {\n" +
+        " required binary dec (DECIMAL(9,2));\n" +
+        "}\n";
+    testRoundTripConversion(record, parquetSchema);
+  }
+
+  /**
+   * Round-trips a record whose single field is an 8-byte Avro {@code fixed}
+   * schema carrying the decimal(9, 2) logical type against a Parquet
+   * {@code fixed_len_byte_array(8)} column annotated with {@code DECIMAL(9,2)}.
+   */
+  @Test
+  public void testDecimalFixedType() throws Exception {
+    Schema fixedDecimal = LogicalTypes.decimal(9, 2).addToSchema(
+        Schema.createFixed("dec", null, null, 8));
+    Schema.Field decField = new Schema.Field("dec", fixedDecimal, null, null);
+
+    Schema record = Schema.createRecord("myrecord", null, null, false);
+    record.setFields(Collections.singletonList(decField));
+
+    String parquetSchema =
+        "message myrecord {\n" +
+        " required fixed_len_byte_array(8) dec (DECIMAL(9,2));\n" +
+        "}\n";
+    testRoundTripConversion(record, parquetSchema);
+  }
+
+  /**
+   * Converts a Parquet {@code int32} column annotated with {@code DECIMAL(9,2)}
+   * and expects a plain Avro {@code int} field with no logical type attached.
+   */
+  @Test
+  public void testDecimalIntegerType() throws Exception {
+    Schema.Field plainInt = new Schema.Field(
+        "dec", Schema.create(INT), null, null);
+    Schema avroSchema = Schema.createRecord("myrecord", null, null, false,
+        Collections.singletonList(plainInt));
+
+    String parquetSchema =
+        "message myrecord {\n" +
+        " required int32 dec (DECIMAL(9,2));\n" +
+        "}\n";
+
+    // the decimal portion is lost because it isn't valid in Avro
+    testParquetToAvroConversion(avroSchema, parquetSchema);
+  }
+
+  /**
+   * Converts a Parquet {@code int64} column annotated with {@code DECIMAL(9,2)}
+   * and expects a plain Avro {@code long} field with no logical type attached.
+   */
+  @Test
+  public void testDecimalLongType() throws Exception {
+    Schema.Field plainLong = new Schema.Field(
+        "dec", Schema.create(LONG), null, null);
+    Schema avroSchema = Schema.createRecord("myrecord", null, null, false,
+        Collections.singletonList(plainLong));
+
+    String parquetSchema =
+        "message myrecord {\n" +
+        " required int64 dec (DECIMAL(9,2));\n" +
+        "}\n";
+
+    // the decimal portion is lost because it isn't valid in Avro
+    testParquetToAvroConversion(avroSchema, parquetSchema);
+  }
+
+  /**
+   * Round-trips an Avro {@code int} with the date logical type against a
+   * Parquet {@code int32} column annotated with {@code DATE}, then checks that
+   * converting a {@code DATE} annotation on any other primitive type is
+   * rejected with an {@link IllegalArgumentException}.
+   */
+  @Test
+  public void testDateType() throws Exception {
+    Schema date = LogicalTypes.date().addToSchema(Schema.create(INT));
+    Schema expected = Schema.createRecord("myrecord", null, null, false,
+        Arrays.asList(new Schema.Field("date", date, null, null)));
+
+    testRoundTripConversion(expected,
+        "message myrecord {\n" +
+        " required int32 date (DATE);\n" +
+        "}\n");
+
+    // every primitive except INT32, which is the one exercised above
+    for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
+        {INT64, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) {
+      final PrimitiveType type;
+      if (primitive == FIXED_LEN_BYTE_ARRAY) {
+        // fixed-length arrays need an explicit length
+        type = new PrimitiveType(REQUIRED, primitive, 12, "test", DATE);
+      } else {
+        type = new PrimitiveType(REQUIRED, primitive, "test", DATE);
+      }
+
+      // the message names the annotation under test (was TIME_MICROS by copy-paste)
+      assertThrows("Should not allow DATE with " + primitive,
+          IllegalArgumentException.class, new Runnable() {
+            @Override
+            public void run() {
+              new AvroSchemaConverter().convert(message(type));
+            }
+          });
+    }
+  }
+
+  /**
+   * Round-trips an Avro {@code int} with the time-millis logical type against
+   * a Parquet {@code int32} column annotated with {@code TIME_MILLIS}, then
+   * checks that converting a {@code TIME_MILLIS} annotation on any other
+   * primitive type is rejected with an {@link IllegalArgumentException}.
+   */
+  @Test
+  public void testTimeMillisType() throws Exception {
+    Schema timeMillis = LogicalTypes.timeMillis().addToSchema(Schema.create(INT));
+    Schema expected = Schema.createRecord("myrecord", null, null, false,
+        Arrays.asList(new Schema.Field("time", timeMillis, null, null)));
+
+    testRoundTripConversion(expected,
+        "message myrecord {\n" +
+        " required int32 time (TIME_MILLIS);\n" +
+        "}\n");
+
+    // every primitive except INT32, which is the one exercised above
+    for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
+        {INT64, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) {
+      final PrimitiveType type;
+      if (primitive == FIXED_LEN_BYTE_ARRAY) {
+        // fixed-length arrays need an explicit length
+        type = new PrimitiveType(REQUIRED, primitive, 12, "test", TIME_MILLIS);
+      } else {
+        type = new PrimitiveType(REQUIRED, primitive, "test", TIME_MILLIS);
+      }
+
+      // the message names the annotation under test (was TIME_MICROS by copy-paste)
+      assertThrows("Should not allow TIME_MILLIS with " + primitive,
+          IllegalArgumentException.class, new Runnable() {
+            @Override
+            public void run() {
+              new AvroSchemaConverter().convert(message(type));
+            }
+          });
+    }
+  }
+
+ @Test
+ public void testTimeMicrosType() throws Exception {
+ Schema date = LogicalTypes.timeMicros().addToSchema(Schema.create(LONG));
+ Schema expected = Schema.createRecord("myrecord", null, null, false,
+ Arrays.asList(new Schema.Field("time", date, null, null)));
+
+ testRoundTripConversion(expected,
+ "message myrecord {\n" +
+ " required int64 time (TIME_MICROS);\n" +
+ "}\n");
+
+ for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
+ {INT32, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) {
+ final PrimitiveType type;
+ if (primitive == FIXED_LEN_BYTE_ARRAY) {
+ type = new PrimitiveType(REQUIRED, primitive, 12, "test", TIME_MICROS);
+ } else {
+ type = new PrimitiveType(REQUIRED, primitive, "test", TIME_MICROS);
+ }
+
+ assertThrows("Should not allow TIME_MICROS with " + primitive,
+ IllegalArgumentException.class, new Runnable() {
+ @Override
+ public void run() {
+ new AvroSchemaConverter().convert(message(type));
+ }
+ });
+ }
+ }
+
+ @Test
+ public void testTimestampMillisType() throws Exception {
+ Schema date = LogicalTypes.timestampMillis().addToSchema(Schema.create(LONG));
+ Schema expected = Schema.createRecord("myrecord", null, null, false,
+ Arrays.asList(new Schema.Field("timestamp", date, null, null)));
+
+ testRoundTripConversion(expected,
+ "message myrecord {\n" +
+ " required int64 timestamp (TIMESTAMP_MILLIS);\n" +
+ "}\n");
+
+ for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
+ {INT32, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) {
+ final PrimitiveType type;
+ if (primitive == FIXED_LEN_BYTE_ARRAY) {
+ type = new PrimitiveType(REQUIRED, primitive, 12, "test", TIMESTAMP_MILLIS);
+ } else {
+ type = new PrimitiveType(REQUIRED, primitive, "test", TIMESTAMP_MILLIS);
+ }
+
+ assertThrows("Should not allow TIMESTAMP_MILLIS with " + primitive,
+ IllegalArgumentException.class, new Runnable() {
+ @Override
+ public void run() {
+ new AvroSchemaConverter().convert(message(type));
+ }
+ });
+ }
+ }
+
+ @Test
+ public void testTimestampMicrosType() throws Exception {
+ Schema date = LogicalTypes.timestampMicros().addToSchema(Schema.create(LONG));
+ Schema expected = Schema.createRecord("myrecord", null, null, false,
+ Arrays.asList(new Schema.Field("timestamp", date, null, null)));
+
+ testRoundTripConversion(expected,
+ "message myrecord {\n" +
+ " required int64 timestamp (TIMESTAMP_MICROS);\n" +
+ "}\n");
+
+ for (PrimitiveTypeName primitive : new PrimitiveTypeName[]
+ {INT32, INT96, FLOAT, DOUBLE, BOOLEAN, BINARY, FIXED_LEN_BYTE_ARRAY}) {
+ final PrimitiveType type;
+ if (primitive == FIXED_LEN_BYTE_ARRAY) {
+ type = new PrimitiveType(REQUIRED, primitive, 12, "test", TIMESTAMP_MICROS);
+ } else {
+ type = new PrimitiveType(REQUIRED, primitive, "test", TIMESTAMP_MICROS);
+ }
+
+ assertThrows("Should not allow TIMESTAMP_MICROS with " + primitive,
+ IllegalArgumentException.class, new Runnable() {
+ @Override
+ public void run() {
+ new AvroSchemaConverter().convert(message(type));
+ }
+ });
+ }
+ }
+
+ /**
+ * Builds a union schema whose branches are the Avro null type followed by
+ * the given schema.
+ *
+ * @param original the schema to place after null in the union
+ * @return a union schema of [null, original]
+ */
public static Schema optional(Schema original) {
return Schema.createUnion(Lists.newArrayList(
Schema.create(Schema.Type.NULL),
original));
}
+
+ /**
+ * Builds a Parquet message named "myrecord" containing only the given
+ * primitive field.
+ *
+ * @param primitive the single field to add to the message
+ * @return the message type produced by the builder
+ */
+ public static MessageType message(PrimitiveType primitive) {
+ return Types.buildMessage()
+ .addField(primitive)
+ .named("myrecord");
+ }
+
+  /**
+   * Runs the given {@link Runnable} and fails unless it throws an exception
+   * whose class is exactly {@code expected}; a subclass of {@code expected}
+   * does not match. Used in place of many {@code @Test(expected=...)} methods.
+   * <p>
+   * When a different exception is thrown, it is attached to the resulting
+   * {@link AssertionError} as a suppressed exception.
+   *
+   * @param message A String message to describe this assertion
+   * @param expected An Exception class that the Runnable should throw
+   * @param runnable A Runnable that is expected to throw the exception
+   */
+  public static void assertThrows(
+      String message, Class<? extends Exception> expected, Runnable runnable) {
+    Exception thrown = null;
+    try {
+      runnable.run();
+    } catch (Exception e) {
+      thrown = e;
+    }
+
+    if (thrown == null) {
+      Assert.fail("No exception was thrown (" + message + "), expected: " +
+          expected.getName());
+    }
+
+    try {
+      Assert.assertEquals(message, expected, thrown.getClass());
+    } catch (AssertionError mismatch) {
+      // keep the unexpected exception visible in the failure report
+      mismatch.addSuppressed(thrown);
+      throw mismatch;
+    }
+  }
}