You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by sm...@apache.org on 2012/08/20 09:14:55 UTC
svn commit: r1374925 - in /pig/trunk: ./
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/
contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/
contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/
Author: sms
Date: Mon Aug 20 07:14:54 2012
New Revision: 1374925
URL: http://svn.apache.org/viewvc?rev=1374925&view=rev
Log:
PIG-2875: Add recursive record support to AvroStorage (cheolsoo via sms)
Added:
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecursiveRecordReference1.avro (with props)
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecursiveRecordReference2.avro (with props)
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecursiveRecordReference3.avro (with props)
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_generic_union.avro (with props)
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_array.avro (with props)
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_map.avro (with props)
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_record.avro (with props)
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_union.avro (with props)
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_union.avsc
Modified:
pig/trunk/CHANGES.txt
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchemaManager.java
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1374925&r1=1374924&r2=1374925&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Aug 20 07:14:54 2012
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-2875: Add recursive record support to AvroStorage (cheolsoo via sms)
+
PIG-2662: skew join does not honor its config parameters (rajesh.balamohan via thejas)
PIG-2871: Refactor signature for PigReducerEstimator (billgraham)
Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java?rev=1374925&r1=1374924&r2=1374925&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java Mon Aug 20 07:14:54 2012
@@ -5,9 +5,9 @@
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -18,7 +18,10 @@
package org.apache.pig.piggybank.storage.avro;
import java.io.IOException;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
+
import org.apache.avro.Schema;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
@@ -47,13 +50,11 @@ public class AvroSchema2Pig {
*/
public static ResourceSchema convert(Schema schema) throws IOException {
- if (AvroStorageUtils.containsRecursiveRecord(schema))
- throw new IOException ("We don't accept schema containing recursive records.");
-
if (AvroStorageUtils.containsGenericUnion(schema))
throw new IOException ("We don't accept schema containing generic unions.");
- ResourceFieldSchema inSchema = inconvert(schema, FIELD);
+ Set<Schema> visitedRecords = new HashSet<Schema>();
+ ResourceFieldSchema inSchema = inconvert(schema, FIELD, visitedRecords);
ResourceSchema tupleSchema;
if (inSchema.getType() == DataType.TUPLE) {
@@ -73,7 +74,8 @@ public class AvroSchema2Pig {
/**
* Convert a schema with field name to a pig schema
*/
- private static ResourceFieldSchema inconvert(Schema in, String fieldName) throws IOException {
+ private static ResourceFieldSchema inconvert(Schema in, String fieldName, Set<Schema> visitedRecords)
+ throws IOException {
AvroStorageLog.details("InConvert avro schema with field name " + fieldName);
@@ -85,24 +87,29 @@ public class AvroSchema2Pig {
AvroStorageLog.details("convert to a pig tuple");
- fieldSchema.setType(DataType.TUPLE);
- ResourceSchema tupleSchema = new ResourceSchema();
- List<Schema.Field> fields = in.getFields();
- ResourceFieldSchema[] childFields = new ResourceFieldSchema[fields.size()];
- int index = 0;
- for (Schema.Field field : fields) {
- childFields[index++] = inconvert(field.schema(), field.name());
- }
+ if (visitedRecords.contains(in)) {
+ fieldSchema.setType(DataType.BYTEARRAY);
+ } else {
+ visitedRecords.add(in);
+ fieldSchema.setType(DataType.TUPLE);
+ ResourceSchema tupleSchema = new ResourceSchema();
+ List<Schema.Field> fields = in.getFields();
+ ResourceFieldSchema[] childFields = new ResourceFieldSchema[fields.size()];
+ int index = 0;
+ for (Schema.Field field : fields) {
+ childFields[index++] = inconvert(field.schema(), field.name(), visitedRecords);
+ }
- tupleSchema.setFields(childFields);
- fieldSchema.setSchema(tupleSchema);
+ tupleSchema.setFields(childFields);
+ fieldSchema.setSchema(tupleSchema);
+ }
} else if (avroType.equals(Schema.Type.ARRAY)) {
AvroStorageLog.details("convert array to a pig bag");
fieldSchema.setType(DataType.BAG);
Schema elemSchema = in.getElementType();
- ResourceFieldSchema subFieldSchema = inconvert(elemSchema, ARRAY_FIELD);
+ ResourceFieldSchema subFieldSchema = inconvert(elemSchema, ARRAY_FIELD, visitedRecords);
add2BagSchema(fieldSchema, subFieldSchema);
} else if (avroType.equals(Schema.Type.MAP)) {
@@ -114,7 +121,7 @@ public class AvroSchema2Pig {
if (AvroStorageUtils.isAcceptableUnion(in)) {
Schema acceptSchema = AvroStorageUtils.getAcceptedType(in);
- ResourceFieldSchema realFieldSchema = inconvert(acceptSchema, null);
+ ResourceFieldSchema realFieldSchema = inconvert(acceptSchema, null, visitedRecords);
fieldSchema.setType(realFieldSchema.getType());
fieldSchema.setSchema(realFieldSchema.getSchema());
} else
@@ -138,7 +145,7 @@ public class AvroSchema2Pig {
fieldSchema.setType(DataType.LONG);
} else if (avroType.equals(Schema.Type.STRING)) {
fieldSchema.setType(DataType.CHARARRAY);
- } else if (avroType.equals(Schema.Type.NULL)) {
+ } else if (avroType.equals(Schema.Type.NULL)) {
// value of NULL is always NULL
fieldSchema.setType(DataType.INTEGER);
} else {
@@ -154,7 +161,7 @@ public class AvroSchema2Pig {
ResourceFieldSchema subFieldSchema)
throws IOException {
- ResourceFieldSchema wrapped = (subFieldSchema.getType() == DataType.TUPLE)
+ ResourceFieldSchema wrapped = (subFieldSchema.getType() == DataType.TUPLE)
? subFieldSchema
: AvroStorageUtils.wrapAsTuple(subFieldSchema);
Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchemaManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchemaManager.java?rev=1374925&r1=1374924&r2=1374925&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchemaManager.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchemaManager.java Mon Aug 20 07:14:54 2012
@@ -5,9 +5,9 @@
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -17,6 +17,7 @@
package org.apache.pig.piggybank.storage.avro;
+import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -27,7 +28,7 @@ import org.apache.avro.Schema.Type;
/**
* This class creates two maps out of a given Avro schema. And it supports
* looking up avro schemas using either type name or field name.
- *
+ *
* 1. map[type name] = > avro schema
* 2. map[field name] => avro schema
*
@@ -42,11 +43,18 @@ public class AvroSchemaManager {
/**
* Construct with a given schema
*/
- public AvroSchemaManager(Schema schema) {
+ public AvroSchemaManager(Schema schema) throws IOException {
name2Schema = new HashMap<String, Schema>();
typeName2Schema = new HashMap<String, Schema>();
+ if (AvroStorageUtils.containsRecursiveRecord(schema)) {
+ throw new IOException ("Schema containing recursive records cannot be referred to"
+ + " by 'data' and 'schema_file'. Please instead use 'same' with a path that"
+ + " points to an avro file encoded by the same schema as what you want to use,"
+ + " or use 'schema' with a json string representation." );
+ }
+
init(null, schema, false);
}
@@ -58,7 +66,7 @@ public class AvroSchemaManager {
/**
* Initialize given a schema
*/
- protected void init(String namespace, Schema schema,
+ protected void init(String namespace, Schema schema,
boolean ignoreNameMap) {
/* put to map[type name]=>schema */
Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java?rev=1374925&r1=1374924&r2=1374925&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java Mon Aug 20 07:14:54 2012
@@ -5,9 +5,9 @@
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -67,22 +67,22 @@ import org.json.simple.parser.ParseExcep
public class AvroStorage extends FileInputLoadFunc implements StoreFuncInterface, LoadMetadata {
/* storeFunc parameters */
- private static final String NOTNULL = "NOTNULL";
+ private static final String NOTNULL = "NOTNULL";
private static final String AVRO_OUTPUT_SCHEMA_PROPERTY = "avro_output_schema";
private static final String SCHEMA_DELIM = "#";
private static final String SCHEMA_KEYVALUE_DELIM = "@";
-
+
/* FIXME: we use this variable to distinguish schemas specified
- * by different AvroStorage calls and will remove this once
- * StoreFunc provides access to Pig output schema in backend.
+ * by different AvroStorage calls and will remove this once
+ * StoreFunc provides access to Pig output schema in backend.
* Default value is 0.
*/
private int storeFuncIndex = 0;
private PigAvroRecordWriter writer = null; /* avro record writer */
- private Schema outputAvroSchema = null; /* output avro schema */
+ private Schema outputAvroSchema = null; /* output avro schema */
/* indicate whether data is nullable */
- private boolean nullable = true;
-
+ private boolean nullable = true;
+
/* loadFunc parameters */
private PigAvroRecordReader reader = null; /* avro record writer */
private Schema inputAvroSchema = null; /* input avro schema */
@@ -110,24 +110,25 @@ public class AvroStorage extends FileInp
outputAvroSchema = null;
nullable = true;
-
checkSchema = true;
- if (parts.length == 1 && parts[0].equalsIgnoreCase("no_schema_check")) {
- checkSchema = false;
- } else {
- /*parse input parameters */
- Map<String, Object> map = parts.length > 1 ? parseStringList(parts) : parseJsonString(parts[0]);
- init(map); /* initialize */
+ if (parts.length == 1 && !parts[0].equalsIgnoreCase("no_schema_check")) {
+ /* If one parameter is given, and that is not 'no_schema_check',
+ * then it must be a json string.
+ */
+ init(parseJsonString(parts[0]));
+ } else {
+ /* parse parameters */
+ init(parseStringList(parts));
}
}
-
+
/**
- * Set input location and obtain input schema.
- *
- * FIXME: currently we assume all avro files under the same "location"
- * share the same schema and will throw exception if not.
+ * Set input location and obtain input schema.
+ *
+ * FIXME: currently we assume all avro files under the same "location"
+ * share the same schema and will throw exception if not.
*/
@Override
public void setLocation(String location, Job job) throws IOException {
@@ -155,12 +156,12 @@ public class AvroStorage extends FileInp
/**
* Get avro schema of input path. There are three cases:
* 1. if path is a file, then return its avro schema;
- * 2. if path is a first-level directory (no sub-directories), then
- * return the avro schema of one underlying file;
- * 3. if path contains sub-directories, then recursively check
- * whether all of them share the same schema and return it
+ * 2. if path is a first-level directory (no sub-directories), then
+ * return the avro schema of one underlying file;
+ * 3. if path contains sub-directories, then recursively check
+ * whether all of them share the same schema and return it
* if so or throw an exception if not.
- *
+ *
* @param path input path
* @param fs file system
* @return avro schema of data
@@ -175,13 +176,13 @@ public class AvroStorage extends FileInp
if (!fs.isDirectory(path)) {
return getSchema(path, fs);
}
-
+
FileStatus[] ss = fs.listStatus(path, AvroStorageUtils.PATH_FILTER);
Schema schema = null;
if (ss.length > 0) {
if (AvroStorageUtils.noDir(ss))
return getSchema(path, fs);
-
+
/*otherwise, check whether schemas of underlying directories are the same */
for (FileStatus s : ss) {
Schema newSchema = getAvroSchema(s.getPath(), fs);
@@ -197,19 +198,19 @@ public class AvroStorage extends FileInp
}
}
}
-
+
if (schema == null)
System.err.println("Cannot get avro schema! Input path " + path + " might be empty.");
-
+
System.err.println(schema.toString());
return schema;
}
/**
- * This method is called by {@link #getAvroSchema}. The default implementation
- * returns the schema of an avro file; or the schema of the last file in a first-level
+ * This method is called by {@link #getAvroSchema}. The default implementation
+ * returns the schema of an avro file; or the schema of the last file in a first-level
* directory (it does not contain sub-directories).
- *
+ *
* @param path path of a file or first level directory
* @param fs file system
* @return avro schema
@@ -263,7 +264,7 @@ public class AvroStorage extends FileInp
return new TextInputFormat();
}
- @SuppressWarnings("rawtypes")
+ @SuppressWarnings("rawtypes")
@Override
public void prepareToRead(RecordReader reader, PigSplit split)
throws IOException {
@@ -283,9 +284,9 @@ public class AvroStorage extends FileInp
}
}
-
+
/* implement LoadMetadata */
-
+
/**
* Get avro schema from "location" and return the converted
* PigSchema.
@@ -329,8 +330,8 @@ public class AvroStorage extends FileInp
}
/**
- * build a property map from a json object
- *
+ * build a property map from a json object
+ *
* @param jsonString json object in string format
* @return a property map
* @throws ParseException
@@ -357,14 +358,14 @@ public class AvroStorage extends FileInp
/* convert avro schema (as json object) to string */
obj.put(key, value.toString().trim());
}
-
+
}
return obj;
}
/**
- * build a property map from a string list
- *
+ * build a property map from a string list
+ *
* @param parts input string list
* @return a property map
* @throws IOException
@@ -374,25 +375,34 @@ public class AvroStorage extends FileInp
Map<String, Object> map = new HashMap<String, Object>();
- for (int i = 0; i < parts.length - 1; i += 2) {
+ for (int i = 0; i < parts.length; ) {
String name = parts[i].trim();
- String value = parts[i+1].trim();
- if (name.equalsIgnoreCase("debug")
- || name.equalsIgnoreCase("index")) {
- /* store value as integer */
- map.put(name, Integer.parseInt(value));
- } else if (name.equalsIgnoreCase("data")
- || name.equalsIgnoreCase("same")
- || name.equalsIgnoreCase("schema")
- || name.equalsIgnoreCase("schema_file")
- || name.matches("field\\d+")) {
- /* store value as string */
- map.put(name, value);
- } else if (name.equalsIgnoreCase("nullable")) {
- /* store value as boolean */
- map.put(name, Boolean.getBoolean(value));
- } else
- throw new IOException("Invalid parameter:" + name);
+ if (name.equalsIgnoreCase("no_schema_check")) {
+ checkSchema = false;
+ /* parameter only, so increase iteration counter by 1 */
+ i += 1;
+ } else {
+ String value = parts[i+1].trim();
+ if (name.equalsIgnoreCase("debug")
+ || name.equalsIgnoreCase("index")) {
+ /* store value as integer */
+ map.put(name, Integer.parseInt(value));
+ } else if (name.equalsIgnoreCase("data")
+ || name.equalsIgnoreCase("same")
+ || name.equalsIgnoreCase("schema")
+ || name.equalsIgnoreCase("schema_file")
+ || name.matches("field\\d+")) {
+ /* store value as string */
+ map.put(name, value);
+ } else if (name.equalsIgnoreCase("nullable")) {
+ /* store value as boolean */
+ map.put(name, Boolean.getBoolean(value));
+ } else {
+ throw new IOException("Invalid parameter:" + name);
+ }
+ /* parameter/value pair, so increase iteration counter by 2 */
+ i += 2;
+ }
}
return map;
}
@@ -432,7 +442,7 @@ public class AvroStorage extends FileInp
String name = entry.getKey().trim();
Object value = entry.getValue();
- if (name.equalsIgnoreCase("index")) {
+ if (name.equalsIgnoreCase("index")) {
/* set index of store function */
storeFuncIndex = (Integer) value;
} else if (name.equalsIgnoreCase("same")) {
@@ -518,7 +528,7 @@ public class AvroStorage extends FileInp
}
/**
- * Append newly specified schema
+ * Append newly specified schema
*/
@Override
public void checkSchema(ResourceSchema s) throws IOException {
@@ -529,7 +539,7 @@ public class AvroStorage extends FileInp
AvroStorageLog.details("Previously defined schemas=" + prevSchemaStr);
String key = getSchemaKey();
- Map<String, String> schemaMap = (prevSchemaStr != null)
+ Map<String, String> schemaMap = (prevSchemaStr != null)
? parseSchemaMap(prevSchemaStr)
: null;
@@ -539,17 +549,19 @@ public class AvroStorage extends FileInp
}
/* validate and convert output schema */
- Schema schema = outputAvroSchema != null
- ? PigSchema2Avro.validateAndConvert(outputAvroSchema, s)
- : PigSchema2Avro.convert(s, nullable);
+ Schema schema = outputAvroSchema != null
+ ? (checkSchema
+ ? PigSchema2Avro.validateAndConvert(outputAvroSchema, s)
+ : outputAvroSchema)
+ : PigSchema2Avro.convert(s, nullable);
AvroStorageLog.info("key=" + key + " outputSchema=" + schema);
String schemaStr = schema.toString();
String append = key + SCHEMA_KEYVALUE_DELIM + schemaStr;
- String newSchemaStr = (schemaMap != null)
- ? prevSchemaStr + SCHEMA_DELIM + append
+ String newSchemaStr = (schemaMap != null)
+ ? prevSchemaStr + SCHEMA_DELIM + append
: append;
property.setProperty(AVRO_OUTPUT_SCHEMA_PROPERTY, newSchemaStr);
AvroStorageLog.details("New schemas=" + newSchemaStr);
@@ -598,7 +610,7 @@ public class AvroStorage extends FileInp
return new PigAvroOutputFormat(schema);
}
- @SuppressWarnings("rawtypes")
+ @SuppressWarnings("rawtypes")
@Override
public void prepareToWrite(RecordWriter writer) throws IOException {
this.writer = (PigAvroRecordWriter) writer;
Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java?rev=1374925&r1=1374924&r2=1374925&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java Mon Aug 20 07:14:54 2012
@@ -5,9 +5,9 @@
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -113,7 +113,7 @@ public class AvroStorageUtils {
/**
* Adds all non-hidden directories and subdirectories to set param
- *
+ *
* @throws IOException
*/
static boolean getAllSubDirs(Path path, Job job, Set<Path> paths) throws IOException {
@@ -158,15 +158,15 @@ public class AvroStorageUtils {
}
return true;
}
-
- /** get last file of a hdfs path if it is a directory;
+
+ /** get last file of a hdfs path if it is a directory;
* or return the file itself if path is a file
*/
public static Path getLast(String path, FileSystem fs) throws IOException {
return getLast(new Path(path), fs);
}
- /** get last file of a hdfs path if it is a directory;
+ /** get last file of a hdfs path if it is a directory;
* or return the file itself if path is a file
*/
public static Path getLast(Path path, FileSystem fs) throws IOException {
@@ -182,7 +182,7 @@ public class AvroStorageUtils {
}
/**
- * Wrap an avro schema as a nullable union if needed.
+ * Wrap an avro schema as a nullable union if needed.
* For instance, wrap schema "int" as ["null", "int"]
*/
public static Schema wrapAsUnion(Schema schema, boolean nullable) {
@@ -204,21 +204,21 @@ public class AvroStorageUtils {
Set<String> set = new HashSet<String> ();
return containsRecursiveRecord(s, set);
}
-
+
/**
* Called by {@link #containsRecursiveRecord(Schema)} and it recursively checks
- * whether the input schema contains recursive records.
+ * whether the input schema contains recursive records.
*/
protected static boolean containsRecursiveRecord(Schema s, Set<String> definedRecordNames) {
-
+
/* if it is a record, check itself and all fields*/
if (s.getType().equals(Schema.Type.RECORD)) {
String name = s.getName();
if (definedRecordNames.contains(name)) return true;
-
+
/* add its own name into defined record set*/
definedRecordNames.add(s.getName());
-
+
/* check all fields */
List<Field> fields = s.getFields();
for (Field field: fields) {
@@ -226,25 +226,25 @@ public class AvroStorageUtils {
if (containsRecursiveRecord(fs, definedRecordNames))
return true;
}
-
+
/* remove its own name from the name set */
definedRecordNames.remove(s.getName());
return false;
}
-
+
/* if it is an array, check its element type */
else if (s.getType().equals(Schema.Type.ARRAY)) {
Schema fs = s.getElementType();
return containsRecursiveRecord(fs, definedRecordNames);
}
-
+
/*if it is a map, check its value type */
else if (s.getType().equals(Schema.Type.MAP)) {
Schema vs = s.getValueType();
return containsRecursiveRecord(vs, definedRecordNames);
}
-
+
/* if it is a union, check all possible types */
else if (s.getType().equals(Schema.Type.UNION)) {
List<Schema> types = s.getTypes();
@@ -254,57 +254,84 @@ public class AvroStorageUtils {
}
return false;
}
-
+
/* return false for other cases */
else {
return false;
}
}
-
+
/** determine whether the input schema contains generic unions */
public static boolean containsGenericUnion(Schema s) {
-
+ /* initialize empty set of visited records */
+ Set<Schema> set = new HashSet<Schema> ();
+ return containsGenericUnion(s, set);
+ }
+
+ /**
+ * Called by {@link #containsGenericUnion(Schema)} and it recursively checks
+ * whether the input schema contains generic unions.
+ */
+ protected static boolean containsGenericUnion(Schema s, Set<Schema> visitedRecords) {
+
/* if it is a record, check all fields*/
if (s.getType().equals(Schema.Type.RECORD)) {
+
+ /* add its own name into visited record set*/
+ visitedRecords.add(s);
+
+ /* check all fields */
List<Field> fields = s.getFields();
for (Field field: fields) {
Schema fs = field.schema();
- if (containsGenericUnion(fs))
- return true;
+ if (!visitedRecords.contains(fs)) {
+ if (containsGenericUnion(fs, visitedRecords)) {
+ return true;
+ }
+ }
}
return false;
}
-
+
/* if it is an array, check its element type */
else if (s.getType().equals(Schema.Type.ARRAY)) {
Schema fs = s.getElementType();
- return containsGenericUnion(fs) ;
+ if (!visitedRecords.contains(fs)) {
+ return containsGenericUnion(fs, visitedRecords);
+ }
+ return false;
}
-
+
/*if it is a map, check its value type */
else if (s.getType().equals(Schema.Type.MAP)) {
Schema vs = s.getValueType();
- return containsGenericUnion(vs) ;
+ if (!visitedRecords.contains(vs)) {
+ return containsGenericUnion(vs, visitedRecords);
+ }
+ return false;
}
-
+
/* if it is a union, check all possible types and itself */
else if (s.getType().equals(Schema.Type.UNION)) {
List<Schema> types = s.getTypes();
for (Schema type: types) {
- if (containsGenericUnion(type) )
- return true;
+ if (!visitedRecords.contains(type)) {
+ if (containsGenericUnion(type, visitedRecords)) {
+ return true;
+ }
+ }
}
/* check whether itself is acceptable (null-union) */
- return ! isAcceptableUnion (s);
+ return !isAcceptableUnion(s);
}
-
+
/* return false for other cases */
else {
return false;
}
}
-
- /** determine whether a union is a nullable union;
+
+ /** determine whether a union is a nullable union;
* note that this function doesn't check containing
* types of the input union recursively. */
public static boolean isAcceptableUnion(Schema in) {
@@ -347,22 +374,22 @@ public class AvroStorageUtils {
/** extract schema from a nullable union */
public static Schema getAcceptedType(Schema in) {
- if (!isAcceptableUnion(in))
+ if (!isAcceptableUnion(in))
throw new RuntimeException("Cannot call this function on a unacceptable union");
-
+
List<Schema> types = in.getTypes();
switch (types.size()) {
- case 0:
+ case 0:
return null; /*union with no type*/
- case 1:
+ case 1:
return types.get(0); /*union with one type*/
case 2:
return (types.get(0).getType().equals(Schema.Type.NULL))
- ? types.get(1)
+ ? types.get(1)
: types.get(0);
default:
return null;
- }
+ }
}
}
Modified: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java?rev=1374925&r1=1374924&r2=1374925&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java (original)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java Mon Aug 20 07:14:54 2012
@@ -5,9 +5,9 @@
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -60,7 +60,7 @@ public class TestAvroStorage {
final private static String basedir = "src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/";
final private static String outbasedir = "/tmp/TestAvroStorage/";
-
+
public static final PathFilter hiddenPathFilter = new PathFilter() {
public boolean accept(Path p) {
String name = p.getName();
@@ -82,11 +82,82 @@ public class TestAvroStorage {
final private String testArrayFile = getInputFile("test_array.avro");
final private String testRecordFile = getInputFile("test_record.avro");
final private String testRecordSchema = getInputFile("test_record.avsc");
- final private String testRecursiveSchemaFile = getInputFile("test_recursive_schema.avro");
- final private String testGenericUnionSchemaFile = getInputFile("test_generic_union_schema.avro");
+ final private String testGenericUnionFile = getInputFile("test_generic_union.avro");
+ final private String testRecursiveRecordInMap = getInputFile("test_recursive_record_in_map.avro");
+ final private String testRecursiveRecordInArray = getInputFile("test_recursive_record_in_array.avro");
+ final private String testRecursiveRecordInUnion = getInputFile("test_recursive_record_in_union.avro");
+ final private String testRecursiveRecordInRecord = getInputFile("test_recursive_record_in_record.avro");
+ final private String testRecursiveRecordInUnionSchema = getInputFile("test_recursive_record_in_union.avsc");
final private String testTextFile = getInputFile("test_record.txt");
final private String testSingleTupleBagFile = getInputFile("messages.avro");
final private String testNoExtensionFile = getInputFile("test_no_extension");
+ final private String recursiveRecordInMap =
+ " {" +
+ " \"type\" : \"record\"," +
+ " \"name\" : \"recursive_record\"," +
+ " \"fields\" : [ {" +
+ " \"name\" : \"id\"," +
+ " \"type\" : \"int\"" +
+ " }, {" +
+ " \"name\" : \"nested\"," +
+ " \"type\" : [ \"null\", {" +
+ " \"type\" : \"map\"," +
+ " \"values\" : \"recursive_record\"" +
+ " } ]" +
+ " } ]" +
+ " }";
+ final private String recursiveRecordInArray =
+ " {" +
+ " \"type\" : \"record\"," +
+ " \"name\" : \"recursive_record\"," +
+ " \"fields\" : [ {" +
+ " \"name\" : \"id\"," +
+ " \"type\" : \"int\"" +
+ " }, {" +
+ " \"name\" : \"nested\"," +
+ " \"type\" : [ \"null\", {" +
+ " \"type\" : \"array\"," +
+ " \"items\" : \"recursive_record\"" +
+ " } ]" +
+ " } ]" +
+ " }";
+ final private String recursiveRecordInUnion =
+ " {" +
+ " \"type\" : \"record\"," +
+ " \"name\" : \"recursive_record\"," +
+ " \"fields\" : [ {" +
+ " \"name\" : \"value\"," +
+ " \"type\" : \"int\"" +
+ " }, {" +
+ " \"name\" : \"next\"," +
+ " \"type\" : [ \"null\", \"recursive_record\" ]" +
+ " } ]" +
+ " }";
+ final private String recursiveRecordInRecord =
+ " {" +
+ " \"type\" : \"record\"," +
+ " \"name\" : \"recursive_record\"," +
+ " \"fields\" : [ {" +
+ " \"name\" : \"id\"," +
+ " \"type\" : \"int\"" +
+ " }, {" +
+ " \"name\" : \"nested\"," +
+ " \"type\" : [ \"null\", {" +
+ " \"type\" : \"record\"," +
+ " \"name\" : \"nested_record\"," +
+ " \"fields\" : [ {" +
+ " \"name\" : \"value1\"," +
+ " \"type\" : \"string\"" +
+ " }, {" +
+ " \"name\" : \"next\"," +
+ " \"type\" : \"recursive_record\"" +
+ " }, {" +
+ " \"name\" : \"value2\"," +
+ " \"type\" : \"string\"" +
+ " } ]" +
+ " } ]" +
+ " } ]" +
+ " }";
@BeforeClass
public static void setup() throws ExecException {
@@ -100,40 +171,287 @@ public class TestAvroStorage {
}
@Test
- public void testRecursiveSchema() throws IOException {
- // Verify that a FrontendException is thrown if schema is recursive.
- String output= outbasedir + "testRecursiveSchema";
+ public void testRecursiveRecordInMap() throws IOException {
+ // Verify that recursive records in map can be loaded/saved.
+ String output= outbasedir + "testRecursiveRecordInMap";
+ String expected = testRecursiveRecordInMap;
deleteDirectory(new File(output));
String [] queries = {
- " in = LOAD '" + testRecursiveSchemaFile +
+ " in = LOAD '" + testRecursiveRecordInMap +
"' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
" STORE in INTO '" + output +
- "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();"
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+ " 'no_schema_check'," +
+ " 'schema', '" + recursiveRecordInMap + "' );"
+ };
+ testAvroStorage(queries);
+ verifyResults(output, expected);
+ }
+
+ @Test
+ public void testRecursiveRecordInArray() throws IOException {
+ // Verify that recursive records in array can be loaded/saved.
+ String output= outbasedir + "testRecursiveRecordInArray";
+ String expected = testRecursiveRecordInArray;
+ deleteDirectory(new File(output));
+ String [] queries = {
+ " in = LOAD '" + testRecursiveRecordInArray +
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+ " STORE in INTO '" + output +
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+ " 'no_schema_check'," +
+ " 'schema', '" + recursiveRecordInArray + "' );"
+ };
+ testAvroStorage(queries);
+ verifyResults(output, expected);
+ }
+
+ @Test
+ public void testRecursiveRecordInUnion() throws IOException {
+ // Verify that recursive records in union can be loaded/saved.
+ String output= outbasedir + "testRecursiveRecordInUnion";
+ String expected = testRecursiveRecordInUnion;
+ deleteDirectory(new File(output));
+ String [] queries = {
+ " in = LOAD '" + testRecursiveRecordInUnion +
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+ " STORE in INTO '" + output +
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+ " 'no_schema_check'," +
+ " 'schema', '" + recursiveRecordInUnion + "' );"
+ };
+ testAvroStorage(queries);
+ verifyResults(output, expected);
+ }
+
+ @Test
+ public void testRecursiveRecordInRecord() throws IOException {
+ // Verify that recursive records in record can be loaded/saved.
+ String output= outbasedir + "testRecursiveRecordInRecord";
+ String expected = testRecursiveRecordInRecord;
+ deleteDirectory(new File(output));
+ String [] queries = {
+ " in = LOAD '" + testRecursiveRecordInRecord +
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+ " STORE in INTO '" + output +
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+ " 'no_schema_check'," +
+ " 'schema', '" + recursiveRecordInRecord + "' );"
+ };
+ testAvroStorage(queries);
+ verifyResults(output, expected);
+ }
+
+ @Test
+ public void testRecursiveRecordWithSame() throws IOException {
+ // Verify that avro schema can be specified via an external avro file
+ // instead of a json string.
+ String output= outbasedir + "testRecursiveRecordWithSame";
+ String expected = testRecursiveRecordInUnion;
+ deleteDirectory(new File(output));
+ String [] queries = {
+ " in = LOAD '" + testRecursiveRecordInUnion +
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+ " STORE in INTO '" + output +
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+ " 'no_schema_check'," +
+ " 'same', '" + testRecursiveRecordInUnion + "' );"
+ };
+ testAvroStorage(queries);
+ verifyResults(output, expected);
+ }
+
+ @Test
+ public void testRecursiveRecordReference1() throws IOException {
+ // The relation 'in' looks like this:
+ // (1,(2,(3,)))
+ // (2,(3,))
+ // (3,)
+ // $0 looks like this:
+ // (1)
+ // (2)
+ // (3)
+ // Avro file stored after filtering out nulls looks like this:
+ // 1
+ // 2
+ // 3
+ String output= outbasedir + "testRecursiveRecordReference1";
+ String expected = basedir + "expected_testRecursiveRecordReference1.avro";
+ deleteDirectory(new File(output));
+ String [] queries = {
+ " in = LOAD '" + testRecursiveRecordInUnion +
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+ " first = FOREACH in GENERATE $0 AS value;",
+ " filtered = FILTER first BY value is not null;",
+ " STORE filtered INTO '" + output +
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+ " 'no_schema_check'," +
+ " 'schema', '\"int\"' );"
+ };
+ testAvroStorage(queries);
+ verifyResults(output, expected);
+ }
+
+ @Test
+ public void testRecursiveRecordReference2() throws IOException {
+ // The relation 'in' looks like this:
+ // (1,(2,(3,)))
+ // (2,(3,))
+ // (3,)
+ // $1.$0 looks like this:
+ // (2)
+ // (3)
+ // ()
+ // Avro file stored after filtering out nulls looks like this:
+ // 2
+ // 3
+ String output= outbasedir + "testRecursiveRecordReference2";
+ String expected = basedir + "expected_testRecursiveRecordReference2.avro";
+ deleteDirectory(new File(output));
+ String [] queries = {
+ " in = LOAD '" + testRecursiveRecordInUnion +
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+ " second = FOREACH in GENERATE $1.$0 AS value;",
+ " filtered = FILTER second BY value is not null;",
+ " STORE filtered INTO '" + output +
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+ " 'no_schema_check'," +
+ " 'schema', '\"int\"' );"
+ };
+ testAvroStorage(queries);
+ verifyResults(output, expected);
+ }
+
+ @Test
+ public void testRecursiveRecordReference3() throws IOException {
+ // The relation 'in' looks like this:
+ // (1,(2,(3,)))
+ // (2,(3,))
+ // (3,)
+ // $1.$1.$0 looks like this:
+ // (3)
+ // ()
+ // ()
+ // Avro file stored after filtering out nulls looks like this:
+ // 3
+ String output= outbasedir + "testRecursiveRecordReference3";
+ String expected = basedir + "expected_testRecursiveRecordReference3.avro";
+ deleteDirectory(new File(output));
+ String [] queries = {
+ " in = LOAD '" + testRecursiveRecordInUnion +
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+ " third = FOREACH in GENERATE $1.$1.$0 AS value;",
+ " filtered = FILTER third BY value is not null;",
+ " STORE filtered INTO '" + output +
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+ " 'no_schema_check'," +
+ " 'schema', '\"int\"' );"
+ };
+ testAvroStorage(queries);
+ verifyResults(output, expected);
+ }
+
+ @Test
+ public void testRecursiveRecordWithNoAvroSchema() throws IOException {
+ // Verify that recursive records cannot be stored,
+ // if no avro schema is specified either via 'schema' or 'same'.
+ String output= outbasedir + "testRecursiveRecordWithNoAvroSchema";
+ deleteDirectory(new File(output));
+ String [] queries = {
+ " in = LOAD '" + testRecursiveRecordInUnion +
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+ " STORE in INTO '" + output +
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+ " 'no_schema_check' );"
+ };
+ // Since Avro schema is not specified via the 'schema' parameter, it is
+ // derived from Pig schema. Job is expected to fail because this derived
+ // Avro schema (bytes) is not compatible with data (tuples).
+ testAvroStorage(true, queries);
+ }
+
+ @Test
+ public void testRecursiveRecordWithSchemaCheck() throws IOException {
+ // Verify that recursive records cannot be stored if schema check is enabled.
+ String output= outbasedir + "testRecursiveWithSchemaCheck";
+ deleteDirectory(new File(output));
+ String [] queries = {
+ " in = LOAD '" + testRecursiveRecordInUnion +
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+ " STORE in INTO '" + output +
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+ " 'schema', '" + recursiveRecordInUnion + "' );"
};
try {
testAvroStorage(queries);
- Assert.fail();
+ Assert.fail("Negative test to test an exception. Should not be succeeding!");
+ } catch (IOException e) {
+ // An IOException is thrown by AvroStorage during schema check due to incompatible
+ // data types.
+ assertTrue(e.getMessage().contains("bytearray is not compatible with avro"));
+ }
+ }
+
+ @Test
+ public void testRecursiveRecordWithSchemaFile() throws IOException {
+ // Verify that recursive records cannot be stored if avro schema is specified by 'schema_file'.
+ String output= outbasedir + "testRecursiveWithSchemaFile";
+ deleteDirectory(new File(output));
+ String [] queries = {
+ " in = LOAD '" + testRecursiveRecordInUnion +
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+ " STORE in INTO '" + output +
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+ " 'no_schema_check'," +
+ " 'schema_file', '" + testRecursiveRecordInUnionSchema + "' );"
+ };
+ try {
+ testAvroStorage(queries);
+ Assert.fail("Negative test to test an exception. Should not be succeeding!");
} catch (FrontendException e) {
- // The IOException thrown by AvroStorage for recursive schema is caught
+ // The IOException thrown by AvroSchemaManager for recursive record is caught
// by the Pig frontend, and FrontendException is re-thrown.
- assertTrue(e.getMessage().contains("Cannot get schema"));
+ assertTrue(e.getMessage().contains("could not instantiate 'org.apache.pig.piggybank.storage.avro.AvroStorage'"));
+ }
+ }
+
+ @Test
+ public void testRecursiveRecordWithData() throws IOException {
+ // Verify that recursive records cannot be stored if avro schema is specified by 'data'.
+ String output= outbasedir + "testRecursiveWithData";
+ deleteDirectory(new File(output));
+ String [] queries = {
+ " in = LOAD '" + testRecursiveRecordInUnion +
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+ " STORE in INTO '" + output +
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+ " 'no_schema_check'," +
+ " 'data', '" + testRecursiveRecordInUnion + "' );"
+ };
+ try {
+ testAvroStorage(queries);
+ Assert.fail("Negative test to test an exception. Should not be succeeding!");
+ } catch (FrontendException e) {
+ // The IOException thrown by AvroSchemaManager for recursive record is caught
+ // by the Pig frontend, and FrontendException is re-thrown.
+ assertTrue(e.getMessage().contains("could not instantiate 'org.apache.pig.piggybank.storage.avro.AvroStorage'"));
}
}
@Test
- public void testGenericUnionSchema() throws IOException {
+ public void testGenericUnion() throws IOException {
// Verify that a FrontendException is thrown if schema has generic union.
- String output= outbasedir + "testGenericUnionSchema";
+ String output= outbasedir + "testGenericUnion";
deleteDirectory(new File(output));
String [] queries = {
- " in = LOAD '" + testGenericUnionSchemaFile +
+ " in = LOAD '" + testGenericUnionFile +
"' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
" STORE in INTO '" + output +
"' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();"
};
try {
testAvroStorage(queries);
- Assert.fail();
+ Assert.fail("Negative test to test an exception. Should not be succeeding!");
} catch (FrontendException e) {
// The IOException thrown by AvroStorage for generic union is caught
// by the Pig frontend, and FrontendException is re-thrown.
@@ -243,7 +561,7 @@ public class TestAvroStorage {
};
try {
testAvroStorage(queries);
- Assert.fail();
+ Assert.fail("Negative test to test an exception. Should not be succeeding!");
} catch (JobCreationException e) {
// The IOException thrown by AvroStorage for input file not found is catched
// by the Pig backend, and JobCreationException (a subclass of IOException)
@@ -256,9 +574,9 @@ public class TestAvroStorage {
public void testArrayDefault() throws IOException {
String output= outbasedir + "testArrayDefault";
String expected = basedir + "expected_testArrayDefault.avro";
-
+
deleteDirectory(new File(output));
-
+
String [] queries = {
" in = LOAD '" + testArrayFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
" STORE in INTO '" + output + "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();"
@@ -282,7 +600,7 @@ public class TestAvroStorage {
testAvroStorage( queries);
verifyResults(output, expected);
}
-
+
@Test
public void testArrayWithNotNull() throws IOException {
String output= outbasedir + "testArrayWithNotNull";
@@ -297,7 +615,7 @@ public class TestAvroStorage {
testAvroStorage( queries);
verifyResults(output, expected);
}
-
+
@Test
public void testArrayWithSame() throws IOException {
String output= outbasedir + "testArrayWithSame";
@@ -317,9 +635,9 @@ public class TestAvroStorage {
public void testArrayWithSnappyCompression() throws IOException {
String output= outbasedir + "testArrayWithSnappyCompression";
String expected = basedir + "expected_testArrayDefault.avro";
-
+
deleteDirectory(new File(output));
-
+
Properties properties = new Properties();
properties.setProperty("mapred.output.compress", "true");
properties.setProperty("mapred.output.compression.codec", "org.apache.hadoop.io.compress.SnappyCodec");
@@ -464,7 +782,7 @@ public class TestAvroStorage {
testAvroStorage( queries);
verifyResults(output, expected);
}
-
+
@Test
public void testSingleFieldTuples() throws IOException {
String output= outbasedir + "testSingleFieldTuples";
@@ -478,7 +796,7 @@ public class TestAvroStorage {
};
testAvroStorage( queries);
}
-
+
@Test
public void testFileWithNoExtension() throws IOException {
String output= outbasedir + "testFileWithNoExtension";
@@ -527,25 +845,37 @@ public class TestAvroStorage {
if ( path.exists()) {
File [] files = path.listFiles();
for (File file: files) {
- if (file.isDirectory())
+ if (file.isDirectory())
deleteDirectory(file);
file.delete();
}
}
}
-
+
private void testAvroStorage(String ...queries) throws IOException {
+ testAvroStorage(false, queries);
+ }
+
+ private void testAvroStorage(boolean expectedToFail, String ...queries) throws IOException {
pigServerLocal.setBatchOn();
for (String query: queries){
- if (query != null && query.length() > 0)
+ if (query != null && query.length() > 0) {
pigServerLocal.registerQuery(query);
+ }
}
- List<ExecJob> jobs = pigServerLocal.executeBatch();
- for (ExecJob job : jobs) {
- assertEquals(JOB_STATUS.COMPLETED, job.getStatus());
+ int numOfFailedJobs = 0;
+ for (ExecJob job : pigServerLocal.executeBatch()) {
+ if (job.getStatus().equals(JOB_STATUS.FAILED)) {
+ numOfFailedJobs++;
+ }
+ }
+ if (expectedToFail) {
+ assertTrue("There was no failed job!", numOfFailedJobs > 0);
+ } else {
+ assertTrue("There was a failed job!", numOfFailedJobs == 0);
}
}
-
+
private void verifyResults(String outPath, String expectedOutpath) throws IOException {
verifyResults(outPath, expectedOutpath, null);
}
@@ -554,17 +884,17 @@ public class TestAvroStorage {
// Seems compress for Avro is broken in 23. Skip this test and open Jira PIG-
if (Util.isHadoop23())
return;
-
- FileSystem fs = FileSystem.getLocal(new Configuration()) ;
-
+
+ FileSystem fs = FileSystem.getLocal(new Configuration()) ;
+
/* read in expected results*/
Set<Object> expected = getExpected (expectedOutpath);
-
+
/* read in output results and compare */
Path output = new Path(outPath);
assertTrue("Output dir does not exists!", fs.exists(output)
&& fs.getFileStatus(output).isDir());
-
+
Path[] paths = FileUtil.stat2Paths(fs.listStatus(output, hiddenPathFilter));
assertTrue("Split field dirs not found!", paths != null);
@@ -574,34 +904,34 @@ public class TestAvroStorage {
files != null);
for (Path filePath : files) {
assertTrue("This shouldn't be a directory", fs.isFile(filePath));
-
+
GenericDatumReader<Object> reader = new GenericDatumReader<Object>();
-
+
DataFileStream<Object> in = new DataFileStream<Object>(
fs.open(filePath), reader);
assertEquals("codec", expectedCodec, in.getMetaString("avro.codec"));
int count = 0;
while (in.hasNext()) {
Object obj = in.next();
- //System.out.println("obj = " + (GenericData.Array<Float>)obj);
- assertTrue("Avro result object found that's not expected: " + obj, expected.contains(obj));
- count++;
- }
+ //System.out.println("obj = " + (GenericData.Array<Float>)obj);
+ assertTrue("Avro result object found that's not expected: " + obj, expected.contains(obj));
+ count++;
+ }
in.close();
assertEquals(expected.size(), count);
}
}
}
-
+
private Set<Object> getExpected (String pathstr ) throws IOException {
-
+
Set<Object> ret = new HashSet<Object>();
- FileSystem fs = FileSystem.getLocal(new Configuration()) ;
-
+ FileSystem fs = FileSystem.getLocal(new Configuration());
+
/* read in output results and compare */
Path output = new Path(pathstr);
assertTrue("Expected output does not exists!", fs.exists(output));
-
+
Path[] paths = FileUtil.stat2Paths(fs.listStatus(output, hiddenPathFilter));
assertTrue("Split field dirs not found!", paths != null);
@@ -610,15 +940,15 @@ public class TestAvroStorage {
assertTrue("No files found for path: " + path.toUri().getPath(), files != null);
for (Path filePath : files) {
assertTrue("This shouldn't be a directory", fs.isFile(filePath));
-
+
GenericDatumReader<Object> reader = new GenericDatumReader<Object>();
-
+
DataFileStream<Object> in = new DataFileStream<Object>(fs.open(filePath), reader);
-
+
while (in.hasNext()) {
Object obj = in.next();
ret.add(obj);
- }
+ }
in.close();
}
}
Modified: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java?rev=1374925&r1=1374924&r2=1374925&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java (original)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java Mon Aug 20 07:14:54 2012
@@ -38,44 +38,6 @@ public class TestAvroStorageUtils {
public final String RECORD_BEGINNING = TYPE_RECORD + NAME_NODE + FIELDS_VALUE;
- @Test
- public void canIdentifyRecursiveRecords() throws IOException {
- final String str1 = RECORD_BEGINNING +
- "{ \"name\": \"next\", \"type\": [\"null\", \"Node\"] } ] }";
- Schema s = Schema.parse(str1);
- assertTrue(AvroStorageUtils.containsRecursiveRecord(s));
-
- final String str2 = "{\"type\": \"array\", \"items\": " + str1 + "}";
- s = Schema.parse(str2);
- assertTrue(AvroStorageUtils.containsRecursiveRecord(s));
-
- final String str3 ="[\"null\", " + str2 + "]";
- s = Schema.parse(str3);
- assertTrue(AvroStorageUtils.containsRecursiveRecord(s));
-
- }
-
- @Test
- public void canIdentifyNonRecursiveRecords() throws IOException {
- final String non = RECORD_BEGINNING + "{ \"name\": \"next\", \"type\": [\"null\", \"string\"] } ] }";
- assertFalse(AvroStorageUtils.containsRecursiveRecord(Schema.parse(non)));
-
- final String s1 =
- "{ \"type\":\"record\", \"name\":\"Event\", " +
- "\"fields\":[ " +
- "{\"name\":\"f1\", " +
- "\"type\":{ \"type\":\"record\",\"name\":\"Entity\", " +
- "\"fields\":[{\"name\":\"type\", \"type\": \"string\"}," +
- "{\"name\":\"value\",\"type\": \"int\"}] " +
- "} }, " +
- " {\"name\":\"f2\",\"type\": \"Entity\"}, " +
- " {\"name\":\"f3\",\"type\": \"Entity\"} " +
- "] }";
- Schema schema1 = Schema.parse(s1);
- assertFalse(AvroStorageUtils.containsRecursiveRecord(schema1));
-
- }
-
@Test
public void testGenericUnion() throws IOException {
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecursiveRecordReference1.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecursiveRecordReference1.avro?rev=1374925&view=auto
==============================================================================
Binary file - no diff available.
Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecursiveRecordReference1.avro
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecursiveRecordReference2.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecursiveRecordReference2.avro?rev=1374925&view=auto
==============================================================================
Binary file - no diff available.
Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecursiveRecordReference2.avro
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecursiveRecordReference3.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecursiveRecordReference3.avro?rev=1374925&view=auto
==============================================================================
Binary file - no diff available.
Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecursiveRecordReference3.avro
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_generic_union.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_generic_union.avro?rev=1374925&view=auto
==============================================================================
Binary file - no diff available.
Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_generic_union.avro
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_array.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_array.avro?rev=1374925&view=auto
==============================================================================
Binary file - no diff available.
Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_array.avro
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_map.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_map.avro?rev=1374925&view=auto
==============================================================================
Binary file - no diff available.
Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_map.avro
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_record.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_record.avro?rev=1374925&view=auto
==============================================================================
Binary file - no diff available.
Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_record.avro
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_union.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_union.avro?rev=1374925&view=auto
==============================================================================
Binary file - no diff available.
Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_union.avro
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_union.avsc
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_union.avsc?rev=1374925&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_union.avsc (added)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_recursive_record_in_union.avsc Mon Aug 20 07:14:54 2012
@@ -0,0 +1,11 @@
+{
+ "type" : "record",
+ "name" : "recursive_record",
+ "fields" : [ {
+ "name" : "value",
+ "type" : "int"
+ }, {
+ "name" : "next",
+ "type" : [ "null", "recursive_record" ]
+ } ]
+}