You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ec...@apache.org on 2012/06/02 04:14:06 UTC
svn commit: r1345420 [2/3] - in /hive/trunk: data/files/
ql/src/java/org/apache/hadoop/hive/ql/io/avro/
ql/src/test/queries/clientpositive/ ql/src/test/results/clientpositive/
serde/ serde/src/java/org/apache/hadoop/hive/serde2/avro/
serde/src/test/org...
Added: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java?rev=1345420&view=auto
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java (added)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerdeUtils.java Sat Jun 2 02:14:04 2012
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.avro;
+
+
+import org.apache.avro.Schema;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Utilities useful only to the AvroSerde itself. Not meant to be used by
+ * end-users, but public for interop with the ql package.
+ */
+public class AvroSerdeUtils {
+ private static final Log LOG = LogFactory.getLog(AvroSerdeUtils.class);
+
+ public static final String SCHEMA_LITERAL = "avro.schema.literal";
+ public static final String SCHEMA_URL = "avro.schema.url";
+ public static final String SCHEMA_NONE = "none";
+ public static final String EXCEPTION_MESSAGE = "Neither " + SCHEMA_LITERAL + " nor "
+ + SCHEMA_URL + " specified, can't determine table schema";
+ public static final String AVRO_SERDE_SCHEMA = "avro.serde.schema";
+
+ /**
+  * Determine the schema that's been provided for Avro serde work.
+  *
+  * The literal schema ({@link #SCHEMA_LITERAL}) takes precedence. Otherwise the
+  * schema is read from the location named by {@link #SCHEMA_URL}. A property
+  * that is missing or set to {@link #SCHEMA_NONE} counts as not provided.
+  *
+  * @param properties containing a key pointing to the schema, one way or another
+  * @return schema to use while serdeing the avro file
+  * @throws IOException if error while trying to read the schema from another location
+  * @throws AvroSerdeException if unable to find a schema or pointer to it in the
+  *         properties, or if the schema could not be read from HDFS
+  */
+ public static Schema determineSchemaOrThrowException(Properties properties)
+     throws IOException, AvroSerdeException {
+   String schemaString = properties.getProperty(SCHEMA_LITERAL);
+   if(schemaString != null && !schemaString.equals(SCHEMA_NONE)) {
+     return Schema.parse(schemaString);
+   }
+
+   // Try pulling directly from URL
+   schemaString = properties.getProperty(SCHEMA_URL);
+   if(schemaString == null || schemaString.equals(SCHEMA_NONE)) {
+     throw new AvroSerdeException(EXCEPTION_MESSAGE);
+   }
+
+   if(schemaString.toLowerCase().startsWith("hdfs://")) {
+     try {
+       return getSchemaFromHDFS(schemaString, new Configuration());
+     } catch(IOException ioe) {
+       throw new AvroSerdeException("Unable to read schema from HDFS: " + schemaString, ioe);
+     }
+   }
+
+   // Any other URL scheme. We opened the stream, so we must close it once the
+   // schema has been parsed (or parsing has failed).
+   java.io.InputStream in = new URL(schemaString).openStream();
+   try {
+     return Schema.parse(in);
+   } finally {
+     in.close();
+   }
+ }
+
+ /**
+ * Attempt to determine the schema via the usual means, but do not throw
+ * an exception if we fail. Instead, signal failure via a special
+ * schema. This is used because Hive calls init on the serde during
+ * any call, including calls to update the serde properties, meaning
+ * if the serde is in a bad state, there is no way to update that state.
+ */
+ public static Schema determineSchemaOrReturnErrorSchema(Properties props) {
+ try {
+ return determineSchemaOrThrowException(props);
+ } catch(AvroSerdeException he) {
+ LOG.warn("Encountered AvroSerdeException determining schema. Returning " +
+ "signal schema to indicate problem", he);
+ return SchemaResolutionProblem.SIGNAL_BAD_SCHEMA;
+ } catch (Exception e) {
+ LOG.warn("Encountered exception determining schema. Returning signal " +
+ "schema to indicate problem", e);
+ return SchemaResolutionProblem.SIGNAL_BAD_SCHEMA;
+ }
+ }
+ /**
+  * Read and parse an Avro schema from the file at the given location.
+  * Protected for testing and so we can pass in a conf for testing.
+  *
+  * @param schemaHDFSUrl URL of the schema file
+  * @param conf configuration used to obtain the file system
+  * @return the parsed schema
+  * @throws IOException if the file cannot be opened or read
+  */
+ protected static Schema getSchemaFromHDFS(String schemaHDFSUrl,
+     Configuration conf) throws IOException {
+   Path schemaPath = new Path(schemaHDFSUrl);
+   // Resolve the file system from the URL itself rather than taking the
+   // configured default: the schema may live on a different file system
+   // than the default one, and opening it there would be rejected.
+   FileSystem fs = schemaPath.getFileSystem(conf);
+   FSDataInputStream in = null;
+
+   try {
+     in = fs.open(schemaPath);
+     return Schema.parse(in);
+   } finally {
+     if(in != null) {
+       in.close();
+     }
+   }
+ }
+
+ /**
+  * Check whether an Avro schema is a two-branch union in which one branch is
+  * NULL, i.e. Avro's way of spelling "nullable T". This is a very common use
+  * case, so such a type is silently treated as just T whose value may be null.
+  *
+  * @param schema schema to inspect
+  * @return true if type represents Union[T, Null], false otherwise
+  */
+ public static boolean isNullableType(Schema schema) {
+   if(!schema.getType().equals(Schema.Type.UNION)) {
+     return false;
+   }
+
+   List<Schema> branches = schema.getTypes();
+   if(branches.size() != 2) {
+     return false;
+   }
+
+   // [null, null] not allowed, so finding NULL in either branch is enough.
+   return branches.get(0).getType().equals(Schema.Type.NULL)
+       || branches.get(1).getType().equals(Schema.Type.NULL);
+ }
+
+ /**
+ * In a nullable type, get the schema for the non-nullable type. This method
+ * does no checking that the provides Schema is nullable.
+ */
+ public static Schema getOtherTypeFromNullableType(Schema schema) {
+ List<Schema> types = schema.getTypes();
+
+ return types.get(0).getType().equals(Schema.Type.NULL) ? types.get(1) : types.get(0);
+ }
+
+ /**
+ * Determine if we're being executed from within an MR job or as part
+ * of a select * statement. The signals for this varies between Hive versions.
+ * @param job that contains things that are or are not set in a job
+ * @return Are we in a job or not?
+ */
+ public static boolean insideMRJob(JobConf job) {
+ return job != null
+ && (HiveConf.getVar(job, HiveConf.ConfVars.PLAN) != null)
+ && (!HiveConf.getVar(job, HiveConf.ConfVars.PLAN).isEmpty());
+ }
+}
Added: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java?rev=1345420&view=auto
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java (added)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java Sat Jun 2 02:14:04 2012
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.avro;
+
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericData;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
+import org.apache.hadoop.io.Writable;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.avro.Schema.Type.BYTES;
+import static org.apache.avro.Schema.Type.FIXED;
+
+class AvroSerializer {
+ private static final Log LOG = LogFactory.getLog(AvroSerializer.class);
+
+ AvroGenericRecordWritable cache = new AvroGenericRecordWritable();
+
+ /**
+  * Serialize a Hive row into an Avro record, matching Hive columns to Avro
+  * fields purely by index. Hive formats normally handle mismatches with null.
+  * We don't have that option, so invalid records end in an exception instead.
+  *
+  * The returned Writable is the single instance held in the cache field; its
+  * record is replaced on every call.
+  *
+  * @param o the Hive row
+  * @param objectInspector struct inspector describing the row
+  * @param columnNames names of the Hive columns (only their count is checked)
+  * @param columnTypes Hive types of the columns, in field order
+  * @param schema Avro record schema to write against
+  * @return the cached writable wrapping the freshly built record
+  * @throws AvroSerdeException if the column/field counts disagree, or if Avro
+  *         cannot validate the built record against the schema
+  */
+ public Writable serialize(Object o, ObjectInspector objectInspector, List<String> columnNames, List<TypeInfo> columnTypes, Schema schema) throws AvroSerdeException {
+   StructObjectInspector soi = (StructObjectInspector) objectInspector;
+   GenericData.Record record = new GenericData.Record(schema);
+
+   List<? extends StructField> outputFieldRefs = soi.getAllStructFieldRefs();
+   if(outputFieldRefs.size() != columnNames.size()) {
+     throw new AvroSerdeException("Number of input columns was different than output columns (in = " + columnNames.size() + " vs out = " + outputFieldRefs.size() + ")");
+   }
+
+   List<Field> avroFields = schema.getFields();
+   int size = avroFields.size();
+   if(outputFieldRefs.size() != size) { // Hive does this check for us, so we should be ok.
+     throw new AvroSerdeException("Hive passed in a different number of fields than the schema expected: (Hive wanted " + outputFieldRefs.size() + ", Avro expected " + size + ")");
+   }
+
+   List<Object> structFieldsDataAsList = soi.getStructFieldsDataAsList(o);
+
+   for(int i = 0; i < size; i++) {
+     Field field = avroFields.get(i);
+     ObjectInspector fieldOI = outputFieldRefs.get(i).getFieldObjectInspector();
+
+     Object val = serialize(columnTypes.get(i), fieldOI, structFieldsDataAsList.get(i), field.schema());
+     record.put(field.name(), val);
+   }
+
+   if(!GenericData.get().validate(schema, record)) {
+     throw new SerializeToAvroException(schema, record);
+   }
+
+   cache.setRecord(record);
+
+   return cache;
+ }
+
+ /**
+  * Convert one Hive value to its Avro representation by dispatching on the
+  * category of the Hive type.
+  *
+  * @param typeInfo Hive type of the value
+  * @param fieldOI inspector used to read the value
+  * @param structFieldData the Hive value; may be null
+  * @param schema Avro schema the value is written against; may be null (map
+  *        keys are serialized without one)
+  * @return the Avro-side object, or null when structFieldData is null
+  * @throws AvroSerdeException if the category is not one of the handled ones,
+  *         or if a nested conversion fails
+  */
+ private Object serialize(TypeInfo typeInfo, ObjectInspector fieldOI, Object structFieldData, Schema schema) throws AvroSerdeException {
+   // A null Hive value is a null Avro value whatever its type. Returning here
+   // keeps the list/map/struct serializers from dereferencing null.
+   if(structFieldData == null) {
+     return null;
+   }
+
+   // A nullable Avro type (union of T and null) is meant to be handled as
+   // plain T, so the type-specific serializers below need T's schema: asking
+   // a union for its element type, value type or fields fails. Genuine Hive
+   // unions keep their union schema because serializeUnion indexes into it.
+   if(schema != null && !(typeInfo instanceof UnionTypeInfo) && AvroSerdeUtils.isNullableType(schema)) {
+     schema = AvroSerdeUtils.getOtherTypeFromNullableType(schema);
+   }
+
+   switch(typeInfo.getCategory()) {
+     case PRIMITIVE:
+       assert fieldOI instanceof PrimitiveObjectInspector;
+       return serializePrimitive(typeInfo, (PrimitiveObjectInspector) fieldOI, structFieldData);
+     case MAP:
+       assert fieldOI instanceof MapObjectInspector;
+       assert typeInfo instanceof MapTypeInfo;
+       return serializeMap((MapTypeInfo) typeInfo, (MapObjectInspector) fieldOI, structFieldData, schema);
+     case LIST:
+       assert fieldOI instanceof ListObjectInspector;
+       assert typeInfo instanceof ListTypeInfo;
+       return serializeList((ListTypeInfo) typeInfo, (ListObjectInspector) fieldOI, structFieldData, schema);
+     case UNION:
+       assert fieldOI instanceof UnionObjectInspector;
+       assert typeInfo instanceof UnionTypeInfo;
+       return serializeUnion((UnionTypeInfo) typeInfo, (UnionObjectInspector) fieldOI, structFieldData, schema);
+     case STRUCT:
+       assert fieldOI instanceof StructObjectInspector;
+       assert typeInfo instanceof StructTypeInfo;
+       return serializeStruct((StructTypeInfo) typeInfo, (StructObjectInspector) fieldOI, structFieldData, schema);
+     default:
+       throw new AvroSerdeException("Ran out of TypeInfo Categories: " + typeInfo.getCategory());
+   }
+ }
+
+ /**
+  * Build an Avro record from a Hive struct, pairing the i-th Avro field with
+  * the i-th Hive struct field.
+  *
+  * @return a GenericData.Record populated with the converted field values
+  */
+ private Object serializeStruct(StructTypeInfo typeInfo, StructObjectInspector ssoi, Object o, Schema schema) throws AvroSerdeException {
+   List<Field> avroFields = schema.getFields();
+   List<? extends StructField> hiveFieldRefs = ssoi.getAllStructFieldRefs();
+   List<Object> hiveFieldValues = ssoi.getStructFieldsDataAsList(o);
+   GenericData.Record result = new GenericData.Record(schema);
+   ArrayList<TypeInfo> hiveFieldTypes = typeInfo.getAllStructFieldTypeInfos();
+
+   // The Avro schema drives the iteration; Hive-side lists are read by position.
+   int position = 0;
+   for(Field avroField : avroFields) {
+     TypeInfo hiveType = hiveFieldTypes.get(position);
+     ObjectInspector hiveOI = hiveFieldRefs.get(position).getFieldObjectInspector();
+     Object hiveValue = hiveFieldValues.get(position);
+
+     result.put(avroField.name(), serialize(hiveType, hiveOI, hiveValue, avroField.schema()));
+     position++;
+   }
+   return result;
+ }
+
+ /**
+  * Convert a Hive primitive: UNKNOWN is rejected, VOID becomes null, and every
+  * other category is passed through as the inspector's plain Java object.
+  */
+ private Object serializePrimitive(TypeInfo typeInfo, PrimitiveObjectInspector fieldOI, Object structFieldData) throws AvroSerdeException {
+   PrimitiveObjectInspector.PrimitiveCategory category = fieldOI.getPrimitiveCategory();
+
+   if(category.equals(PrimitiveObjectInspector.PrimitiveCategory.UNKNOWN)) {
+     throw new AvroSerdeException("Received UNKNOWN primitive category.");
+   }
+   if(category.equals(PrimitiveObjectInspector.PrimitiveCategory.VOID)) {
+     return null;
+   }
+
+   // All other primitive types are simple
+   return fieldOI.getPrimitiveJavaObject(structFieldData);
+ }
+
+ /**
+  * Convert a Hive union value by serializing the branch selected by its tag.
+  * Relies on the invariant that Avro's tag ordering matches Hive's.
+  */
+ private Object serializeUnion(UnionTypeInfo typeInfo, UnionObjectInspector fieldOI, Object structFieldData, Schema schema) throws AvroSerdeException {
+   byte tag = fieldOI.getTag(structFieldData);
+
+   // The same tag picks the branch on both the Hive and the Avro side.
+   TypeInfo branchType = typeInfo.getAllUnionObjectTypeInfos().get(tag);
+   ObjectInspector branchOI = fieldOI.getObjectInspectors().get(tag);
+   Object branchValue = fieldOI.getField(structFieldData);
+   Schema branchSchema = schema.getTypes().get(tag);
+
+   return serialize(branchType, branchOI, branchValue, branchSchema);
+ }
+
+ // FIXED and BYTES are represented within Hive as arrays of tinyints. This
+ // tells us whether the schema is one of those two, in which case the value
+ // has to be written out as the Avro type rather than as a list.
+ private boolean isTransformedType(Schema schema) {
+   Schema.Type avroType = schema.getType();
+   return avroType.equals(FIXED) || avroType.equals(BYTES);
+ }
+
+ /**
+  * Serialize a Hive list of tinyints as Avro FIXED when the schema says so,
+  * and as Avro BYTES otherwise.
+  */
+ private Object serializeTransformedType(ListTypeInfo typeInfo, ListObjectInspector fieldOI, Object structFieldData, Schema schema) throws AvroSerdeException {
+   if(LOG.isDebugEnabled()) {
+     LOG.debug("Beginning to transform " + typeInfo + " with Avro schema " + schema.toString(false));
+   }
+
+   return schema.getType().equals(FIXED)
+       ? serializedAvroFixed(typeInfo, fieldOI, structFieldData, schema)
+       : serializeAvroBytes(typeInfo, fieldOI, structFieldData, schema);
+ }
+
+ /**
+  * Serialize a Hive list of tinyints as Avro BYTES: a rewound ByteBuffer
+  * wrapping the extracted byte array.
+  */
+ private Object serializeAvroBytes(ListTypeInfo typeInfo, ListObjectInspector fieldOI, Object structFieldData, Schema schema) throws AvroSerdeException {
+   byte[] raw = extraByteArray(fieldOI, structFieldData);
+   ByteBuffer buffer = ByteBuffer.wrap(raw);
+   buffer.rewind();
+   return buffer;
+ }
+
+ /**
+  * Serialize a Hive list of tinyints as Avro FIXED: a GenericData.Fixed built
+  * from the schema and the extracted byte array.
+  */
+ private Object serializedAvroFixed(ListTypeInfo typeInfo, ListObjectInspector fieldOI, Object structFieldData, Schema schema) throws AvroSerdeException {
+   byte[] raw = extraByteArray(fieldOI, structFieldData);
+   return new GenericData.Fixed(schema, raw);
+ }
+
+ /**
+  * For transforming to BYTES and FIXED, pull out the byte array Avro will want.
+  *
+  * @param fieldOI inspector for the Hive list holding the bytes
+  * @param structFieldData the Hive list value
+  * @return one byte per list element, in list order
+  * @throws AvroSerdeException if an element is null or is not a Byte
+  */
+ private byte[] extraByteArray(ListObjectInspector fieldOI, Object structFieldData) throws AvroSerdeException {
+   // Grab a book. This is going to be slow.
+   int listLength = fieldOI.getListLength(structFieldData);
+   byte[] bytes = new byte[listLength];
+   assert fieldOI.getListElementObjectInspector() instanceof PrimitiveObjectInspector;
+   PrimitiveObjectInspector poi = (PrimitiveObjectInspector)fieldOI.getListElementObjectInspector();
+   List<?> list = fieldOI.getList(structFieldData);
+
+   for(int i = 0; i < listLength; i++) {
+     Object b = poi.getPrimitiveJavaObject(list.get(i));
+     if(!(b instanceof Byte)) {
+       // instanceof is also false for null, so b must not be dereferenced
+       // just to describe it in the message.
+       String actual = (b == null) ? "null" : b.getClass().getCanonicalName();
+       throw new AvroSerdeException("Attempting to transform to bytes, element was not byte but " + actual);
+     }
+     bytes[i] = (Byte)b;
+   }
+   return bytes;
+ }
+
+ /**
+  * Convert a Hive list. Lists standing in for Avro FIXED/BYTES are handed to
+  * the transformed-type path; every other list is converted element by
+  * element against the schema's element type.
+  */
+ private Object serializeList(ListTypeInfo typeInfo, ListObjectInspector fieldOI, Object structFieldData, Schema schema) throws AvroSerdeException {
+   if(isTransformedType(schema)) {
+     return serializeTransformedType(typeInfo, fieldOI, structFieldData, schema);
+   }
+
+   List<?> hiveElements = fieldOI.getList(structFieldData);
+   List<Object> avroElements = new ArrayList<Object>(hiveElements.size());
+
+   TypeInfo elementTypeInfo = typeInfo.getListElementTypeInfo();
+   ObjectInspector elementOI = fieldOI.getListElementObjectInspector();
+   Schema elementSchema = schema.getElementType();
+
+   for(Object hiveElement : hiveElements) {
+     avroElements.add(serialize(elementTypeInfo, elementOI, hiveElement, elementSchema));
+   }
+
+   return avroElements;
+ }
+
+ /**
+  * Convert a Hive map into a java.util.Map of converted keys and values.
+  *
+  * @param typeInfo Hive type of the map
+  * @param fieldOI inspector used to read the map
+  * @param structFieldData the Hive map value
+  * @param schema Avro map schema; its value type is used for the entries
+  * @return a map holding the converted entries; values may be null
+  * @throws AvroSerdeException if the map's key type is not string
+  */
+ private Object serializeMap(MapTypeInfo typeInfo, MapObjectInspector fieldOI, Object structFieldData, Schema schema) throws AvroSerdeException {
+   ObjectInspector mapKeyObjectInspector = fieldOI.getMapKeyObjectInspector();
+
+   // Avro only allows maps with string keys
+   if(!mapHasStringKey(mapKeyObjectInspector)) {
+     throw new AvroSerdeException("Avro only supports maps with keys as Strings. Current Map is: " + typeInfo.toString());
+   }
+
+   ObjectInspector mapValueObjectInspector = fieldOI.getMapValueObjectInspector();
+   TypeInfo mapKeyTypeInfo = typeInfo.getMapKeyTypeInfo();
+   TypeInfo mapValueTypeInfo = typeInfo.getMapValueTypeInfo();
+   Map<?,?> map = fieldOI.getMap(structFieldData);
+   Schema valueType = schema.getValueType();
+
+   // HashMap rather than Hashtable: a converted value can legitimately be
+   // null, and Hashtable.put rejects null with a NullPointerException.
+   Map<Object, Object> deserialized = new HashMap<Object, Object>(fieldOI.getMapSize(structFieldData));
+
+   for (Map.Entry<?, ?> entry : map.entrySet()) {
+     // Keys are serialized without a schema. This works, but is a bit
+     // fragile. Construct a single String schema?
+     Object key = serialize(mapKeyTypeInfo, mapKeyObjectInspector, entry.getKey(), null);
+     Object value = serialize(mapValueTypeInfo, mapValueObjectInspector, entry.getValue(), valueType);
+     deserialized.put(key, value);
+   }
+
+   return deserialized;
+ }
+
+ /**
+  * Tell whether a map's key inspector is a primitive inspector of category
+  * STRING.
+  */
+ private boolean mapHasStringKey(ObjectInspector mapKeyObjectInspector) {
+   if(!(mapKeyObjectInspector instanceof PrimitiveObjectInspector)) {
+     return false;
+   }
+
+   PrimitiveObjectInspector keyOI = (PrimitiveObjectInspector) mapKeyObjectInspector;
+   return keyOI.getPrimitiveCategory().equals(PrimitiveObjectInspector.PrimitiveCategory.STRING);
+ }
+
+ /**
+  * Thrown when, during serialization of a Hive row to an Avro record, Avro
+  * cannot verify the converted row against the record's schema.
+  *
+  * The description of the failure is available from both
+  * {@link #getMessage()} and {@link #toString()}, so it survives callers that
+  * only forward the message.
+  */
+ public static class SerializeToAvroException extends AvroSerdeException {
+   final private Schema schema;
+   final private GenericData.Record record;
+
+   /**
+    * @param schema schema the record failed to validate against
+    * @param record the record that failed validation
+    */
+   public SerializeToAvroException(Schema schema, GenericData.Record record) {
+     this.schema = schema;
+     this.record = record;
+   }
+
+   @Override
+   public String getMessage() {
+     return "Avro could not validate record against schema (record = " + record
+         + ") (schema = "+schema.toString(false) + ")";
+   }
+
+   @Override
+   public String toString() {
+     // Same text as before; kept free of the class-name prefix that
+     // Throwable.toString() would add.
+     return getMessage();
+   }
+ }
+}
Added: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/BadSchemaException.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/BadSchemaException.java?rev=1345420&view=auto
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/BadSchemaException.java (added)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/BadSchemaException.java Sat Jun 2 02:14:04 2012
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.avro;
+
+/**
+ * Marker subtype of {@link AvroSerdeException}; it declares no members of
+ * its own.
+ * NOTE(review): presumably thrown when a schema cannot be determined or is
+ * unusable; confirm against the throw sites.
+ */
+public class BadSchemaException extends AvroSerdeException {
+}
Added: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java?rev=1345420&view=auto
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java (added)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java Sat Jun 2 02:14:04 2012
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.avro;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.HashMap;
+
+/**
+ * Cache for objects whose creation only depends on some other set of objects
+ * and therefore can be used against other equivalent versions of those
+ * objects. Essentially memoizes instance creation.
+ *
+ * Entries are keyed by the seed object itself, so seeds are matched through
+ * their equals/hashCode pair; two different seeds that merely share a hash
+ * code get separate instances.
+ *
+ * @param <SeedObject> Object that determines the instance
+ * @param <Instance> Instance that will be created from SeedObject.
+ */
+public abstract class InstanceCache<SeedObject, Instance> {
+  private static final Log LOG = LogFactory.getLog(InstanceCache.class);
+  HashMap<SeedObject, Instance> cache = new HashMap<SeedObject, Instance>();
+
+  public InstanceCache() {}
+
+  /**
+   * Retrieve (or create if it doesn't exist) the correct Instance for this
+   * SeedObject
+   *
+   * @param hv seed to look up
+   * @return the cached instance for an equal seed, or a newly made one
+   * @throws AvroSerdeException if makeInstance fails
+   */
+  public Instance retrieve(SeedObject hv) throws AvroSerdeException {
+    if(LOG.isDebugEnabled()) LOG.debug("Checking for hv: " + hv.toString());
+
+    // containsKey rather than a null test on get(): makeInstance may have
+    // produced null, and that result is cached too.
+    if(cache.containsKey(hv)) {
+      if(LOG.isDebugEnabled()) LOG.debug("Returning cache result.");
+      return cache.get(hv);
+    }
+
+    if(LOG.isDebugEnabled()) LOG.debug("Creating new instance and storing in cache");
+
+    Instance instance = makeInstance(hv);
+    cache.put(hv, instance);
+    return instance;
+  }
+
+  /**
+   * Create the Instance that belongs to the given seed. Called by retrieve
+   * when no entry exists for the seed.
+   */
+  protected abstract Instance makeInstance(SeedObject hv) throws AvroSerdeException;
+}
Added: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/ReaderWriterSchemaPair.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/ReaderWriterSchemaPair.java?rev=1345420&view=auto
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/ReaderWriterSchemaPair.java (added)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/ReaderWriterSchemaPair.java Sat Jun 2 02:14:04 2012
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.avro;
+
+import org.apache.avro.Schema;
+
+/**
+ * A (writer, reader) pair of Avro schemas with value-based equals and
+ * hashCode, used for memoizing schema-reencoding operations. Both fields are
+ * final. Note the constructor takes the writer first.
+ */
+class ReaderWriterSchemaPair {
+  final Schema reader;
+  final Schema writer;
+
+  public ReaderWriterSchemaPair(Schema writer, Schema reader) {
+    this.writer = writer;
+    this.reader = reader;
+  }
+
+  public Schema getReader() {
+    return reader;
+  }
+
+  public Schema getWriter() {
+    return writer;
+  }
+
+  /** Two pairs are equal when they are of the same class and both schemas are equal. */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    ReaderWriterSchemaPair other = (ReaderWriterSchemaPair) o;
+    return reader.equals(other.reader) && writer.equals(other.writer);
+  }
+
+  /** Combines both schema hashes: 31 * reader + writer. */
+  @Override
+  public int hashCode() {
+    return 31 * reader.hashCode() + writer.hashCode();
+  }
+}
Added: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaResolutionProblem.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaResolutionProblem.java?rev=1345420&view=auto
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaResolutionProblem.java (added)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaResolutionProblem.java Sat Jun 2 02:14:04 2012
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.avro;
+
+import org.apache.avro.Schema;
+
+/**
+ * Holder for the sentinel schema that stands in for a table schema that
+ * could not be determined. AvroSerdeUtils.determineSchemaOrReturnErrorSchema
+ * returns {@link #SIGNAL_BAD_SCHEMA} instead of throwing.
+ */
+class SchemaResolutionProblem {
+ // JSON text of the sentinel: a record of seven string fields whose names,
+ // read in order, spell out the problem ("ERROR... Cannot_determine_schema
+ // check schema url and literal").
+ static final String sentinelString = "{\n" +
+ " \"namespace\": \"org.apache.hadoop.hive\",\n" +
+ " \"name\": \"CannotDetermineSchemaSentinel\",\n" +
+ " \"type\": \"record\",\n" +
+ " \"fields\": [\n" +
+ " {\n" +
+ " \"name\":\"ERROR_ERROR_ERROR_ERROR_ERROR_ERROR_ERROR\",\n" +
+ " \"type\":\"string\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"Cannot_determine_schema\",\n" +
+ " \"type\":\"string\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"check\",\n" +
+ " \"type\":\"string\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"schema\",\n" +
+ " \"type\":\"string\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"url\",\n" +
+ " \"type\":\"string\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"and\",\n" +
+ " \"type\":\"string\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"literal\",\n" +
+ " \"type\":\"string\"\n" +
+ " }\n" +
+ " ]\n" +
+ "}";
+ // Parsed once, when this class is initialized.
+ public final static Schema SIGNAL_BAD_SCHEMA = Schema.parse(sentinelString);
+}
Added: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java?rev=1345420&view=auto
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java (added)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java Sat Jun 2 02:14:04 2012
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.avro;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.avro.Schema.Type.BOOLEAN;
+import static org.apache.avro.Schema.Type.DOUBLE;
+import static org.apache.avro.Schema.Type.FLOAT;
+import static org.apache.avro.Schema.Type.INT;
+import static org.apache.avro.Schema.Type.LONG;
+import static org.apache.avro.Schema.Type.NULL;
+import static org.apache.avro.Schema.Type.STRING;
+
+/**
+ * Convert an Avro Schema to a Hive TypeInfo
+ */
+class SchemaToTypeInfo {
+ // Conversion of Avro primitive types to Hive primitive types
+ // Avro Hive
+ // Null
+ // boolean boolean check
+ // int int check
+ // long bigint check
+ // float float check
+ // double double check
+ // bytes
+ // string string check
+ // tinyint
+ // smallint
+
+ // Avro primitive types that have a direct Hive primitive counterpart, keyed
+ // by the Avro type. Built once and exposed as a read-only map.
+ private static final Map<Schema.Type, TypeInfo> primitiveTypeToTypeInfo = initTypeMap();
+
+ /**
+  * Populate the Avro-to-Hive primitive lookup table.
+  * @return unmodifiable map from Avro primitive type to the Hive TypeInfo
+  *         registered for it
+  */
+ private static Map<Schema.Type, TypeInfo> initTypeMap() {
+   Map<Schema.Type, TypeInfo> avroToHive = new Hashtable<Schema.Type, TypeInfo>();
+   avroToHive.put(NULL, TypeInfoFactory.getPrimitiveTypeInfo("void"));
+   avroToHive.put(BOOLEAN, TypeInfoFactory.getPrimitiveTypeInfo("boolean"));
+   avroToHive.put(INT, TypeInfoFactory.getPrimitiveTypeInfo("int"));
+   avroToHive.put(LONG, TypeInfoFactory.getPrimitiveTypeInfo("bigint"));
+   avroToHive.put(FLOAT, TypeInfoFactory.getPrimitiveTypeInfo("float"));
+   avroToHive.put(DOUBLE, TypeInfoFactory.getPrimitiveTypeInfo("double"));
+   avroToHive.put(STRING, TypeInfoFactory.getPrimitiveTypeInfo("string"));
+   return Collections.unmodifiableMap(avroToHive);
+ }
+
+ /**
+ * Generate a list of of TypeInfos from an Avro schema. This method is
+ * currently public due to some weirdness in deserializing unions, but
+ * will be made private once that is resolved.
+ * @param schema Schema to generate field types for
+ * @return List of TypeInfos, each element of which is a TypeInfo derived
+ * from the schema.
+ * @throws AvroSerdeException for problems during conversion.
+ */
+ public static List<TypeInfo> generateColumnTypes(Schema schema) throws AvroSerdeException {
+ List<Schema.Field> fields = schema.getFields();
+
+ List<TypeInfo> types = new ArrayList<TypeInfo>(fields.size());
+
+ for (Schema.Field field : fields) {
+ types.add(generateTypeInfo(field.schema()));
+ }
+
+ return types;
+ }
+
+ // Cache sitting in front of generateTypeInfoWorker: its makeInstance hook
+ // builds a TypeInfo by delegating to the worker (presumably only invoked
+ // when the cache has no entry for the schema yet).
+ static InstanceCache<Schema, TypeInfo> typeInfoCache = new InstanceCache<Schema, TypeInfo>() {
+   @Override
+   protected TypeInfo makeInstance(Schema avroSchema) throws AvroSerdeException {
+     TypeInfo converted = generateTypeInfoWorker(avroSchema);
+     return converted;
+   }
+ };
+ /**
+ * Convert an Avro Schema into an equivalent Hive TypeInfo.
+ * @param schema to record. Must be of record type.
+ * @return TypeInfo matching the Avro schema
+ * @throws AvroSerdeException for any problems during conversion.
+ */
+ public static TypeInfo generateTypeInfo(Schema schema) throws AvroSerdeException {
+ return typeInfoCache.retrieve(schema);
+ }
+
+ /**
+  * Does the actual Avro-to-Hive conversion for a single schema: unwraps
+  * nullable types, then tries the primitive table, then dispatches on the
+  * remaining Avro types.
+  */
+ private static TypeInfo generateTypeInfoWorker(Schema schema) throws AvroSerdeException {
+   // Avro expresses a NULLable T as a union of T and NULL. That detail is
+   // hidden from the user: convert T itself.
+   if (AvroSerdeUtils.isNullableType(schema)) {
+     Schema nonNullBranch = AvroSerdeUtils.getOtherTypeFromNullableType(schema);
+     return generateTypeInfo(nonNullBranch);
+   }
+
+   Schema.Type avroType = schema.getType();
+
+   // The backing Hashtable holds no null values, so a null result means the
+   // type is not one of the directly mapped primitives.
+   TypeInfo primitive = primitiveTypeToTypeInfo.get(avroType);
+   if (primitive != null) {
+     return primitive;
+   }
+
+   switch (avroType) {
+     case BYTES:
+       return generateBytesTypeInfo(schema);
+     case RECORD:
+       return generateRecordTypeInfo(schema);
+     case MAP:
+       return generateMapTypeInfo(schema);
+     case ARRAY:
+       return generateArrayTypeInfo(schema);
+     case UNION:
+       return generateUnionTypeInfo(schema);
+     case ENUM:
+       return generateEnumTypeInfo(schema);
+     case FIXED:
+       return generateFixedTypeInfo(schema);
+     default:
+       throw new AvroSerdeException("Do not yet support: " + schema);
+   }
+ }
+
+ /**
+  * Build a Hive struct TypeInfo from an Avro record: one struct member per
+  * record field, with the same name and in the same order.
+  */
+ private static TypeInfo generateRecordTypeInfo(Schema schema) throws AvroSerdeException {
+   assert schema.getType() == Schema.Type.RECORD;
+
+   List<Schema.Field> recordFields = schema.getFields();
+   int memberCount = recordFields.size();
+   List<String> memberNames = new ArrayList<String>(memberCount);
+   List<TypeInfo> memberTypes = new ArrayList<TypeInfo>(memberCount);
+
+   for (Schema.Field recordField : recordFields) {
+     memberNames.add(recordField.name());
+     memberTypes.add(generateTypeInfo(recordField.schema()));
+   }
+
+   return TypeInfoFactory.getStructTypeInfo(memberNames, memberTypes);
+ }
+
+ /**
+  * Generate a TypeInfo for an Avro Map. Avro only allows maps with string
+  * keys, so the key type is fixed and only the value type is converted.
+  */
+ private static TypeInfo generateMapTypeInfo(Schema schema) throws AvroSerdeException {
+   assert schema.getType() == Schema.Type.MAP;
+
+   TypeInfo valueTypeInfo = generateTypeInfo(schema.getValueType());
+   TypeInfo keyTypeInfo = TypeInfoFactory.getPrimitiveTypeInfo("string");
+   return TypeInfoFactory.getMapTypeInfo(keyTypeInfo, valueTypeInfo);
+ }
+
+ // An Avro array becomes a Hive list whose element type is the converted
+ // Avro element type.
+ private static TypeInfo generateArrayTypeInfo(Schema schema) throws AvroSerdeException {
+   assert schema.getType() == Schema.Type.ARRAY;
+
+   TypeInfo elementTypeInfo = generateTypeInfo(schema.getElementType());
+   return TypeInfoFactory.getListTypeInfo(elementTypeInfo);
+ }
+
+ // An Avro union becomes a Hive union with one converted TypeInfo per
+ // branch, in the order the branches are declared.
+ private static TypeInfo generateUnionTypeInfo(Schema schema) throws AvroSerdeException {
+   assert schema.getType() == Schema.Type.UNION;
+
+   List<Schema> branches = schema.getTypes();
+   int branchCount = branches.size();
+   List<TypeInfo> branchTypeInfos = new ArrayList<TypeInfo>(branchCount);
+
+   for (int i = 0; i < branchCount; i++) {
+     branchTypeInfos.add(generateTypeInfo(branches.get(i)));
+   }
+
+   return TypeInfoFactory.getUnionTypeInfo(branchTypeInfos);
+ }
+
+ // Hive has no Enum type, so Avro enums are exposed as Strings. The
+ // enum-ness is checked and converted during the deserialize/serialize
+ // stage instead.
+ private static TypeInfo generateEnumTypeInfo(Schema schema) {
+   assert schema.getType() == Schema.Type.ENUM;
+
+   TypeInfo stringTypeInfo = TypeInfoFactory.getPrimitiveTypeInfo("string");
+   return stringTypeInfo;
+ }
+
+ // Shared TypeInfo for both Avro fixed and Avro bytes: a Hive list of bytes.
+ // TODO: Make note in documentation that Hive sends these out as signed bytes.
+ private static final TypeInfo FIXED_AND_BYTES_EQUIV =
+     TypeInfoFactory.getListTypeInfo(TypeInfoFactory.byteTypeInfo);
+
+ // Hive has no Fixed type, so Avro fixed values are treated as arrays of
+ // bytes.
+ private static TypeInfo generateFixedTypeInfo(Schema schema) {
+   assert schema.getType() == Schema.Type.FIXED;
+
+   return FIXED_AND_BYTES_EQUIV;
+ }
+
+ // bytes is a primitive type in Avro but not in Hive, so it is converted to
+ // a list of bytes, exactly like Fixed.
+ private static TypeInfo generateBytesTypeInfo(Schema schema) {
+   assert schema.getType() == Schema.Type.BYTES;
+
+   return FIXED_AND_BYTES_EQUIV;
+ }
+}
Added: hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java?rev=1345420&view=auto
==============================================================================
--- hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java (added)
+++ hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java Sat Jun 2 02:14:04 2012
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaStringObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.VoidObjectInspector;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestAvroDeserializer {
+ private final GenericData GENERIC_DATA = GenericData.get();
+
+ // A record whose only field has Avro type "null" must deserialize to a
+ // single null column backed by a VoidObjectInspector.
+ @Test
+ public void canDeserializeVoidType() throws IOException, SerDeException {
+   String nullFieldSchema = "{\n" +
+       " \"type\": \"record\", \n" +
+       " \"name\": \"nullTest\",\n" +
+       " \"fields\" : [\n" +
+       " {\"name\": \"isANull\", \"type\": \"null\"}\n" +
+       " ]\n" +
+       "}";
+   Schema schema = Schema.parse(nullFieldSchema);
+   GenericData.Record avroRecord = new GenericData.Record(schema);
+   avroRecord.put("isANull", null);
+   assertTrue(GENERIC_DATA.validate(schema, avroRecord));
+
+   AvroGenericRecordWritable writable = Utils.serializeAndDeserializeRecord(avroRecord);
+   AvroObjectInspectorGenerator generator = new AvroObjectInspectorGenerator(schema);
+   AvroDeserializer deserializer = new AvroDeserializer();
+
+   ArrayList<Object> row = (ArrayList<Object>) deserializer.deserialize(
+       generator.getColumnNames(), generator.getColumnTypes(), writable, schema);
+   assertEquals(1, row.size());
+
+   // Raw access to the deserialized row.
+   Object rawColumn = row.get(0);
+   assertNull(rawColumn);
+
+   // Same check, going through the generated object inspector.
+   StandardStructObjectInspector structOI =
+       (StandardStructObjectInspector) generator.getObjectInspector();
+   StructField nullFieldRef = structOI.getStructFieldRef("isANull");
+
+   Object inspectedColumn = structOI.getStructFieldData(row, nullFieldRef);
+   assertNull(inspectedColumn);
+   assertTrue(nullFieldRef.getFieldObjectInspector() instanceof VoidObjectInspector);
+ }
+
+ // Round-trips a record holding a string-to-long map and checks the values
+ // both on the raw row and through the generated object inspector.
+ @Test
+ public void canDeserializeMapsWithPrimitiveKeys() throws SerDeException, IOException {
+   Schema schema = Schema.parse(TestAvroObjectInspectorGenerator.MAP_WITH_PRIMITIVE_VALUE_TYPE);
+   GenericData.Record avroRecord = new GenericData.Record(schema);
+
+   Map<String, Long> input = new Hashtable<String, Long>();
+   input.put("one", 1L);
+   input.put("two", 2L);
+   input.put("three", 3L);
+
+   avroRecord.put("aMap", input);
+   assertTrue(GENERIC_DATA.validate(schema, avroRecord));
+   System.out.println("record = " + avroRecord);
+
+   AvroGenericRecordWritable writable = Utils.serializeAndDeserializeRecord(avroRecord);
+   AvroObjectInspectorGenerator generator = new AvroObjectInspectorGenerator(schema);
+   AvroDeserializer deserializer = new AvroDeserializer();
+
+   ArrayList<Object> row = (ArrayList<Object>) deserializer.deserialize(
+       generator.getColumnNames(), generator.getColumnTypes(), writable, schema);
+   assertEquals(1, row.size());
+   Object mapColumn = row.get(0);
+   assertTrue(mapColumn instanceof Map);
+   Map<?, ?> rawMap = (Map<?, ?>) mapColumn;
+
+   // Verify the raw object that's been created
+   assertEquals(1L, rawMap.get("one"));
+   assertEquals(2L, rawMap.get("two"));
+   assertEquals(3L, rawMap.get("three"));
+
+   // Verify that the provided object inspector can pull out these same values
+   StandardStructObjectInspector structOI =
+       (StandardStructObjectInspector) generator.getObjectInspector();
+
+   List<Object> columns = structOI.getStructFieldsDataAsList(row);
+   assertEquals(1, columns.size());
+   StructField mapFieldRef = structOI.getStructFieldRef("amap");
+
+   Map<?, ?> inspectedMap = (Map<?, ?>) structOI.getStructFieldData(row, mapFieldRef);
+   assertEquals(1L, inspectedMap.get("one"));
+   assertEquals(2L, inspectedMap.get("two"));
+   assertEquals(3L, inspectedMap.get("three"));
+ }
+
+ @Test
+ public void canDeserializeArrays() throws SerDeException, IOException {
+ Schema s = Schema.parse(TestAvroObjectInspectorGenerator.ARRAY_WITH_PRIMITIVE_ELEMENT_TYPE);
+ GenericData.Record record = new GenericData.Record(s);
+
+ List<String> list = new ArrayList<String>();
+ list.add("Eccleston");
+ list.add("Tennant");
+ list.add("Smith");
+
+ record.put("anArray", list);
+ assertTrue(GENERIC_DATA.validate(s, record));
+ System.out.println("Array-backed record = " + record);
+
+ AvroGenericRecordWritable garw = Utils.serializeAndDeserializeRecord(record);
+
+ AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s);
+
+ AvroDeserializer de = new AvroDeserializer();
+ ArrayList<Object> row = (ArrayList<Object>)de.deserialize(aoig.getColumnNames(),
+ aoig.getColumnTypes(), garw, s);
+ assertEquals(1, row.size());
+ Object theArrayObject = row.get(0);
+ assertTrue(theArrayObject instanceof List);
+ List theList = (List)theArrayObject;
+
+ // Verify the raw object that's been created
+ assertEquals("Eccleston", theList.get(0));
+ assertEquals("Tennant", theList.get(1));
+ assertEquals("Smith", theList.get(2));
+
+ // Now go the correct way, through objectinspectors
+ StandardStructObjectInspector oi =
+ (StandardStructObjectInspector)aoig.getObjectInspector();
+ StructField fieldRefToArray = oi.getStructFieldRef("anArray");
+
+ Object anArrayData = oi.getStructFieldData(row, fieldRefToArray);
+ StandardListObjectInspector anArrayOI =
+ (StandardListObjectInspector)fieldRefToArray.getFieldObjectInspector();
+ assertEquals(3, anArrayOI.getListLength(anArrayData));
+
+ JavaStringObjectInspector elementOI =
+ (JavaStringObjectInspector)anArrayOI.getListElementObjectInspector();
+
+ Object firstElement = anArrayOI.getListElement(anArrayData, 0);
+ assertEquals("Eccleston", elementOI.getPrimitiveJavaObject(firstElement));
+ assertTrue(firstElement instanceof String);
+
+ Object secondElement = anArrayOI.getListElement(anArrayData, 1);
+ assertEquals("Tennant", elementOI.getPrimitiveJavaObject(secondElement));
+ assertTrue(secondElement instanceof String);
+
+ Object thirdElement = anArrayOI.getListElement(anArrayData, 2);
+ assertEquals("Smith", elementOI.getPrimitiveJavaObject(thirdElement));
+ assertTrue(thirdElement instanceof String);
+
+ }
+
+ @Test
+ public void canDeserializeRecords() throws SerDeException, IOException {
+ Schema s = Schema.parse(TestAvroObjectInspectorGenerator.RECORD_SCHEMA);
+ GenericData.Record record = new GenericData.Record(s);
+ GenericData.Record innerRecord = new GenericData.Record(s.getField("aRecord").schema());
+ innerRecord.put("int1", 42);
+ innerRecord.put("boolean1", true);
+ innerRecord.put("long1", 42432234234l);
+ record.put("aRecord", innerRecord);
+ assertTrue(GENERIC_DATA.validate(s, record));
+
+ AvroGenericRecordWritable garw = Utils.serializeAndDeserializeRecord(record);
+
+ AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s);
+
+ AvroDeserializer de = new AvroDeserializer();
+ ArrayList<Object> row =
+ (ArrayList<Object>)de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, s);
+ assertEquals(1, row.size());
+ Object theRecordObject = row.get(0);
+ System.out.println("theRecordObject = " + theRecordObject.getClass().getCanonicalName());
+
+ // The original record was lost in the deserialization, so just go the
+ // correct way, through objectinspectors
+ StandardStructObjectInspector oi = (StandardStructObjectInspector)aoig.getObjectInspector();
+ List<? extends StructField> allStructFieldRefs = oi.getAllStructFieldRefs();
+ assertEquals(1, allStructFieldRefs.size());
+ StructField fieldRefForaRecord = allStructFieldRefs.get(0);
+ assertEquals("arecord", fieldRefForaRecord.getFieldName());
+ Object innerRecord2 = oi.getStructFieldData(row, fieldRefForaRecord);
+
+ // Extract innerRecord field refs
+ StandardStructObjectInspector innerRecord2OI =
+ (StandardStructObjectInspector) fieldRefForaRecord.getFieldObjectInspector();
+
+ List<? extends StructField> allStructFieldRefs1 = innerRecord2OI.getAllStructFieldRefs();
+ assertEquals(3, allStructFieldRefs1.size());
+ assertEquals("int1", allStructFieldRefs1.get(0).getFieldName());
+ assertEquals("boolean1", allStructFieldRefs1.get(1).getFieldName());
+ assertEquals("long1", allStructFieldRefs1.get(2).getFieldName());
+
+ innerRecord2OI.getStructFieldsDataAsList(innerRecord2);
+ assertEquals(42, innerRecord2OI.getStructFieldData(innerRecord2, allStructFieldRefs1.get(0)));
+ assertEquals(true, innerRecord2OI.getStructFieldData(innerRecord2, allStructFieldRefs1.get(1)));
+ assertEquals(42432234234l, innerRecord2OI.getStructFieldData(innerRecord2, allStructFieldRefs1.get(2)));
+ }
+
+ /**
+  * Immutable holder for what unionTester hands back: the inspector it was
+  * given, the value it was given, and the union object it was given.
+  * (Because Pairs give Java the vapors.)
+  *
+  * Declared static because it never touches the enclosing test instance, so
+  * it does not need to carry a hidden reference to it.
+  */
+ private static final class ResultPair {
+   public final ObjectInspector oi;
+   public final Object value;
+   public final Object unionObject;
+
+   private ResultPair(ObjectInspector oi, Object value, Object unionObject) {
+     this.oi = oi;
+     this.value = value;
+     this.unionObject = unionObject;
+   }
+ }
+
+ // Exercises both branches of the union field: the string branch is
+ // expected under tag 1 and the int branch under tag 0.
+ @Test
+ public void canDeserializeUnions() throws SerDeException, IOException {
+   Schema schema = Schema.parse(TestAvroObjectInspectorGenerator.UNION_SCHEMA);
+
+   // String branch of the union
+   GenericData.Record stringRecord = new GenericData.Record(schema);
+   stringRecord.put("aUnion", "this is a string");
+
+   ResultPair stringResult = unionTester(schema, stringRecord);
+   assertTrue(stringResult.value instanceof String);
+   assertEquals("this is a string", stringResult.value);
+   UnionObjectInspector stringUnionOI = (UnionObjectInspector) stringResult.oi;
+   assertEquals(1, stringUnionOI.getTag(stringResult.unionObject));
+
+   // Now the other union branch
+   GenericData.Record intRecord = new GenericData.Record(schema);
+   intRecord.put("aUnion", 99);
+
+   ResultPair intResult = unionTester(schema, intRecord);
+   assertTrue(intResult.value instanceof Integer);
+   assertEquals(99, intResult.value);
+   UnionObjectInspector intUnionOI = (UnionObjectInspector) intResult.oi;
+   assertEquals(0, intUnionOI.getTag(intResult.unionObject));
+ }
+
+ /**
+  * Round-trips the record, checks that the row has exactly one column named
+  * "aunion" backed by a UnionObjectInspector, and returns that inspector
+  * together with the union's field value and the union object itself.
+  */
+ private ResultPair unionTester(Schema s, GenericData.Record record)
+     throws SerDeException, IOException {
+   assertTrue(GENERIC_DATA.validate(s, record));
+   AvroGenericRecordWritable writable = Utils.serializeAndDeserializeRecord(record);
+   AvroObjectInspectorGenerator generator = new AvroObjectInspectorGenerator(s);
+   AvroDeserializer deserializer = new AvroDeserializer();
+
+   ArrayList<Object> row = (ArrayList<Object>) deserializer.deserialize(
+       generator.getColumnNames(), generator.getColumnTypes(), writable, s);
+   assertEquals(1, row.size());
+
+   StandardStructObjectInspector structOI =
+       (StandardStructObjectInspector) generator.getObjectInspector();
+   List<? extends StructField> columns = structOI.getAllStructFieldRefs();
+   assertEquals(1, columns.size());
+   StructField unionFieldRef = columns.get(0);
+   assertEquals("aunion", unionFieldRef.getFieldName());
+   Object unionData = structOI.getStructFieldData(row, unionFieldRef);
+
+   assertTrue(unionFieldRef.getFieldObjectInspector() instanceof UnionObjectInspector);
+   UnionObjectInspector unionOI =
+       (UnionObjectInspector) unionFieldRef.getFieldObjectInspector();
+   Object unionValue = unionOI.getField(unionData);
+
+   return new ResultPair(unionOI, unionValue, unionData);
+ }
+
+ // Enums are one of two types we fudge for Hive. Enums go in, Strings come out.
+ @Test
+ public void canDeserializeEnums() throws SerDeException, IOException {
+   Schema schema = Schema.parse(TestAvroObjectInspectorGenerator.ENUM_SCHEMA);
+   GenericData.Record avroRecord = new GenericData.Record(schema);
+   avroRecord.put("baddies", "DALEKS");
+   assertTrue(GENERIC_DATA.validate(schema, avroRecord));
+
+   AvroGenericRecordWritable writable = Utils.serializeAndDeserializeRecord(avroRecord);
+   AvroObjectInspectorGenerator generator = new AvroObjectInspectorGenerator(schema);
+   AvroDeserializer deserializer = new AvroDeserializer();
+
+   ArrayList<Object> row = (ArrayList<Object>) deserializer.deserialize(
+       generator.getColumnNames(), generator.getColumnTypes(), writable, schema);
+   assertEquals(1, row.size());
+
+   StandardStructObjectInspector structOI =
+       (StandardStructObjectInspector) generator.getObjectInspector();
+   List<? extends StructField> columns = structOI.getAllStructFieldRefs();
+   assertEquals(1, columns.size());
+   StructField enumFieldRef = columns.get(0);
+
+   assertEquals("baddies", enumFieldRef.getFieldName());
+
+   // The enum column must be exposed through a string inspector.
+   Object enumData = structOI.getStructFieldData(row, enumFieldRef);
+   assertTrue(enumFieldRef.getFieldObjectInspector() instanceof StringObjectInspector);
+   StringObjectInspector stringOI =
+       (StringObjectInspector) enumFieldRef.getFieldObjectInspector();
+
+   assertEquals("DALEKS", stringOI.getPrimitiveJavaObject(enumData));
+ }
+
+ @Test // Fixed doesn't exist in Hive. Fixeds go in, lists of bytes go out.
+ public void canDeserializeFixed() throws SerDeException, IOException {
+ Schema s = Schema.parse(TestAvroObjectInspectorGenerator.FIXED_SCHEMA);
+ GenericData.Record record = new GenericData.Record(s);
+
+ byte [] bytes = "ANANCIENTBLUEBOX".getBytes();
+ record.put("hash", new GenericData.Fixed(s, bytes));
+ assertTrue(GENERIC_DATA.validate(s, record));
+
+ AvroGenericRecordWritable garw = Utils.serializeAndDeserializeRecord(record);
+ AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s);
+
+ AvroDeserializer de = new AvroDeserializer();
+ ArrayList<Object> row =
+ (ArrayList<Object>)de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, s);
+ assertEquals(1, row.size());
+ Object theArrayObject = row.get(0);
+ assertTrue(theArrayObject instanceof List);
+ List theList = (List)theArrayObject;
+ // Verify the raw object that's been created
+ for(int i = 0; i < bytes.length; i++) {
+ assertEquals(bytes[i], theList.get(i));
+ }
+
+ // Now go the correct way, through objectinspectors
+ StandardStructObjectInspector oi = (StandardStructObjectInspector)aoig.getObjectInspector();
+ List<Object> fieldsDataAsList = oi.getStructFieldsDataAsList(row);
+ assertEquals(1, fieldsDataAsList.size());
+ StructField fieldRef = oi.getStructFieldRef("hash");
+
+ List theList2 = (List)oi.getStructFieldData(row, fieldRef);
+ for(int i = 0; i < bytes.length; i++) {
+ assertEquals(bytes[i], theList2.get(i));
+ }
+ }
+
+ @Test
+ public void canDeserializeBytes() throws SerDeException, IOException {
+ Schema s = Schema.parse(TestAvroObjectInspectorGenerator.BYTES_SCHEMA);
+ GenericData.Record record = new GenericData.Record(s);
+
+ byte [] bytes = "ANANCIENTBLUEBOX".getBytes();
+
+ ByteBuffer bb = ByteBuffer.wrap(bytes);
+ bb.rewind();
+ record.put("bytesField", bb);
+ assertTrue(GENERIC_DATA.validate(s, record));
+
+ AvroGenericRecordWritable garw = Utils.serializeAndDeserializeRecord(record);
+ AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s);
+
+ AvroDeserializer de = new AvroDeserializer();
+ ArrayList<Object> row =
+ (ArrayList<Object>)de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, s);
+ assertEquals(1, row.size());
+ Object theArrayObject = row.get(0);
+ assertTrue(theArrayObject instanceof List);
+
+ // Now go the correct way, through objectinspectors
+ StandardStructObjectInspector oi = (StandardStructObjectInspector)aoig.getObjectInspector();
+ List<Object> fieldsDataAsList = oi.getStructFieldsDataAsList(row);
+ assertEquals(1, fieldsDataAsList.size());
+ StructField fieldRef = oi.getStructFieldRef("bytesField");
+
+ List theList2 = (List)oi.getStructFieldData(row, fieldRef);
+ for(int i = 0; i < bytes.length; i++) {
+ assertEquals(bytes[i], theList2.get(i));
+ }
+ }
+
+ // Checks the nullable string field twice: once holding a string and once
+ // holding null.
+ @Test
+ public void canDeserializeNullableTypes() throws IOException, SerDeException {
+   Schema schema = Schema.parse(TestAvroObjectInspectorGenerator.NULLABLE_STRING_SCHEMA);
+
+   // Field populated with a string
+   GenericData.Record withValue = new GenericData.Record(schema);
+   withValue.put("nullableString", "this is a string");
+   verifyNullableType(withValue, schema, "this is a string");
+
+   // Field populated with null
+   GenericData.Record withNull = new GenericData.Record(schema);
+   withNull.put("nullableString", null);
+   verifyNullableType(withNull, schema, null);
+ }
+
+ /**
+  * Round-trips the record and checks that its single column, read through
+  * the StringObjectInspector of the "nullablestring" field, equals the
+  * expected value.
+  * @param record record to serialize and deserialize; must validate against s
+  * @param s schema of the record
+  * @param expected value the column must hold after the round trip; null
+  *        means the column must be null
+  */
+ private void verifyNullableType(GenericData.Record record, Schema s,
+     String expected) throws SerDeException, IOException {
+   assertTrue(GENERIC_DATA.validate(s, record));
+
+   AvroGenericRecordWritable garw = Utils.serializeAndDeserializeRecord(record);
+   AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s);
+
+   AvroDeserializer de = new AvroDeserializer();
+   ArrayList<Object> row =
+       (ArrayList<Object>) de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, s);
+   assertEquals(1, row.size());
+   Object rowElement = row.get(0);
+
+   StandardStructObjectInspector oi = (StandardStructObjectInspector) aoig.getObjectInspector();
+   List<Object> fieldsDataAsList = oi.getStructFieldsDataAsList(row);
+   assertEquals(1, fieldsDataAsList.size());
+   StructField fieldRef = oi.getStructFieldRef("nullablestring");
+   ObjectInspector fieldObjectInspector = fieldRef.getFieldObjectInspector();
+   StringObjectInspector soi = (StringObjectInspector) fieldObjectInspector;
+
+   // Compare against the caller-supplied value instead of a hard-coded
+   // literal. assertEquals treats (null, null) as equal, so the null case
+   // needs no separate branch.
+   assertEquals(expected, soi.getPrimitiveJavaObject(rowElement));
+ }
+}
Added: hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java?rev=1345420&view=auto
==============================================================================
--- hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java (added)
+++ hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroObjectInspectorGenerator.java Sat Jun 2 02:14:04 2012
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.avro;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestAvroObjectInspectorGenerator {
+ // Hive primitive type infos the tests compare against. They are shared, immutable
+ // constants, so they are static rather than re-created for every test instance.
+ private static final TypeInfo STRING = TypeInfoFactory.getPrimitiveTypeInfo("string");
+ private static final TypeInfo INT = TypeInfoFactory.getPrimitiveTypeInfo("int");
+ private static final TypeInfo BOOLEAN = TypeInfoFactory.getPrimitiveTypeInfo("boolean");
+ // Avro "long" corresponds to Hive "bigint"
+ private static final TypeInfo LONG = TypeInfoFactory.getPrimitiveTypeInfo("bigint");
+ private static final TypeInfo FLOAT = TypeInfoFactory.getPrimitiveTypeInfo("float");
+ private static final TypeInfo DOUBLE = TypeInfoFactory.getPrimitiveTypeInfo("double");
+ // Avro "null" corresponds to Hive "void"
+ private static final TypeInfo VOID = TypeInfoFactory.getPrimitiveTypeInfo("void");
+
+ // These schemata are used in other tests
+ /** Record schema with a single field, {@code aMap}, that is a map with {@code long} values. */
+ public static final String MAP_WITH_PRIMITIVE_VALUE_TYPE = "{\n" +
+ " \"namespace\": \"testing\",\n" +
+ " \"name\": \"oneMap\",\n" +
+ " \"type\": \"record\",\n" +
+ " \"fields\": [\n" +
+ " {\n" +
+ " \"name\":\"aMap\",\n" +
+ " \"type\":{\"type\":\"map\",\n" +
+ " \"values\":\"long\"}\n" +
+ "\t}\n" +
+ " ]\n" +
+ "}";
+ /** Record schema with a single field, {@code anArray}, that is an array of strings. */
+ public static final String ARRAY_WITH_PRIMITIVE_ELEMENT_TYPE = "{\n" +
+ " \"namespace\": \"testing\",\n" +
+ " \"name\": \"oneArray\",\n" +
+ " \"type\": \"record\",\n" +
+ " \"fields\": [\n" +
+ " {\n" +
+ " \"name\":\"anArray\",\n" +
+ " \"type\":{\"type\":\"array\",\n" +
+ " \"items\":\"string\"}\n" +
+ "\t}\n" +
+ " ]\n" +
+ "}";
+ /**
+  * Record schema with a single field, {@code aRecord}, that is itself a record
+  * ({@code recordWithinARecord}) with the fields {@code int1} (int), {@code boolean1}
+  * (boolean) and {@code long1} (long).
+  */
+ public static final String RECORD_SCHEMA = "{\n" +
+ " \"namespace\": \"testing.test.mctesty\",\n" +
+ " \"name\": \"oneRecord\",\n" +
+ " \"type\": \"record\",\n" +
+ " \"fields\": [\n" +
+ " {\n" +
+ " \"name\":\"aRecord\",\n" +
+ " \"type\":{\"type\":\"record\",\n" +
+ " \"name\":\"recordWithinARecord\",\n" +
+ " \"fields\": [\n" +
+ " {\n" +
+ " \"name\":\"int1\",\n" +
+ " \"type\":\"int\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"boolean1\",\n" +
+ " \"type\":\"boolean\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"long1\",\n" +
+ " \"type\":\"long\"\n" +
+ " }\n" +
+ " ]}\n" +
+ " }\n" +
+ " ]\n" +
+ "}";
+ /** Record schema with a single field, {@code aUnion}, that is a union of int and string. */
+ public static final String UNION_SCHEMA = "{\n" +
+ " \"namespace\": \"test.a.rossa\",\n" +
+ " \"name\": \"oneUnion\",\n" +
+ " \"type\": \"record\",\n" +
+ " \"fields\": [\n" +
+ " {\n" +
+ " \"name\":\"aUnion\",\n" +
+ " \"type\":[\"int\", \"string\"]\n" +
+ " }\n" +
+ " ]\n" +
+ "}";
+ /** Record schema with a single field, {@code baddies}, that is an enum with four symbols. */
+ public static final String ENUM_SCHEMA = "{\n" +
+ " \"namespace\": \"clever.namespace.name.in.space\",\n" +
+ " \"name\": \"oneEnum\",\n" +
+ " \"type\": \"record\",\n" +
+ " \"fields\": [\n" +
+ " {\n" +
+ " \"name\":\"baddies\",\n" +
+ " \"type\":{\"type\":\"enum\",\"name\":\"villians\", \"symbols\": " +
+ "[\"DALEKS\", \"CYBERMEN\", \"SLITHEEN\", \"JAGRAFESS\"]}\n" +
+ " \n" +
+ " \n" +
+ " }\n" +
+ " ]\n" +
+ "}";
+ /** Record schema with a single field, {@code hash}, that is a 16-byte fixed named MD5. */
+ public static final String FIXED_SCHEMA = "{\n" +
+ " \"namespace\": \"ecapseman\",\n" +
+ " \"name\": \"oneFixed\",\n" +
+ " \"type\": \"record\",\n" +
+ " \"fields\": [\n" +
+ " {\n" +
+ " \"name\":\"hash\",\n" +
+ " \"type\":{\"type\": \"fixed\", \"name\": \"MD5\", \"size\": 16}\n" +
+ " }\n" +
+ " ]\n" +
+ "}";
+ /**
+  * Record schema with a single field, {@code nullableString}, declared as the union
+  * {@code ["null", "string"]}.
+  */
+ public static final String NULLABLE_STRING_SCHEMA = "{\n" +
+ " \"type\": \"record\", \n" +
+ " \"name\": \"nullableUnionTest\",\n" +
+ " \"fields\" : [\n" +
+ " {\"name\":\"nullableString\", \"type\":[\"null\", \"string\"]}\n" +
+ " ]\n" +
+ "}";
+ /** Record schema with a single field, {@code bytesField}, of Avro type bytes. */
+ public static final String BYTES_SCHEMA = "{\n" +
+ " \"type\": \"record\", \n" +
+ " \"name\": \"bytesTest\",\n" +
+ " \"fields\" : [\n" +
+ " {\"name\":\"bytesField\", \"type\":\"bytes\"}\n" +
+ " ]\n" +
+ "}";
+
+ /**
+  * Record schema that mixes many Avro types in one record: string, int, boolean, long,
+  * float, double, a nested record, an enum, an array, a map, a three-way union, a fixed,
+  * null, an {@code ["int", "null"]} union and bytes.
+  */
+ public static final String KITCHEN_SINK_SCHEMA = "{\n" +
+ " \"namespace\": \"org.apache.hadoop.hive\",\n" +
+ " \"name\": \"kitchsink\",\n" +
+ " \"type\": \"record\",\n" +
+ " \"fields\": [\n" +
+ " {\n" +
+ " \"name\":\"string1\",\n" +
+ " \"type\":\"string\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"string2\",\n" +
+ " \"type\":\"string\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"int1\",\n" +
+ " \"type\":\"int\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"boolean1\",\n" +
+ " \"type\":\"boolean\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"long1\",\n" +
+ " \"type\":\"long\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"float1\",\n" +
+ " \"type\":\"float\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"double1\",\n" +
+ " \"type\":\"double\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"inner_record1\",\n" +
+ " \"type\":{ \"type\":\"record\",\n" +
+ " \"name\":\"inner_record1_impl\",\n" +
+ " \"fields\": [\n" +
+ " {\"name\":\"int_in_inner_record1\",\n" +
+ " \"type\":\"int\"},\n" +
+ " {\"name\":\"string_in_inner_record1\",\n" +
+ " \"type\":\"string\"}\n" +
+ " ]\n" +
+ " }\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"enum1\",\n" +
+ " \"type\":{\"type\":\"enum\", \"name\":\"enum1_values\", " +
+ "\"symbols\":[\"ENUM1_VALUES_VALUE1\",\"ENUM1_VALUES_VALUE2\", \"ENUM1_VALUES_VALUE3\"]}\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"array1\",\n" +
+ " \"type\":{\"type\":\"array\", \"items\":\"string\"}\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"map1\",\n" +
+ " \"type\":{\"type\":\"map\", \"values\":\"string\"}\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"union1\",\n" +
+ " \"type\":[\"float\", \"boolean\", \"string\"]\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"fixed1\",\n" +
+ " \"type\":{\"type\":\"fixed\", \"name\":\"fourbytes\", \"size\":4}\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"null1\",\n" +
+ " \"type\":\"null\"\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"UnionNullInt\",\n" +
+ " \"type\":[\"int\", \"null\"]\n" +
+ " },\n" +
+ " {\n" +
+ " \"name\":\"bytes1\",\n" +
+ " \"type\":\"bytes\"\n" +
+ " }\n" +
+ " ]\n" +
+ "}";
+
+ /** Only record schemata can back a table: a top-level enum schema must be rejected. */
+ @Test
+ public void failOnNonRecords() throws Exception {
+   final String enumSchemaJson = "{ \"type\": \"enum\",\n" +
+       " \"name\": \"Suit\",\n" +
+       " \"symbols\" : [\"SPADES\", \"HEARTS\", \"DIAMONDS\", \"CLUBS\"]\n" +
+       "}";
+   final Schema enumSchema = Schema.parse(enumSchemaJson);
+
+   try {
+     new AvroObjectInspectorGenerator(enumSchema);
+     fail("Should not be able to handle non-record Avro types");
+   } catch (SerDeException expected) {
+     final String message = expected.getMessage();
+     assertTrue(message.startsWith("Schema for table must be of type RECORD"));
+   }
+ }
+
+ /**
+  * Checks column names, column types and the generated struct object inspector for a
+  * record made up of the primitive types string, int, boolean, long, float, double, null.
+  */
+ @Test
+ public void primitiveTypesWorkCorrectly() throws SerDeException {
+   final String primitivesSchemaJson = "{\n" +
+       " \"namespace\": \"testing\",\n" +
+       " \"name\": \"PrimitiveTypes\",\n" +
+       " \"type\": \"record\",\n" +
+       " \"fields\": [\n" +
+       " {\n" +
+       " \"name\":\"aString\",\n" +
+       " \"type\":\"string\"\n" +
+       " },\n" +
+       " {\n" +
+       " \"name\":\"anInt\",\n" +
+       " \"type\":\"int\"\n" +
+       " },\n" +
+       " {\n" +
+       " \"name\":\"aBoolean\",\n" +
+       " \"type\":\"boolean\"\n" +
+       " },\n" +
+       " {\n" +
+       " \"name\":\"aLong\",\n" +
+       " \"type\":\"long\"\n" +
+       " },\n" +
+       " {\n" +
+       " \"name\":\"aFloat\",\n" +
+       " \"type\":\"float\"\n" +
+       " },\n" +
+       " {\n" +
+       " \"name\":\"aDouble\",\n" +
+       " \"type\":\"double\"\n" +
+       " },\n" +
+       " {\n" +
+       " \"name\":\"aNull\",\n" +
+       " \"type\":\"null\"\n" +
+       " }\n" +
+       " ]\n" +
+       "}";
+   final Schema primitivesSchema = Schema.parse(primitivesSchemaJson);
+   final AvroObjectInspectorGenerator generator = new AvroObjectInspectorGenerator(primitivesSchema);
+
+   final String[] wantedNames =
+       {"aString", "anInt", "aBoolean", "aLong", "aFloat", "aDouble", "aNull"};
+   verifyColumnNames(wantedNames, generator.getColumnNames());
+
+   final TypeInfo[] wantedTypes = {STRING, INT, BOOLEAN, LONG, FLOAT, DOUBLE, VOID};
+   verifyColumnTypes(wantedTypes, generator.getColumnTypes());
+
+   // Take the object inspector apart and compare it field by field.
+   final ObjectInspector inspector = generator.getObjectInspector();
+   assertTrue(inspector instanceof StandardStructObjectInspector);
+   final StandardStructObjectInspector structInspector = (StandardStructObjectInspector) inspector;
+   final List<? extends StructField> fields = structInspector.getAllStructFieldRefs();
+   assertEquals(wantedNames.length, fields.size());
+
+   for (int idx = 0; idx < wantedNames.length; idx++) {
+     final StructField field = fields.get(idx);
+     // The struct fields are expected under lower-cased names
+     final String wantedFieldName = wantedNames[idx].toLowerCase();
+     assertEquals("Column names don't match", wantedFieldName, field.getFieldName());
+
+     final String wantedTypeName = wantedTypes[idx].getTypeName();
+     assertEquals("Column types don't match",
+         wantedTypeName, field.getFieldObjectInspector().getTypeName());
+   }
+ }
+
+ /**
+  * Asserts that {@code columnTypes} holds exactly the expected types, in order.
+  *
+  * @param expectedColumnTypes types the columns should have, in column order
+  * @param columnTypes types actually produced
+  */
+ private void verifyColumnTypes(TypeInfo[] expectedColumnTypes, List<TypeInfo> columnTypes) {
+   // Without this check extra columns would pass unnoticed and missing ones would surface
+   // as an IndexOutOfBoundsException instead of an assertion failure.
+   assertEquals(expectedColumnTypes.length, columnTypes.size());
+   for (int i = 0; i < expectedColumnTypes.length; i++) {
+     assertEquals(expectedColumnTypes[i], columnTypes.get(i));
+   }
+ }
+
+ /**
+  * Asserts that {@code columnNames} holds exactly the expected names, in order.
+  *
+  * @param expectedColumnNames names the columns should have, in column order
+  * @param columnNames names actually produced
+  */
+ private void verifyColumnNames(String[] expectedColumnNames, List<String> columnNames) {
+   // Without this check extra columns would pass unnoticed and missing ones would surface
+   // as an IndexOutOfBoundsException instead of an assertion failure.
+   assertEquals(expectedColumnNames.length, columnNames.size());
+   for (int i = 0; i < expectedColumnNames.length; i++) {
+     assertEquals(expectedColumnNames[i], columnNames.get(i));
+   }
+ }
+
+ /** An Avro map of longs must surface as a single Hive map column from string to bigint. */
+ @Test
+ public void canHandleMapsWithPrimitiveValueTypes() throws SerDeException {
+   final Schema mapSchema = Schema.parse(MAP_WITH_PRIMITIVE_VALUE_TYPE);
+   final AvroObjectInspectorGenerator generator = new AvroObjectInspectorGenerator(mapSchema);
+
+   // Exactly one column, named after the Avro field
+   final List<String> names = generator.getColumnNames();
+   assertEquals(1, names.size());
+   assertEquals("aMap", names.get(0));
+
+   // ... and it is typed as a map
+   final List<TypeInfo> types = generator.getColumnTypes();
+   assertEquals(1, types.size());
+   final TypeInfo columnType = types.get(0);
+   assertEquals(ObjectInspector.Category.MAP, columnType.getCategory());
+   assertTrue(columnType instanceof MapTypeInfo);
+
+   final MapTypeInfo mapType = (MapTypeInfo) columnType;
+   final String valueTypeName = mapType.getMapValueTypeInfo().getTypeName();
+   final String keyTypeName = mapType.getMapKeyTypeInfo().getTypeName();
+   assertEquals("bigint" /* == long in Avro */, valueTypeName);
+   assertEquals("string", keyTypeName);
+ }
+
+ /** An Avro array of strings must surface as a single Hive list column of string. */
+ @Test
+ public void canHandleArrays() throws SerDeException {
+   final Schema arraySchema = Schema.parse(ARRAY_WITH_PRIMITIVE_ELEMENT_TYPE);
+   final AvroObjectInspectorGenerator generator = new AvroObjectInspectorGenerator(arraySchema);
+
+   // Exactly one column, named after the Avro field
+   final List<String> names = generator.getColumnNames();
+   assertEquals(1, names.size());
+   assertEquals("anArray", names.get(0));
+
+   // ... and it is typed as a list
+   final List<TypeInfo> types = generator.getColumnTypes();
+   assertEquals(1, types.size());
+   final TypeInfo columnType = types.get(0);
+   assertEquals(ObjectInspector.Category.LIST, columnType.getCategory());
+   assertTrue(columnType instanceof ListTypeInfo);
+
+   final ListTypeInfo listType = (ListTypeInfo) columnType;
+   final String elementTypeName = listType.getListElementTypeInfo().getTypeName();
+   assertEquals("string", elementTypeName);
+ }
+
+ /**
+  * Verifies that a nested Avro record becomes a Hive struct column whose field names and
+  * types mirror the inner record's fields, in order.
+  */
+ @Test
+ public void canHandleRecords() throws SerDeException {
+   Schema s = Schema.parse(RECORD_SCHEMA);
+   AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s);
+
+   // Column names
+   assertEquals(1, aoig.getColumnNames().size());
+   assertEquals("aRecord", aoig.getColumnNames().get(0));
+
+   // Column types
+   assertEquals(1, aoig.getColumnTypes().size());
+   TypeInfo typeInfo = aoig.getColumnTypes().get(0);
+   assertEquals(ObjectInspector.Category.STRUCT, typeInfo.getCategory());
+   assertTrue(typeInfo instanceof StructTypeInfo);
+   StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
+
+   // Check individual elements of subrecord
+   String[] names = {"int1", "boolean1", "long1"};
+   String[] typeInfoStrings = {"int", "boolean", "bigint"};
+   List<String> allStructFieldNames = structTypeInfo.getAllStructFieldNames();
+   List<TypeInfo> allStructFieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+
+   // JUnit takes (expected, actual); check both lists so the loop below cannot run
+   // past the end of either one.
+   assertEquals(names.length, allStructFieldNames.size());
+   assertEquals(typeInfoStrings.length, allStructFieldTypeInfos.size());
+
+   for (int i = 0; i < names.length; i++) {
+     assertEquals("Fieldname " + allStructFieldNames.get(i) +
+         " doesn't match expected " + names[i], names[i], allStructFieldNames.get(i));
+     assertEquals("Typeinfo " + allStructFieldTypeInfos.get(i) +
+         " doesn't match expected " + typeInfoStrings[i], typeInfoStrings[i],
+         allStructFieldTypeInfos.get(i).getTypeName());
+   }
+ }
+
+ /** A union of int and string must surface as a Hive uniontype with both branches intact. */
+ @Test
+ public void canHandleUnions() throws SerDeException {
+   final Schema unionSchema = Schema.parse(UNION_SCHEMA);
+   final AvroObjectInspectorGenerator generator = new AvroObjectInspectorGenerator(unionSchema);
+
+   // Exactly one column, named after the Avro field
+   final List<String> names = generator.getColumnNames();
+   assertEquals(1, names.size());
+   assertEquals("aUnion", names.get(0));
+
+   // ... and it is typed as a union
+   final List<TypeInfo> types = generator.getColumnTypes();
+   assertEquals(1, types.size());
+   final TypeInfo columnType = types.get(0);
+   assertTrue(columnType instanceof UnionTypeInfo);
+   final UnionTypeInfo unionType = (UnionTypeInfo) columnType;
+
+   // Both branches must be present, in declaration order.
+   final List<TypeInfo> branches = unionType.getAllUnionObjectTypeInfos();
+   assertEquals(2, branches.size());
+   assertEquals(INT, branches.get(0));
+   assertEquals(STRING, branches.get(1));
+   assertEquals("uniontype<int,string>", unionType.getTypeName());
+ }
+
+ /**
+  * Enums are one of two Avro types that Hive doesn't have any native support for; the
+  * column is expected to come out as a plain string.
+  */
+ @Test
+ public void canHandleEnums() throws SerDeException {
+   final Schema enumSchema = Schema.parse(ENUM_SCHEMA);
+   final AvroObjectInspectorGenerator generator = new AvroObjectInspectorGenerator(enumSchema);
+
+   // The column keeps the field name; the enumness of the schema is lost
+   final List<String> names = generator.getColumnNames();
+   assertEquals(1, names.size());
+   assertEquals("baddies", names.get(0));
+
+   // ... and it is typed as string
+   final List<TypeInfo> types = generator.getColumnTypes();
+   assertEquals(1, types.size());
+   assertEquals(STRING, types.get(0));
+ }
+
+ /** Hive has no concept of Avro's fixed type; it must surface as a list of tinyint. */
+ @Test
+ public void canHandleFixed() throws SerDeException {
+   final Schema fixedSchema = Schema.parse(FIXED_SCHEMA);
+   final AvroObjectInspectorGenerator generator = new AvroObjectInspectorGenerator(fixedSchema);
+
+   // Exactly one column, named after the Avro field
+   final List<String> names = generator.getColumnNames();
+   assertEquals(1, names.size());
+   assertEquals("hash", names.get(0));
+
+   // ... and it is typed as a list of primitive tinyint
+   final List<TypeInfo> types = generator.getColumnTypes();
+   assertEquals(1, types.size());
+   final TypeInfo columnType = types.get(0);
+   assertTrue(columnType instanceof ListTypeInfo);
+
+   final TypeInfo elementType = ((ListTypeInfo) columnType).getListElementTypeInfo();
+   assertTrue(elementType instanceof PrimitiveTypeInfo);
+   assertEquals("tinyint", elementType.getTypeName());
+ }
+
+ /** Avro considers bytes primitive, Hive doesn't; it must surface as a list of tinyint. */
+ @Test
+ public void canHandleBytes() throws SerDeException {
+   final Schema bytesSchema = Schema.parse(BYTES_SCHEMA);
+   final AvroObjectInspectorGenerator generator = new AvroObjectInspectorGenerator(bytesSchema);
+
+   // Exactly one column, named after the Avro field
+   final List<String> names = generator.getColumnNames();
+   assertEquals(1, names.size());
+   assertEquals("bytesField", names.get(0));
+
+   // ... and it is typed as a list of primitive tinyint
+   final List<TypeInfo> types = generator.getColumnTypes();
+   assertEquals(1, types.size());
+   final TypeInfo columnType = types.get(0);
+   assertTrue(columnType instanceof ListTypeInfo);
+
+   final TypeInfo elementType = ((ListTypeInfo) columnType).getListElementTypeInfo();
+   assertTrue(elementType instanceof PrimitiveTypeInfo);
+   assertEquals("tinyint", elementType.getTypeName());
+ }
+
+ /** Checks that Union[T, NULL] is converted to just T, here with T being string. */
+ @Test
+ public void convertsNullableTypes() throws SerDeException {
+   final Schema nullableSchema = Schema.parse(NULLABLE_STRING_SCHEMA);
+   final AvroObjectInspectorGenerator generator = new AvroObjectInspectorGenerator(nullableSchema);
+
+   final List<String> names = generator.getColumnNames();
+   assertEquals(1, names.size());
+   assertEquals("nullableString", names.get(0));
+
+   // The single column must be a primitive, not a union
+   final List<TypeInfo> types = generator.getColumnTypes();
+   assertEquals(1, types.size());
+   final TypeInfo columnType = types.get(0);
+   assertTrue(columnType instanceof PrimitiveTypeInfo);
+
+   // The union is hidden and only the main type is reported.
+   final PrimitiveTypeInfo primitiveType = (PrimitiveTypeInfo) columnType;
+   assertEquals(PrimitiveObjectInspector.PrimitiveCategory.STRING,
+       primitiveType.getPrimitiveCategory());
+ }
+
+ /**
+  * Two generators built from separately parsed copies of the same schema must hand out
+  * the very same object inspector instance, i.e. Hive caches the inspectors.
+  */
+ @Test
+ public void objectInspectorsAreCached() throws SerDeException {
+   final AvroObjectInspectorGenerator first =
+       new AvroObjectInspectorGenerator(Schema.parse(KITCHEN_SINK_SCHEMA));
+   final AvroObjectInspectorGenerator second =
+       new AvroObjectInspectorGenerator(Schema.parse(KITCHEN_SINK_SCHEMA));
+
+   assertEquals(first.getObjectInspector(), second.getObjectInspector());
+   // Reference equality is intended here: the cache should return one shared instance.
+   assertTrue(first.getObjectInspector() == second.getObjectInspector());
+ }
+}
Added: hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerde.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerde.java?rev=1345420&view=auto
==============================================================================
--- hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerde.java (added)
+++ hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerde.java Sat Jun 2 02:14:04 2012
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.avro;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.io.Writable;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils.AVRO_SERDE_SCHEMA;
+import static org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils.SCHEMA_LITERAL;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestAvroSerde {
+ // Record schema named "previous" with a single string field "text".
+ static final String originalSchemaString = "{\n" +
+ " \"namespace\": \"org.apache.hadoop.hive\",\n" +
+ " \"name\": \"previous\",\n" +
+ " \"type\": \"record\",\n" +
+ " \"fields\": [\n" +
+ " {\n" +
+ " \"name\":\"text\",\n" +
+ " \"type\":\"string\"\n" +
+ " }\n" +
+ " ]\n" +
+ "}";
+ // Same shape as originalSchemaString, but the record is named "new".
+ static final String newSchemaString = "{\n" +
+ " \"namespace\": \"org.apache.hadoop.hive\",\n" +
+ " \"name\": \"new\",\n" +
+ " \"type\": \"record\",\n" +
+ " \"fields\": [\n" +
+ " {\n" +
+ " \"name\":\"text\",\n" +
+ " \"type\":\"string\"\n" +
+ " }\n" +
+ " ]\n" +
+ "}";
+
+ // Parsed forms of the two schema strings above.
+ static final Schema originalSchema = Schema.parse(originalSchemaString);
+ static final Schema newSchema = Schema.parse(newSchemaString);
+
+ /**
+  * Hive will re-use the Configuration object that it passes in to be initialized, so the
+  * SerDe must not pick up an old schema stored in it: after initialization the
+  * configuration has to hold the schema supplied through the table properties.
+  */
+ @Test
+ public void initializeDoesNotReuseSchemasFromConf() throws SerDeException {
+   // Seed the configuration with a stale schema
+   Configuration conf = new Configuration();
+   conf.set(AVRO_SERDE_SCHEMA, originalSchema.toString(false));
+
+   // setProperty is the type-safe way to populate Properties (put accepts any Object)
+   Properties props = new Properties();
+   props.setProperty(SCHEMA_LITERAL, newSchemaString);
+
+   AvroSerDe asd = new AvroSerDe();
+   asd.initialize(conf, props);
+
+   // Verify that the schema now within the configuration is the one passed
+   // in via the properties
+   assertEquals(newSchema, Schema.parse(conf.get(AVRO_SERDE_SCHEMA)));
+ }
+
+ @Test
+ public void noSchemaProvidedReturnsErrorSchema() throws SerDeException {
+ Properties props = new Properties();
+
+ verifyErrorSchemaReturned(props);
+ }
+
+ @Test
+ public void gibberishSchemaProvidedReturnsErrorSchema() throws SerDeException {
+ Properties props = new Properties();
+ props.put(AvroSerdeUtils.SCHEMA_LITERAL, "blahblahblah");
+
+ verifyErrorSchemaReturned(props);
+ }
+
+ @Test
+ public void emptySchemaProvidedReturnsErrorSchema() throws SerDeException {
+ Properties props = new Properties();
+ props.put(AvroSerdeUtils.SCHEMA_LITERAL, "");
+
+ verifyErrorSchemaReturned(props);
+ }
+
+ @Test
+ public void badSchemaURLProvidedReturnsErrorSchema() throws SerDeException {
+ Properties props = new Properties();
+ props.put(AvroSerdeUtils.SCHEMA_URL, "not://a/url");
+
+ verifyErrorSchemaReturned(props);
+ }
+
+ @Test
+ public void emptySchemaURLProvidedReturnsErrorSchema() throws SerDeException {
+ Properties props = new Properties();
+ props.put(AvroSerdeUtils.SCHEMA_URL, "");
+
+ verifyErrorSchemaReturned(props);
+ }
+
+ @Test
+ public void bothPropertiesSetToNoneReturnsErrorSchema() throws SerDeException {
+ Properties props = new Properties();
+ props.put(AvroSerdeUtils.SCHEMA_URL, AvroSerdeUtils.SCHEMA_NONE);
+ props.put(AvroSerdeUtils.SCHEMA_LITERAL, AvroSerdeUtils.SCHEMA_NONE);
+
+ verifyErrorSchemaReturned(props);
+ }
+
+ private void verifyErrorSchemaReturned(Properties props) throws SerDeException {
+ AvroSerDe asd = new AvroSerDe();
+ asd.initialize(new Configuration(), props);
+ assertTrue(asd.getObjectInspector() instanceof StandardStructObjectInspector);
+ StandardStructObjectInspector oi = (StandardStructObjectInspector)asd.getObjectInspector();
+ List<? extends StructField> allStructFieldRefs = oi.getAllStructFieldRefs();
+ assertEquals(SchemaResolutionProblem.SIGNAL_BAD_SCHEMA.getFields().size(), allStructFieldRefs.size());
+ StructField firstField = allStructFieldRefs.get(0);
+ assertTrue(firstField.toString().contains("error_error_error_error_error_error_error"));
+
+ try {
+ Writable mock = Mockito.mock(Writable.class);
+ asd.deserialize(mock);
+ fail("Should have thrown a BadSchemaException");
+ } catch (BadSchemaException bse) {
+ // good
+ }
+
+ try {
+ Object o = Mockito.mock(Object.class);
+ ObjectInspector mockOI = Mockito.mock(ObjectInspector.class);
+ asd.serialize(o, mockOI);
+ fail("Should have thrown a BadSchemaException");
+ } catch (BadSchemaException bse) {
+ // good
+ }
+ }
+
+ /** The SerDe must report AvroGenericRecordWritable as its serialized class. */
+ @Test
+ public void getSerializedClassReturnsCorrectType() {
+   final Class<?> serializedClass = new AvroSerDe().getSerializedClass();
+   assertEquals(AvroGenericRecordWritable.class, serializedClass);
+ }
+}