You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2013/05/16 01:46:14 UTC
svn commit: r1483127 - in /pig/trunk: ./
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/
contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/
contrib/piggybank/java/src/test/java/org/apache/pig/pi...
Author: rohini
Date: Wed May 15 23:46:14 2013
New Revision: 1483127
URL: http://svn.apache.org/r1483127
Log:
PIG-3321: AVRO: Support user specified schema on load (harveyc via rohini)
Added:
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testUserDefinedLoadSchema.avro (with props)
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_user_defined_load_schema/
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_user_defined_load_schema/testUserDefinedLoadSchema_input1.avro (with props)
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_user_defined_load_schema/testUserDefinedLoadSchema_input2.avro (with props)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroDatumReader.java
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1483127&r1=1483126&r2=1483127&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed May 15 23:46:14 2013
@@ -28,6 +28,8 @@ PIG-3174: Remove rpm and deb artifacts
IMPROVEMENTS
+PIG-3321: AVRO: Support user specified schema on load (harveyc via rohini)
+
PIG-2959: Add a pig.cmd for Pig to run under Windows (daijy)
PIG-3311: add pig-withouthadoop-h2 to mvn-jar (julien)
Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java?rev=1483127&r1=1483126&r2=1483127&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java Wed May 15 23:46:14 2013
@@ -89,6 +89,7 @@ public class AvroStorage extends FileInp
/* loadFunc parameters */
private PigAvroRecordReader reader = null; /* avro record writer */
private Schema inputAvroSchema = null; /* input avro schema */
+ private Schema userSpecifiedAvroSchema = null; /* avro schema specified in constructor args */
/* if multiple avro record schemas are merged, this map associates each input
* record with a remapping of its fields relative to the merged schema. please
@@ -171,8 +172,13 @@ public class AvroStorage extends FileInp
* @throws IOException
*/
protected void setInputAvroSchema(Set<Path> paths, Configuration conf) throws IOException {
- inputAvroSchema = useMultipleSchemas ? getMergedSchema(paths, conf)
- : getAvroSchema(paths, conf);
+ if(userSpecifiedAvroSchema != null) {
+ inputAvroSchema = userSpecifiedAvroSchema;
+ }
+ else {
+ inputAvroSchema = useMultipleSchemas ? getMergedSchema(paths, conf)
+ : getAvroSchema(paths, conf);
+ }
}
/**
@@ -280,17 +286,7 @@ public class AvroStorage extends FileInp
* @throws IOException
*/
protected Schema getSchema(Path path, FileSystem fs) throws IOException {
- /* get path of the last file */
- Path lastFile = AvroStorageUtils.getLast(path, fs);
-
- /* read in file and obtain schema */
- GenericDatumReader<Object> avroReader = new GenericDatumReader<Object>();
- InputStream hdfsInputStream = fs.open(lastFile);
- DataFileStream<Object> avroDataStream = new DataFileStream<Object>(hdfsInputStream, avroReader);
- Schema ret = avroDataStream.getSchema();
- avroDataStream.close();
-
- return ret;
+ return AvroStorageUtils.getSchema(path, fs);
}
/**
@@ -538,16 +534,19 @@ public class AvroStorage extends FileInp
AvroStorageLog.details("data path=" + path.toUri().toString());
FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
outputAvroSchema = getAvroSchema(path, fs);
+ userSpecifiedAvroSchema = outputAvroSchema;
} else if (name.equalsIgnoreCase("nullable")) {
nullable = (Boolean) value;
} else if (name.equalsIgnoreCase("schema")) {
outputAvroSchema = Schema.parse((String) value);
+ userSpecifiedAvroSchema = outputAvroSchema;
} else if (name.equalsIgnoreCase("schema_uri")) {
/* use the contents of the specified path as output schema */
Path path = new Path( ((String) value).trim());
AvroStorageLog.details("schema_uri path=" + path.toUri().toString());
FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
outputAvroSchema = getSchemaFromFile(path, fs);
+ userSpecifiedAvroSchema = outputAvroSchema;
} else if (name.matches("field\\d+")) {
/*set schema of dth field */
if (fields == null)
Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java?rev=1483127&r1=1483126&r2=1483127&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java Wed May 15 23:46:14 2013
@@ -19,6 +19,7 @@ package org.apache.pig.piggybank.storage
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -32,6 +33,8 @@ import java.util.Set;
import java.net.URI;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.file.DataFileStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -672,4 +675,28 @@ public class AvroStorageUtils {
}
}
+ /**
+ * This method is called by {@link #getAvroSchema}. The default implementation
+ * returns the schema of an avro file, or the schema of the last file in a first-level
+ * directory (one that does not contain sub-directories).
+ *
+ * @param path path of a file or first level directory
+ * @param fs file system
+ * @return avro schema
+ * @throws IOException
+ */
+ public static Schema getSchema(Path path, FileSystem fs) throws IOException {
+ /* get path of the last file */
+ Path lastFile = AvroStorageUtils.getLast(path, fs);
+
+ /* read in file and obtain schema */
+ GenericDatumReader<Object> avroReader = new GenericDatumReader<Object>();
+ InputStream hdfsInputStream = fs.open(lastFile);
+ DataFileStream<Object> avroDataStream = new DataFileStream<Object>(hdfsInputStream, avroReader);
+ Schema ret = avroDataStream.getSchema();
+ avroDataStream.close();
+
+ return ret;
+ }
+
}
Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroDatumReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroDatumReader.java?rev=1483127&r1=1483126&r2=1483127&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroDatumReader.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroDatumReader.java Wed May 15 23:46:14 2013
@@ -60,11 +60,17 @@ public class PigAvroDatumReader extends
@Override
protected Object readRecord(Object old, Schema expected, ResolvingDecoder in) throws IOException {
+ // find out the order in which we will receive fields from the ResolvingDecoder
+ Field[] readOrderedFields = in.readFieldOrder();
+
/* create an empty tuple */
- Tuple tuple = (Tuple) newRecord(old, expected);
+ Tuple tuple = TupleFactory.getInstance().newTuple(readOrderedFields.length);
- for (Field f : in.readFieldOrder()) {
- tuple.append(read(null, f.schema(), in));
+ /* read fields and put in output order in tuple
+ * The ResolvingDecoder figures out the writer schema to reader schema mapping for us
+ */
+ for (Field f : readOrderedFields) {
+ tuple.set(f.pos(), read(old, f.schema(), in));
}
return tuple;
Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java?rev=1483127&r1=1483126&r2=1483127&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java Wed May 15 23:46:14 2013
@@ -39,7 +39,7 @@ import org.apache.hadoop.mapreduce.lib.i
*/
public class PigAvroInputFormat extends FileInputFormat<NullWritable, Writable> {
- private Schema schema = null; /* avro schema */
+ private Schema readerSchema = null; /* avro schema */
private boolean ignoreBadFiles = false; /* whether ignore corrupted files during load */
/* if multiple avro record schemas are merged, this map associates each input
@@ -56,14 +56,14 @@ public class PigAvroInputFormat extends
/**
* constructor called by AvroStorage to pass in schema and ignoreBadFiles.
- * @param schema input data schema
+ * @param readerSchema reader schema
* @param ignoreBadFiles whether ignore corrupted files during load
* @param schemaToMergedSchemaMap map that associates each input record
* with a remapping of its fields relative to the merged schema
*/
- public PigAvroInputFormat(Schema schema, boolean ignoreBadFiles,
+ public PigAvroInputFormat(Schema readerSchema, boolean ignoreBadFiles,
Map<Path, Map<Integer, Integer>> schemaToMergedSchemaMap) {
- this.schema = schema;
+ this.readerSchema = readerSchema;
this.ignoreBadFiles = ignoreBadFiles;
this.schemaToMergedSchemaMap = schemaToMergedSchemaMap;
}
@@ -78,7 +78,7 @@ public class PigAvroInputFormat extends
createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
context.setStatus(split.toString());
- return new PigAvroRecordReader(context, (FileSplit) split, schema,
+ return new PigAvroRecordReader(context, (FileSplit) split, readerSchema,
ignoreBadFiles, schemaToMergedSchemaMap);
}
Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java?rev=1483127&r1=1483126&r2=1483127&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java Wed May 15 23:46:14 2013
@@ -24,6 +24,7 @@ import org.apache.avro.file.DataFileRead
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -68,16 +69,25 @@ public class PigAvroRecordReader extends
* constructor to initialize input and avro data reader
*/
public PigAvroRecordReader(TaskAttemptContext context, FileSplit split,
- Schema schema, boolean ignoreBadFiles,
+ Schema readerSchema, boolean ignoreBadFiles,
Map<Path, Map<Integer, Integer>> schemaToMergedSchemaMap) throws IOException {
this.path = split.getPath();
this.in = new AvroStorageInputStream(path, context);
- if(schema == null) {
+ if(readerSchema == null) {
AvroStorageLog.details("No avro schema given; assuming the schema is embedded");
}
+ Schema writerSchema;
try {
- this.reader = new DataFileReader<Object>(in, new PigAvroDatumReader(schema));
+ FileSystem fs = FileSystem.get(path.toUri(), context.getConfiguration());
+ writerSchema = AvroStorageUtils.getSchema(path, fs);
+ } catch (IOException e) {
+ AvroStorageLog.details("No avro writer schema found in '"+path+"'; assuming writer schema matches reader schema");
+ writerSchema = null;
+ }
+
+ try {
+ this.reader = new DataFileReader<Object>(in, new PigAvroDatumReader(writerSchema, readerSchema));
} catch (IOException e) {
throw new IOException("Error initializing data file reader for file (" +
split.getPath() + ")", e);
Modified: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java?rev=1483127&r1=1483126&r2=1483127&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java (original)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java Wed May 15 23:46:14 2013
@@ -180,6 +180,7 @@ public class TestAvroStorage {
final private String testCorruptedFile = getInputFile("test_corrupted_file.avro");
final private String testMultipleSchemas1File = getInputFile("test_primitive_types/*");
final private String testMultipleSchemas2File = getInputFile("test_complex_types/*");
+ final private String testUserDefinedLoadSchemaFile = getInputFile("test_user_defined_load_schema/*");
@BeforeClass
public static void setup() throws ExecException, IOException {
@@ -584,6 +585,41 @@ public class TestAvroStorage {
}
@Test
+ public void testUserDefinedLoadSchema() throws IOException {
+ // Verify that user specified schema correctly maps to input schemas
+ // Input Avro files have the following schemas:
+ // name:"string", address:[customField1:"int", addressLine:"string"]
+ // address:[addressLine:"string", customField2:"int"], name:"string"
+ // User Avro schema looks like this:
+ // name:"string", address:[customField1:"int", customField2:"int", customField3:"int"]
+ // This test will confirm that AvroStorage correctly maps fields from writer to reader schema,
+ // dropping, adding, and reordering fields where needed.
+ String output= outbasedir + "testUserDefinedLoadSchema";
+ String expected = basedir + "expected_testUserDefinedLoadSchema.avro";
+ String customSchema =
+ "{\"type\": \"record\", \"name\": \"employee\", \"fields\": [ "
+ +"{ \"default\": \"***\", \"type\": \"string\", \"name\": \"name\" }, "
+ +"{ \"name\": \"address\", \"type\": { "
+ +"\"type\": \"record\", \"name\": \"addressDetails\", \"fields\": [ "
+ +"{ \"default\": 0, \"type\": \"int\", \"name\": \"customField1\" }, "
+ +"{ \"default\": 0, \"type\": \"int\", \"name\": \"customField2\" }, "
+ +"{ \"default\": 0, \"type\": \"int\", \"name\": \"customField3\" } "
+ +"] "
+ +"} } "
+ +"] } ";
+
+ deleteDirectory(new File(output));
+ String [] queries = {
+ " in = LOAD '" + testUserDefinedLoadSchemaFile
+ + "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ('schema', '" + customSchema + "');",
+ " o = ORDER in BY name;",
+ " STORE o INTO '" + output + "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();"
+ };
+ testAvroStorage(queries);
+ verifyResults(output, expected);
+ }
+
+ @Test
public void testDir() throws IOException {
// Verify that all files in a directory including its sub-directories are loaded.
String output= outbasedir + "testDir";
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testUserDefinedLoadSchema.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testUserDefinedLoadSchema.avro?rev=1483127&view=auto
==============================================================================
Binary file - no diff available.
Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testUserDefinedLoadSchema.avro
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_user_defined_load_schema/testUserDefinedLoadSchema_input1.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_user_defined_load_schema/testUserDefinedLoadSchema_input1.avro?rev=1483127&view=auto
==============================================================================
Binary file - no diff available.
Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_user_defined_load_schema/testUserDefinedLoadSchema_input1.avro
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_user_defined_load_schema/testUserDefinedLoadSchema_input2.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_user_defined_load_schema/testUserDefinedLoadSchema_input2.avro?rev=1483127&view=auto
==============================================================================
Binary file - no diff available.
Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_user_defined_load_schema/testUserDefinedLoadSchema_input2.avro
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream