You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by sm...@apache.org on 2012/10/12 00:06:33 UTC
svn commit: r1397333 - in /pig/trunk: ./
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/
contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/
contrib/piggybank/java/src/test/java/org/apache/pig/pi...
Author: sms
Date: Thu Oct 11 22:06:32 2012
New Revision: 1397333
URL: http://svn.apache.org/viewvc?rev=1397333&view=rev
Log:
PIG-2579: Support for multiple input schemas in AvroStorage (cheolsoo via sms)
Added:
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_double.avro (with props)
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_enum.avro (with props)
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_float.avro (with props)
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_int.avro (with props)
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_long.avro (with props)
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_string.avro (with props)
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_double.avro (with props)
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_enum.avro (with props)
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_float.avro (with props)
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_int.avro (with props)
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_long.avro (with props)
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_string.avro (with props)
Removed:
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_generic_union_schema.avro
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_schema.avro
Modified:
pig/trunk/CHANGES.txt
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1397333&r1=1397332&r2=1397333&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Oct 11 22:06:32 2012
@@ -25,6 +25,8 @@ PIG-1891 Enable StoreFunc to make intell
IMPROVEMENTS
+PIG-2579: Support for multiple input schemas in AvroStorage (cheolsoo via sms)
+
PIG-2946: Documentation of "history" and "clear" commands (xalan via azaroth)
PIG-2877: Make SchemaTuple work in foreach (and thus, in loads) (jcoveney)
Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java?rev=1397333&r1=1397332&r2=1397333&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java Thu Oct 11 22:06:32 2012
@@ -71,6 +71,9 @@ public class AvroStorage extends FileInp
private static final String AVRO_OUTPUT_SCHEMA_PROPERTY = "avro_output_schema";
private static final String SCHEMA_DELIM = "#";
private static final String SCHEMA_KEYVALUE_DELIM = "@";
+ private static final String NO_SCHEMA_CHECK = "no_schema_check";
+ private static final String IGNORE_BAD_FILES = "ignore_bad_files";
+ private static final String MULTIPLE_SCHEMAS = "multiple_schemas";
/* FIXME: we use this variable to distinguish schemas specified
* by different AvroStorage calls and will remove this once
@@ -87,6 +90,18 @@ public class AvroStorage extends FileInp
private PigAvroRecordReader reader = null; /* avro record writer */
private Schema inputAvroSchema = null; /* input avro schema */
+ /* if multiple avro record schemas are merged, this map associates each input
+ * record with a remapping of its fields relative to the merged schema. please
+ * see AvroStorageUtils.getSchemaToMergedSchemaMap() for more details.
+ */
+ private Map<Path, Map<Integer, Integer>> schemaToMergedSchemaMap = null;
+
+ /* whether input avro files have the same schema or not. If input avro files
+ * do not have the same schema, we merge these schemas into a single schema
+ * and derive the pig schema from it.
+ */
+ private boolean useMultipleSchemas = false;
+
private boolean checkSchema = true; /*whether check schema of input directories*/
private boolean ignoreBadFiles = false; /* whether ignore corrupted files during load */
@@ -108,16 +123,17 @@ public class AvroStorage extends FileInp
* @throws ParseException
*/
public AvroStorage(String[] parts) throws IOException, ParseException {
-
outputAvroSchema = null;
nullable = true;
checkSchema = true;
if (parts.length == 1
- && !parts[0].equalsIgnoreCase("no_schema_check")
- && !parts[0].equalsIgnoreCase("ignore_bad_files")) {
- /* If one parameter is given, and that is neither 'no_schema_check'
- * nor 'ignore_bad_files', then it must be a json string.
+ && !parts[0].equalsIgnoreCase(NO_SCHEMA_CHECK)
+ && !parts[0].equalsIgnoreCase(IGNORE_BAD_FILES)
+ && !parts[0].equalsIgnoreCase(MULTIPLE_SCHEMAS)) {
+ /* If one parameter is given, and that is not 'no_schema_check',
+ * 'ignore_bad_files', or 'multiple_schemas', then it must be a
+ * json string.
*/
init(parseJsonString(parts[0]));
} else {
@@ -126,12 +142,8 @@ public class AvroStorage extends FileInp
}
}
-
/**
* Set input location and obtain input schema.
- *
- * FIXME: currently we assume all avro files under the same "location"
- * share the same schema and will throw exception if not.
*/
@Override
public void setLocation(String location, Job job) throws IOException {
@@ -139,20 +151,44 @@ public class AvroStorage extends FileInp
return;
}
Set<Path> paths = new HashSet<Path>();
- if (AvroStorageUtils.getAllSubDirs(new Path(location), job, paths)) {
+ Configuration conf = job.getConfiguration();
+ if (AvroStorageUtils.getAllSubDirs(new Path(location), conf, paths)) {
+ setInputAvroSchema(paths, conf);
FileInputFormat.setInputPaths(job, paths.toArray(new Path[0]));
- inputAvroSchema = getAvroSchema(location, job);
} else {
throw new IOException("Input path \'" + location + "\' is not found");
}
}
- protected Schema getAvroSchema(String location, Job job) throws IOException {
- Path path = AvroStorageUtils.getConcretePathFromGlob(location, job);
- if (path == null) {
+ /**
+ * Set the input avro schema (assigns the inputAvroSchema field). If the
+ * 'multiple_schemas' option is enabled, we merge the schemas of all files in the
+ * paths set and use that merged schema (getMergedSchema() may also populate
+ * schemaToMergedSchemaMap); otherwise, we simply use the schema of the first file
+ * returned by the paths set's iterator (see getAvroSchema(Set, Configuration)).
+ *
+ * @param paths set of input files
+ * @param conf configuration
+ * @throws IOException propagated from getMergedSchema() / getAvroSchema(), e.g. when
+ * the input schemas cannot be merged
+ */
+ protected void setInputAvroSchema(Set<Path> paths, Configuration conf) throws IOException {
+ inputAvroSchema = useMultipleSchemas ? getMergedSchema(paths, conf)
+ : getAvroSchema(paths, conf);
+ }
+
+ /**
+ * Get avro schema of the first input file in the given set, i.e. the first element
+ * returned by paths.iterator(). Which file that is depends on the Set implementation;
+ * for a HashSet the iteration order is unspecified, so without 'multiple_schemas'
+ * all input files are effectively assumed to share one schema.
+ *
+ * @param paths set of input files
+ * @param conf configuration
+ * @return avro schema of that file, or null if paths is null or empty
+ * @throws IOException
+ */
+ protected Schema getAvroSchema(Set<Path> paths, Configuration conf) throws IOException {
+ if (paths == null || paths.isEmpty()) {
return null;
}
- FileSystem fs = FileSystem.get(path.toUri(), job.getConfiguration());
+ Path path = paths.iterator().next();
+ FileSystem fs = FileSystem.get(path.toUri(), conf);
return getAvroSchema(path, fs);
}
@@ -205,11 +241,35 @@ public class AvroStorage extends FileInp
if (schema == null)
System.err.println("Cannot get avro schema! Input path " + path + " might be empty.");
- System.err.println(schema.toString());
return schema;
}
/**
+     * Merge multiple input avro schemas into one. Note that we can't merge arbitrary schemas.
+     * Please see AvroStorageUtils.mergeSchema() for what's allowed and what's not allowed.
+     *
+     * Files whose schema cannot be determined (getSchema() returns null, e.g. for an
+     * empty input) are skipped: they contribute nothing to the merged schema and get no
+     * entry in schemaToMergedSchemaMap.
+     *
+     * When more than one file is merged and the result is a record, this also sets
+     * schemaToMergedSchemaMap (see AvroStorageUtils.getSchemaToMergedSchemaMap()).
+     *
+     * @param paths set of input files
+     * @param conf configuration
+     * @return merged avro schema, or null if no input file has a schema
+     * @throws IOException if a schema cannot be read or the schemas cannot be merged
+     */
+    protected Schema getMergedSchema(Set<Path> paths, Configuration conf) throws IOException {
+        Schema result = null;
+        Map<Path, Schema> mergedFiles = new HashMap<Path, Schema>();
+        for (Path path : paths) {
+            FileSystem fs = FileSystem.get(path.toUri(), conf);
+            Schema schema = getSchema(path, fs);
+            if (schema == null) {
+                // Nothing to merge. Recording a null schema here would make
+                // AvroStorageUtils.getSchemaToMergedSchemaMap() fail with an NPE.
+                continue;
+            }
+            result = AvroStorageUtils.mergeSchema(result, schema);
+            mergedFiles.put(path, schema);
+        }
+        // schemaToMergedSchemaMap is only needed when merging multiple records.
+        // (mergedFiles.size() > 1 implies result != null: mergeSchema never returns
+        // null for two non-null schemas.)
+        if (mergedFiles.size() > 1 && result.getType().equals(Schema.Type.RECORD)) {
+            schemaToMergedSchemaMap = AvroStorageUtils.getSchemaToMergedSchemaMap(result, mergedFiles);
+        }
+        return result;
+    }
+
+ /**
* This method is called by {@link #getAvroSchema}. The default implementation
* returns the schema of an avro file; or the schema of the last file in a first-level
* directory (it does not contain sub-directories).
@@ -261,10 +321,21 @@ public class AvroStorage extends FileInp
@Override
public InputFormat getInputFormat() throws IOException {
AvroStorageLog.funcCall("getInputFormat");
- if(inputAvroSchema != null)
- return new PigAvroInputFormat(inputAvroSchema, ignoreBadFiles);
- else
- return new TextInputFormat();
+ InputFormat result = null;
+ if(inputAvroSchema != null) {
+ if (useMultipleSchemas) {
+ // When merging multiple avro schemas, we use embedded schemas
+ // to load input files. So no input avro schema is passed.
+ result = new PigAvroInputFormat(
+ null, ignoreBadFiles, schemaToMergedSchemaMap);
+ } else {
+ result = new PigAvroInputFormat(
+ inputAvroSchema, ignoreBadFiles, schemaToMergedSchemaMap);
+ }
+ } else {
+ result = new TextInputFormat();
+ }
+ return result;
}
@SuppressWarnings("rawtypes")
@@ -295,13 +366,16 @@ public class AvroStorage extends FileInp
* PigSchema.
*/
@Override
- public ResourceSchema getSchema(String location, Job job)
- throws IOException {
+ public ResourceSchema getSchema(String location, Job job) throws IOException {
/* get avro schema */
AvroStorageLog.funcCall("getSchema");
if (inputAvroSchema == null) {
- inputAvroSchema = getAvroSchema(location, job);
+ Set<Path> paths = new HashSet<Path>();
+ Configuration conf = job.getConfiguration();
+ if (AvroStorageUtils.getAllSubDirs(new Path(location), conf, paths)) {
+ setInputAvroSchema(paths, conf);
+ }
}
if(inputAvroSchema != null) {
AvroStorageLog.details( "avro input schema:" + inputAvroSchema);
@@ -310,11 +384,12 @@ public class AvroStorage extends FileInp
ResourceSchema pigSchema = AvroSchema2Pig.convert(inputAvroSchema);
AvroStorageLog.details("pig input schema:" + pigSchema);
if (pigSchema.getFields().length == 1){
- pigSchema = pigSchema.getFields()[0].getSchema();
+ pigSchema = pigSchema.getFields()[0].getSchema();
}
return pigSchema;
- } else
+ } else {
return null;
+ }
}
@Override
@@ -380,14 +455,18 @@ public class AvroStorage extends FileInp
for (int i = 0; i < parts.length; ) {
String name = parts[i].trim();
- if (name.equalsIgnoreCase("no_schema_check")) {
+ if (name.equalsIgnoreCase(NO_SCHEMA_CHECK)) {
checkSchema = false;
/* parameter only, so increase iteration counter by 1 */
i += 1;
- } else if (name.equalsIgnoreCase("ignore_bad_files")) {
+ } else if (name.equalsIgnoreCase(IGNORE_BAD_FILES)) {
ignoreBadFiles = true;
/* parameter only, so increase iteration counter by 1 */
i += 1;
+ } else if (name.equalsIgnoreCase(MULTIPLE_SCHEMAS)) {
+ useMultipleSchemas = true;
+ /* parameter only, so increase iteration counter by 1 */
+ i += 1;
} else {
String value = parts[i+1].trim();
if (name.equalsIgnoreCase("debug")
@@ -572,7 +651,6 @@ public class AvroStorage extends FileInp
: append;
property.setProperty(AVRO_OUTPUT_SCHEMA_PROPERTY, newSchemaStr);
AvroStorageLog.details("New schemas=" + newSchemaStr);
-
}
private String getSchemaKey() {
Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java?rev=1397333&r1=1397332&r2=1397333&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java Thu Oct 11 22:06:32 2012
@@ -22,9 +22,12 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedList;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.net.URI;
import org.apache.avro.Schema;
@@ -95,20 +98,16 @@ public class AvroStorageUtils {
/**
* get input paths to job config
*/
- public static boolean addInputPaths(String pathString, Job job)
- throws IOException
- {
+ public static boolean addInputPaths(String pathString, Job job) throws IOException {
Configuration conf = job.getConfiguration();
FileSystem fs = FileSystem.get(conf);
HashSet<Path> paths = new HashSet<Path>();
- if (getAllSubDirs(new Path(pathString), job, paths))
- {
+ if (getAllSubDirs(new Path(pathString), conf, paths)) {
paths.addAll(Arrays.asList(FileInputFormat.getInputPaths(job)));
FileInputFormat.setInputPaths(job, paths.toArray(new Path[0]));
return true;
}
return false;
-
}
/**
@@ -116,8 +115,8 @@ public class AvroStorageUtils {
*
* @throws IOException
*/
- static boolean getAllSubDirs(Path path, Job job, Set<Path> paths) throws IOException {
- FileSystem fs = FileSystem.get(path.toUri(), job.getConfiguration());
+ public static boolean getAllSubDirs(Path path, Configuration conf, Set<Path> paths) throws IOException {
+ FileSystem fs = FileSystem.get(path.toUri(), conf);
FileStatus[] matchedFiles = fs.globStatus(path, PATH_FILTER);
if (matchedFiles == null || matchedFiles.length == 0) {
return false;
@@ -125,7 +124,7 @@ public class AvroStorageUtils {
for (FileStatus file : matchedFiles) {
if (file.isDir()) {
for (FileStatus sub : fs.listStatus(file.getPath())) {
- getAllSubDirs(sub.getPath(), job, paths);
+ getAllSubDirs(sub.getPath(), conf, paths);
}
} else {
AvroStorageLog.details("Add input file:" + file);
@@ -135,21 +134,6 @@ public class AvroStorageUtils {
return true;
}
- /**
- * Return the first path that matches the glob pattern in the file system.
- *
- * @throws IOException
- */
- public static Path getConcretePathFromGlob(String pattern, Job job) throws IOException {
- Path path = new Path(pattern);
- FileSystem fs = FileSystem.get(path.toUri(), job.getConfiguration());
- FileStatus[] matchedFiles = fs.globStatus(path, PATH_FILTER);
- if (matchedFiles == null || matchedFiles.length == 0) {
- return null;
- }
- return matchedFiles[0].getPath();
- }
-
/** check whether there is NO directory in the input file (status) list*/
public static boolean noDir(FileStatus [] ss) {
for (FileStatus s : ss) {
@@ -182,6 +166,291 @@ public class AvroStorageUtils {
}
/**
+     * This method merges two primitive avro types into one. This method must
+     * be used only to merge two primitive types. For complex types, null will
+     * be returned unless they are both the same type. Also note that not every
+     * primitive type can be merged. For types that cannot be merged, null is
+     * returned.
+     *
+     * Rules:
+     * - identical types merge to themselves;
+     * - two different numeric types merge to the wider one, ordered
+     *   INT < LONG < FLOAT < DOUBLE;
+     * - a numeric, ENUM or STRING type merged with ENUM or STRING gives STRING;
+     * - anything else (in particular BYTES and BOOLEAN) gives null.
+     *
+     * @param x first avro type to merge
+     * @param y second avro type to merge
+     * @return merged avro type, or null if the two types cannot be merged
+     */
+    private static Schema.Type mergeType(Schema.Type x, Schema.Type y) {
+        if (x.equals(y)) {
+            return x;
+        }
+        int xRank = numericRank(x);
+        int yRank = numericRank(y);
+        if (xRank >= 0 && yRank >= 0) {
+            // both numeric and different, hence different ranks: widen to the larger
+            return xRank > yRank ? x : y;
+        }
+        boolean xToString = xRank >= 0 || isTextType(x);
+        boolean yToString = yRank >= 0 || isTextType(y);
+        if (xToString && yToString) {
+            // at least one side is ENUM or STRING; the common representation is STRING
+            return Schema.Type.STRING;
+        }
+        // else return just null in particular, bytes and boolean
+        return null;
+    }
+
+    /** Widening rank of a numeric avro type (INT < LONG < FLOAT < DOUBLE), or -1 if not numeric. */
+    private static int numericRank(Schema.Type type) {
+        switch (type) {
+            case INT:
+                return 0;
+            case LONG:
+                return 1;
+            case FLOAT:
+                return 2;
+            case DOUBLE:
+                return 3;
+            default:
+                return -1;
+        }
+    }
+
+    /** Whether the type is ENUM or STRING, which merge with numeric/text types into STRING. */
+    private static boolean isTextType(Schema.Type type) {
+        return type == Schema.Type.ENUM || type == Schema.Type.STRING;
+    }
+
+ /**
+ * This method merges two avro schemas into one. Note that not every avro schema
+ * can be merged. For complex types to be merged, they must be the same type.
+ * For primitive types to be merged, they must meet certain conditions. For
+ * schemas that cannot be merged, an exception is thrown.
+ *
+ * @param x first avro schema to merge
+ * @param y second avro schema to merge
+ * @return merged avro schema
+ * @throws IOException
+ */
+ public static Schema mergeSchema(Schema x, Schema y) throws IOException {
+ if (x == null) {
+ return y;
+ }
+ if (y == null) {
+ return x;
+ }
+ if (x.equals(y)) {
+ return x;
+ }
+
+ Schema.Type xType = x.getType();
+ Schema.Type yType = y.getType();
+
+ switch (xType) {
+ case RECORD:
+ if (!yType.equals(Schema.Type.RECORD)) {
+ throw new IOException("Cannot merge " + xType + " with " + yType);
+ }
+
+ List<Schema.Field> xFields = x.getFields();
+ List<Schema.Field> yFields = y.getFields();
+
+ // LinkedHashMap is used to keep fields in insertion order.
+ // It's convenient for testing to have determinitic behaviors.
+ Map<String, Schema> fieldName2Schema =
+ new LinkedHashMap<String, Schema>(xFields.size() + yFields.size());
+
+ for (Schema.Field xField : xFields) {
+ fieldName2Schema.put(xField.name(), xField.schema());
+ }
+ for (Schema.Field yField : yFields) {
+ String name = yField.name();
+ Schema currSchema = yField.schema();
+ Schema prevSchema = fieldName2Schema.get(name);
+ if (prevSchema == null) {
+ fieldName2Schema.put(name, currSchema);
+ } else {
+ fieldName2Schema.put(name, mergeSchema(prevSchema, currSchema));
+ }
+ }
+
+ List<Schema.Field> mergedFields = new ArrayList<Schema.Field>(fieldName2Schema.size());
+ for (Entry<String, Schema> entry : fieldName2Schema.entrySet()) {
+ mergedFields.add(new Schema.Field(entry.getKey(), entry.getValue(), "auto-gen", null));
+ }
+ Schema result = Schema.createRecord(
+ "merged", null, "merged schema (generated by AvroStorage)", false);
+ result.setFields(mergedFields);
+ return result;
+
+ case ARRAY:
+ if (!yType.equals(Schema.Type.ARRAY)) {
+ throw new IOException("Cannot merge " + xType + " with " + yType);
+ }
+ return Schema.createArray(mergeSchema(x.getElementType(), y.getElementType()));
+
+ case MAP:
+ if (!yType.equals(Schema.Type.MAP)) {
+ throw new IOException("Cannot merge " + xType + " with " + yType);
+ }
+ return Schema.createMap(mergeSchema(x.getValueType(), y.getValueType()));
+
+ case UNION:
+ if (!yType.equals(Schema.Type.UNION)) {
+ throw new IOException("Cannot merge " + xType + " with " + yType);
+ }
+ List<Schema> xTypes = x.getTypes();
+ List<Schema> yTypes = y.getTypes();
+
+ List<Schema> unionTypes = new ArrayList<Schema>();
+ for (Schema xSchema : xTypes) {
+ unionTypes.add(xSchema);
+ }
+ for (Schema ySchema : yTypes) {
+ if (!unionTypes.contains(ySchema)) {
+ unionTypes.add(ySchema);
+ }
+ }
+ return Schema.createUnion(unionTypes);
+
+ case FIXED:
+ if (!yType.equals(Schema.Type.FIXED)) {
+ throw new IOException("Cannot merge " + xType + " with " + yType);
+ }
+ int xSize = x.getFixedSize();
+ int ySize = y.getFixedSize();
+ if (xSize != ySize) {
+ throw new IOException("Cannot merge FIXED types with different sizes: " + xSize + " and " + ySize);
+ }
+ return Schema.createFixed("merged", null, "merged schema (generated by AvroStorage)", xSize);
+
+ default: // primitive types
+ Schema.Type mergedType = mergeType(xType ,yType);
+ if (mergedType == null) {
+ throw new IOException("Cannot merge " + xType + " with " + yType);
+ }
+ return Schema.create(mergedType);
+ }
+ }
+
+    /**
+     * When merging multiple avro record schemas, we build a map (schemaToMergedSchemaMap)
+     * that associates each input file with a remapping of its top-level record fields
+     * relative to the merged schema. Take the following two schemas for example:
+     *
+     * // path1
+     * { "type": "record",
+     *   "name": "x",
+     *   "fields": [ { "name": "xField", "type": "string" } ]
+     * }
+     *
+     * // path2
+     * { "type": "record",
+     *   "name": "y",
+     *   "fields": [ { "name": "yField", "type": "string" } ]
+     * }
+     *
+     * The merged schema will be something like this:
+     *
+     * // merged
+     * { "type": "record",
+     *   "name": "merged",
+     *   "fields": [ { "name": "xField", "type": "string" },
+     *               { "name": "yField", "type": "string" } ]
+     * }
+     *
+     * The schemaToMergedSchemaMap will look like this:
+     *
+     * // schemaToMergedSchemaMap
+     * { path1 : { 0 : 0 },
+     *   path2 : { 0 : 1 }
+     * }
+     *
+     * The meaning of the map is:
+     * - The field at index '0' of 'path1' is moved to index '0' in the merged schema.
+     * - The field at index '0' of 'path2' is moved to index '1' in the merged schema.
+     *
+     * With this map, we can remap the field positions of the original schema to those
+     * of the merged schema. This is necessary because in the backend we don't use the
+     * merged avro schema but the embedded avro schemas of the input files to load them,
+     * so each field must be relocated from its position in the original schema to its
+     * position in the merged schema. Only top-level fields are remapped.
+     *
+     * @param mergedSchema new schema generated from multiple input schemas; must be a record
+     * @param mergedFiles input avro files that are merged, with their (record) schemas
+     * @return schemaToMergedSchemaMap that maps the old position of each field in the
+     *         original schema to its new position in the merged schema
+     * @throws IOException if mergedSchema or any input schema is missing or not a record,
+     *         or if an input field is absent from mergedSchema
+     */
+    public static Map<Path, Map<Integer, Integer>> getSchemaToMergedSchemaMap(
+            Schema mergedSchema, Map<Path, Schema> mergedFiles) throws IOException {
+
+        if (!mergedSchema.getType().equals(Schema.Type.RECORD)) {
+            throw new IOException("Remapping of non-record schemas is not supported");
+        }
+
+        Map<Path, Map<Integer, Integer>> result =
+            new HashMap<Path, Map<Integer, Integer>>(mergedFiles.size());
+
+        for (Map.Entry<Path, Schema> entry : mergedFiles.entrySet()) {
+            Path path = entry.getKey();
+            Schema schema = entry.getValue();
+            if (schema == null) {
+                throw new IOException("No avro schema found for '" + path + "'");
+            }
+            if (!schema.getType().equals(Schema.Type.RECORD)) {
+                throw new IOException("Remapping of non-record schemas is not supported");
+            }
+            List<Schema.Field> fields = schema.getFields();
+            // map from field position in old schema to field position in new schema
+            Map<Integer, Integer> oldPos2NewPos = new HashMap<Integer, Integer>(fields.size());
+            for (Schema.Field field : fields) {
+                Schema.Field mergedField = mergedSchema.getField(field.name());
+                if (mergedField == null) {
+                    throw new IOException("Field '" + field.name() + "' of '" + path
+                        + "' is not in the merged schema");
+                }
+                oldPos2NewPos.put(field.pos(), mergedField.pos());
+            }
+            // map keys are unique, so each path is visited exactly once
+            result.put(path, oldPos2NewPos);
+        }
+        return result;
+    }
+
+ /**
* Wrap an avro schema as a nullable union if needed.
* For instance, wrap schema "int" as ["null", "int"]
*/
Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java?rev=1397333&r1=1397332&r2=1397333&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java Thu Oct 11 22:06:32 2012
@@ -20,8 +20,10 @@ package org.apache.pig.piggybank.storage
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -33,13 +35,19 @@ import org.apache.hadoop.mapreduce.lib.i
/**
* The InputFormat for avro data.
- *
+ *
*/
public class PigAvroInputFormat extends FileInputFormat<NullWritable, Writable> {
private Schema schema = null; /* avro schema */
private boolean ignoreBadFiles = false; /* whether ignore corrupted files during load */
+ /* if multiple avro record schemas are merged, this map associates each input
+ * record with a remapping of its fields relative to the merged schema. please
+ * see AvroStorageUtils.getSchemaToMergedSchemaMap() for more details.
+ */
+ private Map<Path, Map<Integer, Integer>> schemaToMergedSchemaMap;
+
/**
* empty constructor
*/
@@ -50,14 +58,18 @@ public class PigAvroInputFormat extends
* constructor called by AvroStorage to pass in schema and ignoreBadFiles.
* @param schema input data schema
* @param ignoreBadFiles whether ignore corrupted files during load
+ * @param schemaToMergedSchemaMap map that associates each input record
+ * with a remapping of its fields relative to the merged schema
*/
- public PigAvroInputFormat(Schema schema, boolean ignoreBadFiles) {
+ public PigAvroInputFormat(Schema schema, boolean ignoreBadFiles,
+ Map<Path, Map<Integer, Integer>> schemaToMergedSchemaMap) {
this.schema = schema;
this.ignoreBadFiles = ignoreBadFiles;
+ this.schemaToMergedSchemaMap = schemaToMergedSchemaMap;
}
/**
- * Create and return an avro record reader.
+ * Create and return an avro record reader.
* It uses the input schema passed in to the
* constructor.
*/
@@ -66,7 +78,8 @@ public class PigAvroInputFormat extends
createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
context.setStatus(split.toString());
- return new PigAvroRecordReader(context, (FileSplit) split, schema, ignoreBadFiles);
+ return new PigAvroRecordReader(context, (FileSplit) split, schema,
+ ignoreBadFiles, schemaToMergedSchemaMap);
}
}
Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java?rev=1397333&r1=1397332&r2=1397333&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java Thu Oct 11 22:06:32 2012
@@ -33,10 +33,12 @@ import org.apache.hadoop.mapreduce.lib.i
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
+import java.util.ArrayList;
+import java.util.Map;
+
/**
- * This is an implementation of record reader which reads in avro data and
+ * This is an implementation of record reader which reads in avro data and
* convert them into <NullWritable, Writable> pairs.
- *
*/
public class PigAvroRecordReader extends RecordReader<NullWritable, Writable> {
@@ -47,17 +49,32 @@ public class PigAvroRecordReader extends
private long start;
private long end;
private Path path;
- private boolean ignoreBadFiles;
+ private boolean ignoreBadFiles; /* whether ignore corrupted files during load */
+
+ private TupleFactory tupleFactory = TupleFactory.getInstance();
+
+ /* if multiple avro record schemas are merged, this list will hold field objects
+ * of the merged schema.
+ */
+ private ArrayList<Object> mProtoTuple;
+
+ /* if multiple avro record schemas are merged, this map associates each input
+ * record with a remapping of its fields relative to the merged schema. please
+ * see AvroStorageUtils.getSchemaToMergedSchemaMap() for more details.
+ */
+ private Map<Path, Map<Integer, Integer>> schemaToMergedSchemaMap;
/**
* constructor to initialize input and avro data reader
*/
public PigAvroRecordReader(TaskAttemptContext context, FileSplit split,
- Schema schema, boolean ignoreBadFiles) throws IOException {
+ Schema schema, boolean ignoreBadFiles,
+ Map<Path, Map<Integer, Integer>> schemaToMergedSchemaMap) throws IOException {
this.path = split.getPath();
this.in = new AvroStorageInputStream(path, context);
- if(schema == null)
- throw new IOException("Need to provide input avro schema");
+ if(schema == null) {
+ AvroStorageLog.details("No avro schema given; assuming the schema is embedded");
+ }
try {
this.reader = new DataFileReader<Object>(in, new PigAvroDatumReader(schema));
@@ -69,6 +86,22 @@ public class PigAvroRecordReader extends
this.start = in.tell();
this.end = split.getStart() + split.getLength();
this.ignoreBadFiles = ignoreBadFiles;
+ this.schemaToMergedSchemaMap = schemaToMergedSchemaMap;
+ if (schemaToMergedSchemaMap != null) {
+ // initialize mProtoTuple
+ int maxPos = 0;
+ for (Map<Integer, Integer> map : schemaToMergedSchemaMap.values()) {
+ for (Integer i : map.values()) {
+ maxPos = Math.max(i, maxPos);
+ }
+ }
+ int tupleSize = maxPos + 1;
+ AvroStorageLog.details("Creating proto tuple of fixed size: " + tupleSize);
+ mProtoTuple = new ArrayList<Object>(tupleSize);
+ for (int i = 0; i < tupleSize; i++) {
+ mProtoTuple.add(i, null);
+ }
+ }
}
@Override
@@ -93,20 +126,45 @@ public class PigAvroRecordReader extends
+ /**
+ * Returns the next datum from the Avro file as a Tuple. A datum that is not
+ * already a Tuple is wrapped via wrapAsTuple(). When schemas were merged
+ * (schemaToMergedSchemaMap != null), the fields are moved to their
+ * merged-schema positions using the remapping registered for this reader's
+ * path; an IOException is thrown if that path has no remapping.
+ * NOTE(review): obj is not null-checked; presumably callers only invoke this
+ * when another record is available — confirm.
+ */
@Override
public Writable getCurrentValue() throws IOException, InterruptedException {
Object obj = reader.next();
+ Tuple result = null;
if (obj instanceof Tuple) {
AvroStorageLog.details("Class =" + obj.getClass());
- return (Tuple) obj;
+ result = (Tuple) obj;
} else {
AvroStorageLog.details("Wrap calss " + obj.getClass() + " as a tuple.");
- return wrapAsTuple(obj);
+ result = wrapAsTuple(obj);
+ }
+ if (schemaToMergedSchemaMap != null) {
+ // remap the position of fields to the merged schema
+ Map<Integer, Integer> map = schemaToMergedSchemaMap.get(path);
+ if (map == null) {
+ throw new IOException("The schema of '" + path + "' " +
+ "is not merged by AvroStorage.");
+ }
+ result = remap(result, map);
+ }
+ return result;
+ }
+
+ /**
+ * Remap the position of fields to the merged schema.
+ *
+ * Copies each field of {@code tuple} into the slot of mProtoTuple given by
+ * {@code map}, then builds a new tuple from that list. Slots that no field of
+ * this file maps to are never written, so they keep the null they were
+ * initialised with in the constructor.
+ *
+ * @param tuple record read from this file, in the file's own field order
+ * @param map field position in this file -> position in the merged schema
+ * @return a new tuple laid out according to the merged schema
+ * @throws IOException if a field position has no entry in {@code map}, or if
+ * reading a field of {@code tuple} fails
+ */
+ private Tuple remap(Tuple tuple, Map<Integer, Integer> map) throws IOException {
+ for (int pos = 0; pos < tuple.size(); pos++) {
+ Integer mergedPos = map.get(pos);
+ // Checked explicitly so a missing mapping is reported with context
+ // instead of as an unboxing NullPointerException with no message.
+ if (mergedPos == null) {
+ throw new IOException("Field " + pos + " of '" + path +
+ "' has no position in the merged schema.");
+ }
+ mProtoTuple.set(mergedPos, tuple.get(pos));
}
+ // NOTE(review): mProtoTuple is reused for every record, so this relies on
+ // TupleFactory.newTuple(List) copying the list — confirm for the factory in use.
+ return tupleFactory.newTuple(mProtoTuple);
}
/**
* Wrap non-tuple value as a tuple
*/
protected Tuple wrapAsTuple(Object in) {
- Tuple tuple = TupleFactory.getInstance().newTuple();
+ // reuse the reader's cached factory instead of calling getInstance() each time
+ Tuple tuple = tupleFactory.newTuple();
+ // append() adds the value as a field of the freshly created tuple
tuple.append(in);
return tuple;
}
Modified: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java?rev=1397333&r1=1397332&r2=1397333&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java (original)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java Thu Oct 11 22:06:32 2012
@@ -159,6 +159,8 @@ public class TestAvroStorage {
" } ]" +
" }";
final private String testCorruptedFile = getInputFile("test_corrupted_file.avro");
+ final private String testMultipleSchemas1File = getInputFile("test_primitive_types/*");
+ final private String testMultipleSchemas2File = getInputFile("test_complex_types/*");
@BeforeClass
public static void setup() throws ExecException {
@@ -461,6 +463,105 @@ public class TestAvroStorage {
}
@Test
+ public void testMultipleSchemas1() throws IOException {
+ // Loads Avro files whose top-level schemas are different primitive types
+ // using the 'multiple_schemas' option, then stores every value as a string.
+ //
+ // Input schemas: "int", "long", "float", "double", "string" and
+ // { "type" : "enum", "name" : "foo", "symbols" : [ "6" ] }
+ // Merged schema: "string"
+ //
+ // Relation 'in' (row order is not guaranteed):
+ // (1) (2) (3.0) (4.0) (5) (6)
+ // Stored Avro file (sorted by the ORDER BY):
+ // "1" "2" "3.0" "4.0" "5" "6"
+ final String avroStorage = "org.apache.pig.piggybank.storage.avro.AvroStorage";
+ final String output = outbasedir + "testMultipleSchemas1";
+ final String expected = basedir + "expected_testMultipleSchemas1.avro";
+ deleteDirectory(new File(output));
+
+ final String load = " in = LOAD '" + testMultipleSchemas1File
+ + "' USING " + avroStorage + " ('multiple_schemas');";
+ final String concat = " s = FOREACH in GENERATE StringConcat($0);";
+ final String order = " o = ORDER s BY $0;";
+ final String store = " STORE o INTO '" + output
+ + "' USING " + avroStorage + " ('schema', '\"string\"');";
+ testAvroStorage(new String[] { load, concat, order, store });
+ verifyResults(output, expected);
+ }
+
+ @Test
+ public void testMultipleSchemas2() throws IOException {
+ // Verify that multiple complex types (records) can be loaded.
+ // Input Avro files have the following schemas:
+ // { "type" : "record", "name" : "r", "fields" : [ { "name" : "i", "type" : "int" } ] }
+ // { "type" : "record", "name" : "r", "fields" : [ { "name" : "l", "type" : "long" } ] }
+ // { "type" : "record", "name" : "r", "fields" : [ { "name" : "f", "type" : "float" } ] }
+ // { "type" : "record", "name" : "r", "fields" : [ { "name" : "d", "type" : "double" } ] }
+ // { "type" : "record", "name" : "r", "fields" : [ { "name" : "s", "type" : "string" } ] }
+ // { "type" : "record", "name" : "r", "fields" : [ { "name" : "e", "type" : {
+ // "type" : "enum", "name" : "foo", "symbols" : [ "6" ] } } ] }
+ // Merged Avro schema looks like this:
+ // { "type" : "record",
+ // "name" : "merged",
+ // "fields" : [ { "name" : "i", "type" : "int" },
+ // { "name" : "l", "type" : "long" },
+ // { "name" : "f", "type" : "float" },
+ // { "name" : "d", "type" : "double" },
+ // { "name" : "s", "type" : "string" },
+ // { "name" : "e", "type" : {
+ // "type" : "enum", "name" : "foo", "symbols" : [ "6" ] } }
+ // ]
+ // }
+ // The relation 'in' has six fields per row, exactly one of which is non-null.
+ // It looks like this: (order of rows can be different.)
+ // (,,6,,,)
+ // (,,,,4.0,)
+ // (,,,,,3.0)
+ // (,5,,,,)
+ // (,,,2,,)
+ // (1,,,,,)
+ // NOTE(review): the non-null positions in these sample rows do not match the
+ // merged field order listed above (e.g. the enum value 6 appears as $2
+ // rather than $5); confirm the actual order produced by AvroStorageUtils.
+ // The queries below do not depend on it: every null field is replaced by ''
+ // and all six fields are concatenated, so each row reduces to its single
+ // non-null value.
+ // Avro file stored after processing looks like this:
+ // "1"
+ // "2"
+ // "3.0"
+ // "4.0"
+ // "5"
+ // "6"
+ String output= outbasedir + "testMultipleSchemas2";
+ String expected = basedir + "expected_testMultipleSchemas2.avro";
+ deleteDirectory(new File(output));
+ String [] queries = {
+ " in = LOAD '" + testMultipleSchemas2File +
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ('multiple_schemas');",
+ " f = FOREACH in GENERATE ($0 is not null ? (chararray)$0 : '')," +
+ " ($1 is not null ? (chararray)$1 : '')," +
+ " ($2 is not null ? (chararray)$2 : '')," +
+ " ($3 is not null ? (chararray)$3 : '')," +
+ " ($4 is not null ? (chararray)$4 : '')," +
+ " ($5 is not null ? (chararray)$5 : '');",
+ " c = FOREACH f GENERATE StringConcat( $0, $1, $2, $3, $4, $5 );",
+ " o = ORDER c BY $0;",
+ " STORE o INTO '" + output +
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ('schema', '\"string\"');"
+ };
+ testAvroStorage(queries);
+ verifyResults(output, expected);
+ }
+
+ @Test
public void testDir() throws IOException {
// Verify that all files in a directory including its sub-directories are loaded.
String output= outbasedir + "testDir";
Modified: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java?rev=1397333&r1=1397332&r2=1397333&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java (original)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java Thu Oct 11 22:06:32 2012
@@ -26,6 +26,10 @@ import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
import static org.junit.Assert.*;
@@ -89,39 +93,383 @@ public class TestAvroStorageUtils {
}
+ // getAllSubDirs() is expected to return true and collect at least one path
+ // for an existing location, return false and collect nothing for a
+ // non-existent path or an empty glob, and throw IOException for a
+ // malformed glob.
@Test
- public void testGetConcretePathFromGlob() throws IOException {
+ public void testGetAllSubDirs() throws IOException {
final String basedir = "file://" + System.getProperty("user.dir");
final String tempdir = Long.toString(System.currentTimeMillis());
final String nonexistentpath = basedir + "/" + tempdir + "/this_path_does_not_exist";
- Path[] paths = null;
- Path concretePath = null;
- Job job = new Job(new Configuration());
+ String locationStr = null;
+ // 'paths' is shared by all cases below and cleared after each one
+ Set<Path> paths = new HashSet<Path>();
+ Configuration conf = new Configuration();
// existent path
- String locationStr = basedir;
- concretePath = AvroStorageUtils.getConcretePathFromGlob(locationStr, job);
- assertEquals(basedir, concretePath.toUri().toString());
+ locationStr = basedir;
+ assertTrue(AvroStorageUtils.getAllSubDirs(new Path(locationStr), conf, paths));
+ assertFalse(paths.isEmpty());
+ paths.clear();
// non-existent path
locationStr = nonexistentpath;
- concretePath = AvroStorageUtils.getConcretePathFromGlob(locationStr, job);
- assertEquals(null, concretePath);
+ assertFalse(AvroStorageUtils.getAllSubDirs(new Path(locationStr), conf, paths));
+ assertTrue(paths.isEmpty());
+ paths.clear();
// empty glob pattern
locationStr = basedir + "/{}";
- concretePath = AvroStorageUtils.getConcretePathFromGlob(locationStr, job);
- assertEquals(null, concretePath);
+ assertFalse(AvroStorageUtils.getAllSubDirs(new Path(locationStr), conf, paths));
+ assertTrue(paths.isEmpty());
+ paths.clear();
// bad glob pattern
locationStr = basedir + "/{1,";
try {
- concretePath = AvroStorageUtils.getConcretePathFromGlob(locationStr, job);
- Assert.fail();
+ AvroStorageUtils.getAllSubDirs(new Path(locationStr), conf, paths);
+ Assert.fail("Negative test to test illegal file pattern. Should not be succeeding!");
} catch (IOException e) {
// The message of the exception for illegal file pattern is rather long,
// so we simply confirm if it contains 'illegal file pattern'.
assertTrue(e.getMessage().contains("Illegal file pattern"));
}
}
+
+ // test merging null and non-null: merging with null yields the other schema,
+ // and merging two nulls yields null
+ @Test
+ public void testMergeSchema1() throws IOException {
+ Schema nonNull = Schema.create(Schema.Type.INT);
+ // expected value first, per JUnit's assertEquals(expected, actual), so that
+ // failure messages report the values the right way round
+ assertNull(AvroStorageUtils.mergeSchema(null, null));
+ assertEquals(nonNull, AvroStorageUtils.mergeSchema(null, nonNull));
+ assertEquals(nonNull, AvroStorageUtils.mergeSchema(nonNull, null));
+ }
+
+ // test merging primitive types
+ @Test
+ public void testMergeSchema2() throws IOException {
+ final Schema.Type[] types = {
+ Schema.Type.NULL, Schema.Type.BOOLEAN, Schema.Type.BYTES,
+ Schema.Type.INT, Schema.Type.LONG, Schema.Type.FLOAT,
+ Schema.Type.DOUBLE, Schema.Type.ENUM, Schema.Type.STRING,
+ };
+
+ // merged[row][col] is the type expected from merging types[row] with
+ // types[col]; columns follow the order of 'types'. Schema.Type.NULL is the
+ // Avro null type, while a Java null entry means the merge must throw.
+ final Schema.Type[][] merged = {
+ // null
+ { Schema.Type.NULL, null, null, null, null, null, null, null, null },
+ // boolean
+ { null, Schema.Type.BOOLEAN, null, null, null, null, null, null, null },
+ // bytes
+ { null, null, Schema.Type.BYTES, null, null, null, null, null, null },
+ // int
+ { null, null, null, Schema.Type.INT, Schema.Type.LONG, Schema.Type.FLOAT,
+ Schema.Type.DOUBLE, Schema.Type.STRING, Schema.Type.STRING },
+ // long
+ { null, null, null, Schema.Type.LONG, Schema.Type.LONG, Schema.Type.FLOAT,
+ Schema.Type.DOUBLE, Schema.Type.STRING, Schema.Type.STRING },
+ // float
+ { null, null, null, Schema.Type.FLOAT, Schema.Type.FLOAT, Schema.Type.FLOAT,
+ Schema.Type.DOUBLE, Schema.Type.STRING, Schema.Type.STRING },
+ // double
+ { null, null, null, Schema.Type.DOUBLE, Schema.Type.DOUBLE, Schema.Type.DOUBLE,
+ Schema.Type.DOUBLE, Schema.Type.STRING, Schema.Type.STRING },
+ // enum
+ { null, null, null, Schema.Type.STRING, Schema.Type.STRING, Schema.Type.STRING,
+ Schema.Type.STRING, Schema.Type.ENUM, Schema.Type.STRING },
+ // string
+ { null, null, null, Schema.Type.STRING, Schema.Type.STRING, Schema.Type.STRING,
+ Schema.Type.STRING, Schema.Type.STRING, Schema.Type.STRING },
+ };
+
+ List<String> enumSymbols = new ArrayList<String>();
+ enumSymbols.add("sym1");
+ enumSymbols.add("sym2");
+ Schema enumSchema = Schema.createEnum("enum", null, null, enumSymbols);
+
+ for (int row = 0; row < types.length; row++) {
+ Schema.Type leftType = types[row];
+ for (int col = 0; col < types.length; col++) {
+ Schema.Type rightType = types[col];
+ // ENUM is a named type, so it cannot come from Schema.create()
+ Schema left = (leftType == Schema.Type.ENUM) ? enumSchema : Schema.create(leftType);
+ Schema right = (rightType == Schema.Type.ENUM) ? enumSchema : Schema.create(rightType);
+ Schema.Type want = merged[row][col];
+ if (want != null) {
+ assertEquals(want, AvroStorageUtils.mergeSchema(left, right).getType());
+ continue;
+ }
+ try {
+ Schema result = AvroStorageUtils.mergeSchema(left, right);
+ Assert.fail("exception is expected, but " + result.getType() + " is returned");
+ } catch (IOException e) {
+ assertEquals("Cannot merge " + leftType + " with " + rightType, e.getMessage());
+ }
+ }
+ }
+ }
+
+ // test merging different kinds of complex types (negative test):
+ // any two different kinds among record, array, map, union and fixed
+ // must be rejected by mergeSchema
+ @Test
+ public void testMergeSchema3() throws IOException {
+ final Schema[] kinds = {
+ Schema.createRecord(new ArrayList<Schema.Field>()),
+ Schema.createArray(Schema.create(Schema.Type.INT)),
+ Schema.createMap(Schema.create(Schema.Type.INT)),
+ Schema.createUnion(new ArrayList<Schema>()),
+ Schema.createFixed("fixed", null, null, 1),
+ };
+
+ for (int a = 0; a < kinds.length; a++) {
+ for (int b = 0; b < kinds.length; b++) {
+ if (a == b) {
+ continue; // same-kind merges are exercised by testMergeSchema4..8
+ }
+ Schema left = kinds[a];
+ Schema right = kinds[b];
+ try {
+ Schema result = AvroStorageUtils.mergeSchema(left, right);
+ Assert.fail("exception is expected, but " + result.getType() + " is returned");
+ } catch (IOException e) {
+ assertEquals("Cannot merge " + left.getType() + " with " + right.getType(),
+ e.getMessage());
+ }
+ }
+ }
+ }
+
+ // test merging two records: fields with the same name are merged, and
+ // fields with different names are all kept
+ @Test
+ public void testMergeSchema4() throws IOException {
+ // fields have the same names and types
+ Schema x = singleFieldRecord("f1", Schema.Type.INT);
+ Schema y = singleFieldRecord("f1", Schema.Type.INT);
+ Schema z = AvroStorageUtils.mergeSchema(x, y);
+ assertEquals(x.getFields().size(), z.getFields().size());
+ assertEquals(Schema.Type.INT, z.getField("f1").schema().getType());
+
+ // fields have the same names and mergeable types
+ x = singleFieldRecord("f1", Schema.Type.INT);
+ y = singleFieldRecord("f1", Schema.Type.DOUBLE);
+ z = AvroStorageUtils.mergeSchema(x, y);
+ assertEquals(x.getFields().size(), z.getFields().size());
+ assertEquals(Schema.Type.DOUBLE, z.getField("f1").schema().getType());
+
+ // fields have the same names but not mergeable types
+ x = singleFieldRecord("f1", Schema.Type.INT);
+ y = singleFieldRecord("f1", Schema.Type.BOOLEAN);
+ try {
+ z = AvroStorageUtils.mergeSchema(x, y);
+ Assert.fail("exception is expected, but " + z.getType() + " is returned");
+ } catch (IOException e) {
+ assertEquals("Cannot merge " + x.getField("f1").schema().getType()
+ + " with " + y.getField("f1").schema().getType(), e.getMessage());
+ }
+
+ // fields have different names
+ x = singleFieldRecord("f1", Schema.Type.INT);
+ y = singleFieldRecord("f2", Schema.Type.DOUBLE);
+ z = AvroStorageUtils.mergeSchema(x, y);
+ assertEquals(x.getFields().size() + y.getFields().size(), z.getFields().size());
+ assertEquals(Schema.Type.INT, z.getField("f1").schema().getType());
+ assertEquals(Schema.Type.DOUBLE, z.getField("f2").schema().getType());
+ }
+
+ /** Builds a record schema (via Schema.createRecord(List)) with one field of the given type. */
+ private static Schema singleFieldRecord(String fieldName, Schema.Type fieldType) {
+ List<Schema.Field> fields = new ArrayList<Schema.Field>();
+ fields.add(new Schema.Field(fieldName, Schema.create(fieldType), null, null));
+ return Schema.createRecord(fields);
+ }
+
+ // test merging two arrays: the element types are merged
+ @Test
+ public void testMergeSchema5() throws IOException {
+ Schema x, y, z;
+
+ // element types are mergeable
+ x = Schema.createArray(Schema.create(Schema.Type.INT));
+ y = Schema.createArray(Schema.create(Schema.Type.DOUBLE));
+
+ z = AvroStorageUtils.mergeSchema(x, y);
+ assertEquals(Schema.Type.ARRAY, z.getType());
+ assertEquals(Schema.Type.DOUBLE, z.getElementType().getType());
+
+ // element types are not mergeable
+ x = Schema.createArray(Schema.create(Schema.Type.INT));
+ y = Schema.createArray(Schema.create(Schema.Type.BOOLEAN));
+
+ try {
+ z = AvroStorageUtils.mergeSchema(x, y);
+ Assert.fail("exception is expected, but " + z.getType() + " is returned");
+ } catch (IOException e) {
+ assertEquals("Cannot merge "+ x.getElementType().getType() +
+ " with "+ y.getElementType().getType(), e.getMessage());
+ }
+ }
+
+ // test merging two maps: the value types are merged
+ @Test
+ public void testMergeSchema6() throws IOException {
+ // value types are mergeable: INT with DOUBLE gives DOUBLE
+ Schema intMap = Schema.createMap(Schema.create(Schema.Type.INT));
+ Schema doubleMap = Schema.createMap(Schema.create(Schema.Type.DOUBLE));
+ Schema mergedMap = AvroStorageUtils.mergeSchema(intMap, doubleMap);
+ assertEquals(Schema.Type.MAP, mergedMap.getType());
+ assertEquals(Schema.Type.DOUBLE, mergedMap.getValueType().getType());
+
+ // value types are not mergeable: INT with BOOLEAN
+ Schema left = Schema.createMap(Schema.create(Schema.Type.INT));
+ Schema right = Schema.createMap(Schema.create(Schema.Type.BOOLEAN));
+ try {
+ Schema unexpected = AvroStorageUtils.mergeSchema(left, right);
+ Assert.fail("exception is expected, but " + unexpected.getType() + " is returned");
+ } catch (IOException e) {
+ String expectedMessage = "Cannot merge " + left.getValueType().getType()
+ + " with " + right.getValueType().getType();
+ assertEquals(expectedMessage, e.getMessage());
+ }
+ }
+
+ // test merging two unions: the result is a union holding the branches of both
+ @Test
+ public void testMergeSchema7() throws IOException {
+ Schema intBranch = Schema.create(Schema.Type.INT);
+ Schema doubleBranch = Schema.create(Schema.Type.DOUBLE);
+
+ List<Schema> leftBranches = new ArrayList<Schema>();
+ leftBranches.add(intBranch);
+ List<Schema> rightBranches = new ArrayList<Schema>();
+ rightBranches.add(doubleBranch);
+
+ Schema merged = AvroStorageUtils.mergeSchema(
+ Schema.createUnion(leftBranches), Schema.createUnion(rightBranches));
+ assertEquals(Schema.Type.UNION, merged.getType());
+ assertTrue(merged.getTypes().contains(intBranch));
+ assertTrue(merged.getTypes().contains(doubleBranch));
+ }
+
+ // test merging two fixeds: same-size fixeds merge, different sizes are rejected
+ @Test
+ public void testMergeSchema8() throws IOException {
+ // same size, different names: the merge keeps the FIXED type and size
+ Schema first = Schema.createFixed("fixed1", null, null, 1);
+ Schema second = Schema.createFixed("fixed2", null, null, 1);
+ Schema merged = AvroStorageUtils.mergeSchema(first, second);
+ assertEquals(Schema.Type.FIXED, merged.getType());
+ assertEquals(first.getFixedSize(), merged.getFixedSize());
+
+ // different sizes: the merge must fail
+ Schema small = Schema.createFixed("fixed1", null, null, 1);
+ Schema large = Schema.createFixed("fixed2", null, null, 2);
+ try {
+ Schema unexpected = AvroStorageUtils.mergeSchema(small, large);
+ Assert.fail("exception is expected, but " + unexpected.getType() + " is returned");
+ } catch (IOException e) {
+ assertTrue(e.getMessage().contains("Cannot merge FIXED types with different sizes"));
+ }
+ }
}
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_double.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_double.avro?rev=1397333&view=auto
==============================================================================
Binary file - no diff available.
Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_double.avro
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_enum.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_enum.avro?rev=1397333&view=auto
==============================================================================
Binary file - no diff available.
Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_enum.avro
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_float.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_float.avro?rev=1397333&view=auto
==============================================================================
Binary file - no diff available.
Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_float.avro
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_int.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_int.avro?rev=1397333&view=auto
==============================================================================
Binary file - no diff available.
Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_int.avro
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_long.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_long.avro?rev=1397333&view=auto
==============================================================================
Binary file - no diff available.
Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_long.avro
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_string.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_string.avro?rev=1397333&view=auto
==============================================================================
Binary file - no diff available.
Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_complex_types/test_record_string.avro
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_double.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_double.avro?rev=1397333&view=auto
==============================================================================
Binary file - no diff available.
Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_double.avro
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_enum.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_enum.avro?rev=1397333&view=auto
==============================================================================
Binary file - no diff available.
Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_enum.avro
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_float.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_float.avro?rev=1397333&view=auto
==============================================================================
Binary file - no diff available.
Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_float.avro
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_int.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_int.avro?rev=1397333&view=auto
==============================================================================
Binary file - no diff available.
Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_int.avro
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_long.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_long.avro?rev=1397333&view=auto
==============================================================================
Binary file - no diff available.
Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_long.avro
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_string.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_string.avro?rev=1397333&view=auto
==============================================================================
Binary file - no diff available.
Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_primitive_types/test_string.avro
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream