You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2013/06/14 15:34:40 UTC
svn commit: r1493062 - in /pig/trunk: ./
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/
contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/
contrib/piggybank/java/src/test/java/org/apache/pig/pi...
Author: rohini
Date: Fri Jun 14 13:34:39 2013
New Revision: 1493062
URL: http://svn.apache.org/r1493062
Log:
PIG-3318: AVRO: 'default value' not honored when merging schemas on load with AvroStorage (viraj via rohini)
Added:
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_merge_schemas_default/
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_merge_schemas_default/Employee3.avro (with props)
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_merge_schemas_default/Employee4.avro (with props)
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_merge_schemas_default/Employee6.avro (with props)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1493062&r1=1493061&r2=1493062&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Jun 14 13:34:39 2013
@@ -196,6 +196,8 @@ PIG-3013: BinInterSedes improve chararra
BUG FIXES
+PIG-3318: AVRO: 'default value' not honored when merging schemas on load with AvroStorage (viraj via rohini)
+
PIG-3250: Pig dryrun generates wrong output in .expanded file for 'SPLIT....OTHERWISE...' command (dreambird via cheolsoo)
PIG-3331: Default values not stored in avro file when using specific schemas during store in AvroStorage (viraj via rohini)
Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java?rev=1493062&r1=1493061&r2=1493062&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java Fri Jun 14 13:34:39 2013
@@ -319,15 +319,8 @@ public class AvroStorage extends FileInp
AvroStorageLog.funcCall("getInputFormat");
InputFormat result = null;
if(inputAvroSchema != null) {
- if (useMultipleSchemas) {
- // When merging multiple avro schemas, we use embedded schemas
- // to load input files. So no input avro schema is passed.
- result = new PigAvroInputFormat(
- null, ignoreBadFiles, schemaToMergedSchemaMap);
- } else {
- result = new PigAvroInputFormat(
- inputAvroSchema, ignoreBadFiles, schemaToMergedSchemaMap);
- }
+ result = new PigAvroInputFormat(
+ inputAvroSchema, ignoreBadFiles, schemaToMergedSchemaMap, useMultipleSchemas);
} else {
result = new TextInputFormat();
}
Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java?rev=1493062&r1=1493061&r2=1493062&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java Fri Jun 14 13:34:39 2013
@@ -48,7 +48,7 @@ import org.apache.pig.ResourceSchema.Res
import org.apache.pig.data.DataType;
import org.apache.pig.piggybank.storage.avro.AvroStorageLog;
-
+import org.codehaus.jackson.JsonNode;
/**
* This is utility class for this package
*/
@@ -305,12 +305,15 @@ public class AvroStorageUtils {
List<Schema.Field> yFields = y.getFields();
// LinkedHashMap is used to keep fields in insertion order.
- // It's convenient for testing to have determinitic behaviors.
+ // It's convenient for testing to have deterministic behaviors.
Map<String, Schema> fieldName2Schema =
new LinkedHashMap<String, Schema>(xFields.size() + yFields.size());
+ Map<String, JsonNode> fieldName2Default =
+ new LinkedHashMap<String, JsonNode>(xFields.size() + yFields.size());
for (Schema.Field xField : xFields) {
fieldName2Schema.put(xField.name(), xField.schema());
+ fieldName2Default.put(xField.name(),xField.defaultValue());
}
for (Schema.Field yField : yFields) {
String name = yField.name();
@@ -318,14 +321,28 @@ public class AvroStorageUtils {
Schema prevSchema = fieldName2Schema.get(name);
if (prevSchema == null) {
fieldName2Schema.put(name, currSchema);
+ fieldName2Default.put(name, yField.defaultValue());
} else {
fieldName2Schema.put(name, mergeSchema(prevSchema, currSchema));
+ // When merging record schemas it is okay for one field to have a default
+ // and the other to have none; in that case the existing default is used.
+ JsonNode xDefaultValue = fieldName2Default.get(name);
+ JsonNode yDefaultValue = yField.defaultValue();
+ if (xDefaultValue != null) {
+ // need to check if the default values in the schemas are the same
+ if (yDefaultValue != null && !xDefaultValue.equals(yDefaultValue)) {
+ throw new IOException(
+ "Cannot merge schema's which have different default values - " + xDefaultValue +
+ " and " + yDefaultValue);
+ }
+ } else {
+ fieldName2Default.put(name, yDefaultValue);
+ }
}
}
-
List<Schema.Field> mergedFields = new ArrayList<Schema.Field>(fieldName2Schema.size());
for (Entry<String, Schema> entry : fieldName2Schema.entrySet()) {
- mergedFields.add(new Schema.Field(entry.getKey(), entry.getValue(), "auto-gen", null));
+ mergedFields.add(new Schema.Field(entry.getKey(), entry.getValue(), "auto-gen", fieldName2Default.get(entry.getKey())));
}
Schema result = Schema.createRecord(
"merged", null, "merged schema (generated by AvroStorage)", false);
Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java?rev=1493062&r1=1493061&r2=1493062&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java Fri Jun 14 13:34:39 2013
@@ -40,6 +40,8 @@ import org.apache.hadoop.mapreduce.lib.i
public class PigAvroInputFormat extends FileInputFormat<NullWritable, Writable> {
private Schema readerSchema = null; /* avro schema */
+ /* whether the 'multiple_schemas' option is set; passed through to the RecordReader */
+ private boolean useMultipleSchemas = false;
private boolean ignoreBadFiles = false; /* whether ignore corrupted files during load */
/* if multiple avro record schemas are merged, this map associates each input
@@ -62,10 +64,12 @@ public class PigAvroInputFormat extends
* with a remapping of its fields relative to the merged schema
*/
public PigAvroInputFormat(Schema readerSchema, boolean ignoreBadFiles,
- Map<Path, Map<Integer, Integer>> schemaToMergedSchemaMap) {
+ Map<Path, Map<Integer, Integer>> schemaToMergedSchemaMap,
+ boolean useMultipleSchemas) {
this.readerSchema = readerSchema;
this.ignoreBadFiles = ignoreBadFiles;
this.schemaToMergedSchemaMap = schemaToMergedSchemaMap;
+ this.useMultipleSchemas = useMultipleSchemas;
}
/**
@@ -79,7 +83,7 @@ public class PigAvroInputFormat extends
throws IOException, InterruptedException {
context.setStatus(split.toString());
return new PigAvroRecordReader(context, (FileSplit) split, readerSchema,
- ignoreBadFiles, schemaToMergedSchemaMap);
+ ignoreBadFiles, schemaToMergedSchemaMap, useMultipleSchemas);
}
}
Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java?rev=1493062&r1=1493061&r2=1493062&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java Fri Jun 14 13:34:39 2013
@@ -35,8 +35,11 @@ import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
+import org.codehaus.jackson.JsonNode;
+
/**
* This is an implementation of record reader which reads in avro data and
* convert them into <NullWritable, Writable> pairs.
@@ -59,6 +62,9 @@ public class PigAvroRecordReader extends
*/
private ArrayList<Object> mProtoTuple;
+ /* whether the 'multiple_schemas' option is set; if so, each file is read with its embedded (writer) schema */
+ private boolean useMultipleSchemas = false;
+
/* if multiple avro record schemas are merged, this map associates each input
* record with a remapping of its fields relative to the merged schema. please
* see AvroStorageUtils.getSchemaToMergedSchemaMap() for more details.
@@ -70,9 +76,11 @@ public class PigAvroRecordReader extends
*/
public PigAvroRecordReader(TaskAttemptContext context, FileSplit split,
Schema readerSchema, boolean ignoreBadFiles,
- Map<Path, Map<Integer, Integer>> schemaToMergedSchemaMap) throws IOException {
+ Map<Path, Map<Integer, Integer>> schemaToMergedSchemaMap,
+ boolean useMultipleSchemas) throws IOException {
this.path = split.getPath();
this.in = new AvroStorageInputStream(path, context);
+ this.useMultipleSchemas = useMultipleSchemas;
if(readerSchema == null) {
AvroStorageLog.details("No avro schema given; assuming the schema is embedded");
}
@@ -87,7 +95,12 @@ public class PigAvroRecordReader extends
}
try {
- this.reader = new DataFileReader<Object>(in, new PigAvroDatumReader(writerSchema, readerSchema));
+ if (useMultipleSchemas) {
+ this.reader = new DataFileReader<Object>(in, new PigAvroDatumReader(writerSchema, null));
+ }
+ else {
+ this.reader = new DataFileReader<Object>(in, new PigAvroDatumReader(writerSchema, readerSchema));
+ }
} catch (IOException e) {
throw new IOException("Error initializing data file reader for file (" +
split.getPath() + ")", e);
@@ -98,7 +111,7 @@ public class PigAvroRecordReader extends
this.ignoreBadFiles = ignoreBadFiles;
this.schemaToMergedSchemaMap = schemaToMergedSchemaMap;
if (schemaToMergedSchemaMap != null) {
- // initialize mProtoTuple
+ // initialize mProtoTuple with the right default values
int maxPos = 0;
for (Map<Integer, Integer> map : schemaToMergedSchemaMap.values()) {
for (Integer i : map.values()) {
@@ -109,7 +122,44 @@ public class PigAvroRecordReader extends
AvroStorageLog.details("Creating proto tuple of fixed size: " + tupleSize);
mProtoTuple = new ArrayList<Object>(tupleSize);
for (int i = 0; i < tupleSize; i++) {
- mProtoTuple.add(i, null);
+ // Get the list of fields from the passed schema
+ List<Schema.Field> subFields = readerSchema.getFields();
+ JsonNode defValue = subFields.get(i).defaultValue();
+ if (defValue != null) {
+ Schema.Type type = subFields.get(i).schema().getType();
+ switch (type) {
+ case BOOLEAN:
+ mProtoTuple.add(i, defValue.getBooleanValue());
+ break;
+ case ENUM:
+ mProtoTuple.add(i, defValue.getTextValue());
+ break;
+ case FIXED:
+ mProtoTuple.add(i, defValue.getTextValue());
+ break;
+ case INT:
+ mProtoTuple.add(i, defValue.getIntValue());
+ break;
+ case LONG:
+ mProtoTuple.add(i, defValue.getIntValue());
+ break;
+ case FLOAT:
+ mProtoTuple.add(i, defValue.getNumberValue().floatValue());
+ break;
+ case DOUBLE:
+ mProtoTuple.add(i, defValue.getNumberValue().doubleValue());
+ break;
+ case STRING:
+ mProtoTuple.add(i, defValue.getTextValue());
+ break;
+ default:
+ mProtoTuple.add(i, null);
+ break;
+ }
+ }
+ else {
+ mProtoTuple.add(i, null);
+ }
}
}
}
Modified: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java?rev=1493062&r1=1493061&r2=1493062&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java (original)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java Fri Jun 14 13:34:39 2013
@@ -186,6 +186,7 @@ public class TestAvroStorage {
final private String testCorruptedFile = getInputFile("test_corrupted_file.avro");
final private String testMultipleSchemas1File = getInputFile("test_primitive_types/*");
final private String testMultipleSchemas2File = getInputFile("test_complex_types/*");
+ final private String testMultipleSchemasWithDefaultValue = getInputFile("test_merge_schemas_default/{Employee{3,4,6}.avro}");
final private String testUserDefinedLoadSchemaFile = getInputFile("test_user_defined_load_schema/*");
final private String testLoadwithNullValues = getInputFile("test_loadavrowithnulls.avro");
@@ -627,6 +628,97 @@ public class TestAvroStorage {
}
@Test
+ public void testMultipleSchemasWithDefaultValue() throws IOException {
+ // ==> Employee3.avro <==
+ // {
+ // "type" : "record",
+ // "name" : "employee",
+ // "fields":[
+ // {"name" : "name", "type" : "string", "default" : "NU"},
+ // {"name" : "age", "type" : "int", "default" : 0 },
+ // {"name" : "dept", "type": "string", "default" : "DU"} ] }
+ //
+ // ==> Employee4.avro <==
+ // {
+ // "type" : "record",
+ // "name" : "employee",
+ // "fields":[
+ // {"name" : "name", "type" : "string", "default" : "NU"},
+ // {"name" : "age", "type" : "int", "default" : 0},
+ // {"name" : "dept", "type": "string", "default" : "DU"},
+ // {"name" : "office", "type": "string", "default" : "OU"} ] }
+ //
+ // ==> Employee6.avro <==
+ // {
+ // "type" : "record",
+ // "name" : "employee",
+ // "fields":[
+ // {"name" : "name", "type" : "string", "default" : "NU"},
+ // {"name" : "lastname", "type": "string", "default" : "LNU"},
+ // {"name" : "age", "type" : "int","default" : 0},
+ // {"name" : "salary", "type": "int", "default" : 0},
+ // {"name" : "dept", "type": "string","default" : "DU"},
+ // {"name" : "office", "type": "string","default" : "OU"} ] }
+ // The input files contain the following rows (order of rows can differ):
+ // Employee3.avro
+ // (Milo,30,DH)
+ // (Asmya,34,PQ)
+ // (Baljit,23,RS)
+ //
+ // Employee4.avro
+ // (Praj,54,RMX,Champaign)
+ // (Buba,767,HD,Sunnyvale)
+ // (Manku,375,MS,New York)
+ //
+ // Employee6.avro
+ // (Pune,Warriors,60,5466,Astrophysics,UTA)
+ // (Rajsathan,Royals,20,1378,Biochemistry,Stanford)
+ // (Chennai,Superkings,50,7338,Microbiology,Hopkins)
+ // (Mumbai,Indians,20,4468,Applied Math,UAH)
+
+ // After loading with merged schemas (missing fields filled with their
+ // schema defaults), the data has the following schema and rows:
+ // {name: chararray,age: int,dept: chararray,office: chararray,
+ // lastname: chararray,salary: int}
+ //(Asmya,34,PQ,OU,LNU,0)
+ //(Baljit,23,RS,OU,LNU,0)
+ //(Buba,767,HD,Sunnyvale,LNU,0)
+ //(Chennai,50,Microbiology,Hopkins,Superkings,7338)
+ //(Manku,375,MS,New York,LNU,0)
+ //(Milo,30,DH,OU,LNU,0)
+ //(Mumbai,20,Applied Math,UAH,Indians,4468)
+ //(Praj,54,RMX,Champaign,LNU,0)
+ //(Pune,60,Astrophysics,UTA,Warriors,5466)
+ //(Rajsathan,20,Biochemistry,Stanford,Royals,1378)
+
+ Data data = resetData(pigServerLocal);
+ String output= outbasedir + "testMultipleSchemasWithDefaultValue";
+ deleteDirectory(new File(output));
+ String expected = basedir + "expected_testMultipleSchemasWithDefaultValue.avro";
+ String [] queries = {
+ " a = LOAD '" + testMultipleSchemasWithDefaultValue +
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ('multiple_schemas');",
+ " b = foreach a generate name,age,dept,office,lastname,salary;",
+ " c = filter b by age < 40 ;",
+ " d = order c by name;",
+ " STORE d INTO '" + output+ "' using mock.Storage();"
+ };
+ testAvroStorage(queries);
+ List<Tuple> out = data.get(output);
+ assertEquals(out + " size", 5, out.size());
+ assertEquals(
+ schema("name: chararray,age: int,dept: chararray,office: chararray,lastname: chararray,salary: int"),
+ data.getSchema(output));
+ assertEquals(tuple("Asmya", 34, "PQ", "OU", "LNU", 0), out.get(0));
+ assertEquals(tuple("Baljit", 23, "RS", "OU", "LNU", 0), out.get(1));
+ assertEquals(tuple("Milo", 30, "DH", "OU", "LNU", 0), out.get(2));
+ assertEquals(tuple("Mumbai", 20, "Applied Math", "UAH", "Indians", 4468), out.get(3));
+ assertEquals(tuple("Rajsathan", 20, "Biochemistry", "Stanford", "Royals", 1378), out.get(4));
+ }
+
+ @Test
// Verify the default values specified in the schema in AvroStorage
// are actually written to the schema in the output avro file
public void testDefaultValueSchemaWrite() throws IOException {
@@ -1185,9 +1277,6 @@ public class TestAvroStorage {
}
private void verifyResults(String outPath, String expectedOutpath, String expectedCodec) throws IOException {
- // Seems compress for Avro is broken in 23. Skip this test and open Jira PIG-
- if (Util.isHadoop23())
- return;
FileSystem fs = FileSystem.getLocal(new Configuration()) ;
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_merge_schemas_default/Employee3.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_merge_schemas_default/Employee3.avro?rev=1493062&view=auto
==============================================================================
Binary file - no diff available.
Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_merge_schemas_default/Employee3.avro
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_merge_schemas_default/Employee4.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_merge_schemas_default/Employee4.avro?rev=1493062&view=auto
==============================================================================
Binary file - no diff available.
Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_merge_schemas_default/Employee4.avro
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_merge_schemas_default/Employee6.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_merge_schemas_default/Employee6.avro?rev=1493062&view=auto
==============================================================================
Binary file - no diff available.
Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_merge_schemas_default/Employee6.avro
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream