Posted to commits@pig.apache.org by da...@apache.org on 2011/01/31 22:18:25 UTC
svn commit: r1065790 [2/2] - in /pig/trunk: ./
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/
contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/
contrib/piggybank/java/src/test/java/org/apache/pig/p...
Added: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java?rev=1065790&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java (added)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java Mon Jan 31 21:18:24 2011
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.pig.piggybank.storage.avro;
+
+import java.io.IOException;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+/**
+ * An implementation of a record reader that reads in Avro data and
+ * converts it into <NullWritable, Writable> pairs.
+ *
+ */
+public class PigAvroRecordReader extends RecordReader<NullWritable, Writable> {
+
+ private ASFsInput in;
+ private DataFileReader<Object> reader; /*reader of input avro data*/
+ private long start;
+ private long end;
+ private Schema schema = null; /* schema of input avro data*/
+
+ /**
+ * Constructor that initializes the input stream and the Avro data reader.
+ */
+ public PigAvroRecordReader(TaskAttemptContext context, FileSplit split,
+ Schema s) throws IOException {
+ this.in = new ASFsInput(split.getPath(), context);
+ if (s != null) {
+ schema = s;
+ } else {
+ throw new IOException("Need to provide input avro schema");
+ }
+
+ this.reader = new DataFileReader<Object>(in, new PigAvroDatumReader(
+ schema));
+ reader.sync(split.getStart()); // sync to start
+ this.start = in.tell();
+ this.end = split.getStart() + split.getLength();
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ if (end == start) {
+ return 0.0f;
+ } else {
+ return Math.min(1.0f, (getPos() - start) / (float) (end - start));
+ }
+ }
+
+ public long getPos() throws IOException {
+ return in.tell();
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+
+ @Override
+ public NullWritable getCurrentKey() throws IOException,
+ InterruptedException {
+ return NullWritable.get();
+ }
+
+ @Override
+ public Writable getCurrentValue() throws IOException, InterruptedException {
+ Object obj = reader.next();
+ if (obj instanceof Tuple) {
+ ASLog.details("Class =" + obj.getClass());
+ return (Tuple) obj;
+ } else {
+ ASLog.details("Wrap calss " + obj.getClass() + " as a tuple.");
+ return wrapAsTuple(obj);
+ }
+ }
+
+ /**
+ * Wrap a non-tuple value as a tuple.
+ */
+ protected Tuple wrapAsTuple(Object in) throws IOException {
+ Tuple tuple = TupleFactory.getInstance().newTuple();
+ tuple.append(in);
+ return tuple;
+ }
+
+ @Override
+ public void initialize(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ /* nothing to do here; initialization is done in the constructor */
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ /* the datum itself is consumed by getCurrentValue() */
+ return reader.hasNext() && !reader.pastSync(end);
+ }
+
+}
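
For context, a minimal sketch of how a reader like this is typically driven (illustrative only: the schema string, file path, and split length below are hypothetical, and the TaskAttemptContext would be supplied by the framework). Note that nextKeyValue() only checks availability; it is getCurrentValue() that actually consumes the next datum from the DataFileReader:

    import org.apache.avro.Schema;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    public class ReaderSketch {
        static void dumpSplit(TaskAttemptContext context) throws Exception {
            // hypothetical schema and input file: a file of plain ints
            Schema schema = Schema.parse("\"int\"");
            FileSplit split = new FileSplit(new Path("/tmp/data.avro"), 0, 1024, null);
            PigAvroRecordReader reader = new PigAvroRecordReader(context, split, schema);
            try {
                while (reader.nextKeyValue()) {
                    // each value is a Pig Tuple; non-tuple datums are wrapped
                    System.out.println(reader.getCurrentValue());
                }
            } finally {
                reader.close();
            }
        }
    }
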
Added: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordWriter.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordWriter.java?rev=1065790&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordWriter.java (added)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordWriter.java Mon Jan 31 21:18:24 2011
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.pig.piggybank.storage.avro;
+
+import java.io.IOException;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * The RecordWriter used to write Pig results out as Avro data.
+ */
+public class PigAvroRecordWriter extends RecordWriter<NullWritable, Object> {
+
+ private DataFileWriter<Object> writer;
+
+ /**
+ * Construct with an Avro data writer.
+ * @param writer avro data writer
+ */
+ public PigAvroRecordWriter(DataFileWriter<Object> writer) {
+ this.writer = writer;
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ writer.close();
+ }
+
+ @Override
+ public void write(NullWritable key, Object value) throws IOException,
+ InterruptedException {
+ writer.append(value);
+ }
+}
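
A quick sketch of how this writer might be exercised standalone (illustrative only: it uses Avro's GenericDatumWriter rather than the PigAvroDatumWriter from this commit, and the output path is made up). write() simply delegates to DataFileWriter.append(), and close() closes the underlying file:

    import java.io.File;
    import java.io.IOException;
    import org.apache.avro.Schema;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.hadoop.io.NullWritable;

    public class WriterSketch {
        static void writeInts() throws IOException, InterruptedException {
            Schema schema = Schema.create(Schema.Type.INT);
            DataFileWriter<Object> avroWriter =
                    new DataFileWriter<Object>(new GenericDatumWriter<Object>(schema));
            avroWriter.create(schema, new File("/tmp/out.avro"));
            PigAvroRecordWriter writer = new PigAvroRecordWriter(avroWriter);
            writer.write(NullWritable.get(), 1); // appends the datum to the file
            writer.write(NullWritable.get(), 2);
            writer.close(null); // the TaskAttemptContext argument is unused
        }
    }
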
Added: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigSchema2Avro.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigSchema2Avro.java?rev=1065790&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigSchema2Avro.java (added)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigSchema2Avro.java Mon Jan 31 21:18:24 2011
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.pig.piggybank.storage.avro;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.data.DataType;
+
+/**
+ * This class contains functions to convert a Pig schema to an Avro schema. It
+ * consists of two sets of methods:
+ *
+ * 1. Convert a Pig schema to an Avro schema;
+ * 2. Validate whether a Pig schema is compatible with a given Avro schema.
+ * Note that the Avro schema doesn't need to cover all fields in the Pig
+ * schema; the missing fields are converted using the methods in set 1.
+ *
+ */
+public class PigSchema2Avro {
+
+ public static final String TUPLE_NAME = "TUPLE";
+ public static final String FIELD_NAME = "FIELD";
+ public static int tupleIndex = 0;
+
+ // //////////////////////////////////////////////////////////
+ // Methods in Set 1: Convert Pig schema to Avro schema
+ // //////////////////////////////////////////////////////////
+
+ /**
+ * Convert a Pig ResourceSchema to an Avro schema.
+ *
+ */
+ public static Schema convert(ResourceSchema pigSchema, boolean nullable)
+ throws IOException {
+ ResourceFieldSchema[] pigFields = pigSchema.getFields();
+
+ /* remove the pig tuple wrapper */
+ if (pigFields.length == 1 && ASCommons.isTupleWrapper(pigFields[0])) {
+
+ ASLog.details("Ignore the pig tuple wrapper.");
+ ResourceFieldSchema[] listSchemas = pigFields[0].getSchema()
+ .getFields();
+ if (listSchemas.length != 1)
+ throw new IOException("Expect one subfield from "
+ + Arrays.toString(pigFields));
+
+ return convert(listSchemas[0], nullable);
+ } else
+ return convertRecord(pigFields, nullable);
+ }
+
+ /**
+ * Convert a Pig ResourceFieldSchema to an Avro schema.
+ *
+ */
+ protected static Schema convert(ResourceFieldSchema pigSchema,
+ boolean nullable) throws IOException {
+
+ ASLog.details("Convert pig field schema:" + pigSchema);
+
+ final byte pigType = pigSchema.getType();
+
+ if (pigType == DataType.TUPLE) {
+ ASLog.details("Convert a pig field tuple: " + pigSchema);
+
+ ResourceFieldSchema[] listSchemas = pigSchema.getSchema()
+ .getFields();
+ Schema outSchema = null;
+
+ if (ASCommons.isTupleWrapper(pigSchema)) {
+ /* remove Pig tuple wrapper */
+ ASLog.details("Ignore the pig tuple wrapper.");
+ if (listSchemas.length != 1)
+ throw new IOException("Expect one subfield from "
+ + pigSchema);
+ outSchema = convert(listSchemas[0], nullable);
+ } else {
+ outSchema = convertRecord(listSchemas, nullable);
+ }
+
+ return ASCommons.wrapAsUnion(outSchema, nullable);
+
+ } else if (pigType == DataType.BAG) {
+
+ ASLog.details("Convert a pig field bag:" + pigSchema);
+
+ /* Bag elements have to be Tuples */
+ ResourceFieldSchema[] fs = pigSchema.getSchema().getFields();
+ if (fs == null || fs.length != 1
+ || fs[0].getType() != DataType.TUPLE)
+ throw new IOException("Expect one tuple field in a bag");
+
+ Schema outSchema = Schema.createArray(convert(fs[0], nullable));
+ return ASCommons.wrapAsUnion(outSchema, nullable);
+
+ } else if (pigType == DataType.MAP) {
+ /* Pig doesn't provide schema info of Map value */
+ throw new IOException("Please provide schema for Map field!");
+
+ } else if (pigType == DataType.UNKNOWN) {
+ /* results of a Pig UNION operation are of UNKNOWN type */
+ throw new IOException("Must specify a schema for UNKNOWN pig type.");
+
+ } else if (pigType == DataType.CHARARRAY
+ || pigType == DataType.BIGCHARARRAY
+ || pigType == DataType.BOOLEAN
+ || pigType == DataType.BYTE
+ || pigType == DataType.BYTEARRAY
+ || pigType == DataType.DOUBLE
+ || pigType == DataType.FLOAT
+ || pigType == DataType.INTEGER
+ || pigType == DataType.LONG) {
+
+ ASLog.details("Convert a pig field primitive:" + pigSchema);
+ Schema outSchema = convertPrimitiveType(pigType);
+ return ASCommons.wrapAsUnion(outSchema, nullable);
+
+ } else
+ throw new IOException("unsupported pig type:"
+ + DataType.findTypeName(pigType));
+ }
+
+ /**
+ * Convert a set of Pig fields to an Avro record schema.
+ *
+ */
+ protected static Schema convertRecord(ResourceFieldSchema[] pigFields,
+ boolean nullable) throws IOException {
+
+ ASLog.funcCall("convertRecord");
+
+ // Type name is required for Avro record
+ String typeName = getRecordName();
+ Schema outSchema = Schema.createRecord(typeName, null, null, false);
+
+ List<Schema.Field> outFields = new ArrayList<Schema.Field>();
+ for (int i = 0; i < pigFields.length; i++) {
+
+ /* get schema */
+ Schema fieldSchema = convert(pigFields[i], nullable);
+
+ /* get field name of output */
+ String outname = pigFields[i].getName();
+ if (outname == null)
+ outname = FIELD_NAME + "_" + i; // field name cannot be null
+
+ /* get doc of output */
+ String desc = pigFields[i].getDescription();
+
+ outFields.add(new Field(outname, fieldSchema, desc, null));
+ }
+
+ outSchema.setFields(outFields);
+ return outSchema;
+
+ }
+
+ private static String getRecordName() {
+ String name = TUPLE_NAME + "_" + tupleIndex;
+ tupleIndex++;
+ return name;
+ }
+
+ /**
+ * Convert Pig primitive type to Avro type
+ *
+ */
+ protected static Schema convertPrimitiveType(byte pigType)
+ throws IOException {
+
+ if (pigType == DataType.BOOLEAN) {
+ return ASCommons.BooleanSchema;
+ } else if (pigType == DataType.BYTEARRAY) {
+ return ASCommons.BytesSchema;
+ } else if (pigType == DataType.CHARARRAY
+ || pigType == DataType.BIGCHARARRAY) {
+ return ASCommons.StringSchema;
+ } else if (pigType == DataType.DOUBLE) {
+ return ASCommons.DoubleSchema;
+ } else if (pigType == DataType.FLOAT) {
+ return ASCommons.FloatSchema;
+ } else if (pigType == DataType.INTEGER) {
+ return ASCommons.IntSchema;
+ } else if (pigType == DataType.LONG) {
+ return ASCommons.LongSchema;
+ } else
+ throw new IOException("unsupported pig type:"
+ + DataType.findTypeName(pigType));
+
+ }
+
+
+ // //////////////////////////////////////////////////////////
+ // Methods in Set 2: Validate whether a Pig schema is compatible
+ // with a given Avro schema.
+ // //////////////////////////////////////////////////////////
+
+ /**
+ * Validate whether pigSchema is compatible with avroSchema
+ */
+ public static Schema validateAndConvert(Schema avroSchema,
+ ResourceSchema pigSchema)
+ throws IOException {
+ ResourceFieldSchema[] pigFields = pigSchema.getFields();
+ return validateAndConvertRecord(avroSchema, pigFields);
+ }
+
+ /**
+ * Validate whether pigSchema is compatible with avroSchema, and convert
+ * the Pig fields not covered by the Avro schema to corresponding Avro
+ * schemas.
+ */
+ protected static Schema validateAndConvert(Schema avroSchema,
+ ResourceFieldSchema pigSchema)
+ throws IOException {
+
+ ASLog.details("Validate pig field schema:" + pigSchema);
+
+ /* compatibility check based on data types */
+ if (!isCompatible(avroSchema, pigSchema))
+ throw new IOException("Schemas are not compatible.\n Avro="
+ + avroSchema + "\n" + "Pig="
+ + pigSchema);
+
+ final byte pigType = pigSchema.getType();
+ if (avroSchema.getType().equals(Schema.Type.UNION)) {
+
+ ASLog.details("Validate Pig schema with Avro union:"
+ + avroSchema);
+
+ List<Schema> unionSchemas = avroSchema.getTypes();
+ for (Schema schema : unionSchemas) {
+ try {
+ validateAndConvert(schema, pigSchema);
+ return avroSchema;
+ } catch (IOException e) {
+ /* ignore the unmatched branch and try the next one */
+ }
+ }
+ throw new IOException("pig schema " + pigSchema
+ + " is not compatible with avro "
+ + avroSchema);
+ } else if (pigType == DataType.TUPLE) {
+
+ ASLog.details("Validate a pig tuple: " + pigSchema);
+ ResourceFieldSchema[] pigFields = pigSchema.getSchema().getFields();
+ Schema outSchema = validateAndConvertRecord(avroSchema, pigFields);
+ return outSchema;
+
+ } else if (pigType == DataType.BAG) {
+
+ ASLog.details("Validate a pig bag:" + pigSchema);
+
+ /* get fields of containing tuples */
+ ResourceFieldSchema[] fs = pigSchema.getSchema().getFields();
+ if (fs == null || fs.length != 1
+ || fs[0].getType() != DataType.TUPLE)
+ throw new IOException("Expect one tuple field in a bag");
+
+ Schema inElemSchema = avroSchema.getElementType();
+ Schema outSchema = Schema.createArray(validateAndConvert(
+ inElemSchema, fs[0]));
+ return outSchema;
+ } else if (pigType == DataType.MAP) {
+
+ ASLog.details("Cannot validate a pig map. Will use user defined Avro schema.");
+ return avroSchema;
+
+ } else if (pigType == DataType.UNKNOWN || pigType == DataType.CHARARRAY
+ || pigType == DataType.BIGCHARARRAY
+ || pigType == DataType.BOOLEAN
+ || pigType == DataType.BYTE
+ || pigType == DataType.BYTEARRAY
+ || pigType == DataType.DOUBLE
+ || pigType == DataType.FLOAT
+ || pigType == DataType.INTEGER
+ || pigType == DataType.LONG) {
+
+ ASLog.details("Validate a pig primitive type:" + pigSchema);
+ return avroSchema;
+
+ } else
+ throw new IOException("Unsupported pig type:"
+ + DataType.findTypeName(pigType));
+ }
+
+ /**
+ * Validate that a Pig tuple is compatible with an Avro record. If the Avro
+ * schema is not complete (i.e. it has uncovered fields), then those fields
+ * are converted using the methods in set 1.
+ *
+ * Note that users can get rid of Pig tuple wrappers; e.g. the Avro schema
+ * "int" is compatible with the Pig schema "T:(int)".
+ *
+ */
+ protected static Schema validateAndConvertRecord(Schema avroSchema,
+ ResourceFieldSchema[] pigFields)
+ throws IOException {
+
+ /* Get rid of Pig tuple wrappers. */
+ if (!avroSchema.getType().equals(Schema.Type.RECORD)) {
+ if (pigFields.length != 1)
+ throw new IOException(
+ "Expect only one field in Pig tuple schema. Avro schema is "
+ + avroSchema.getType());
+
+ return validateAndConvert(avroSchema, pigFields[0]);
+ }
+
+ /* validate and convert a pig tuple with avro record */
+ boolean isPartialSchema = ASCommons.isUDPartialRecordSchema(avroSchema);
+ ASLog.details("isPartialSchema=" + isPartialSchema);
+
+ String typeName = isPartialSchema ? getRecordName() : avroSchema
+ .getName();
+ Schema outSchema = Schema.createRecord(typeName, avroSchema.getDoc(),
+ avroSchema.getNamespace(), false);
+
+ List<Schema.Field> inFields = avroSchema.getFields();
+ if (!isPartialSchema && inFields.size() != pigFields.length) {
+ throw new IOException("Expect " + inFields.size()
+ + " fields in pig schema."
+ + " But there are "
+ + pigFields.length);
+ }
+
+ List<Schema.Field> outFields = new ArrayList<Schema.Field>();
+
+ for (int i = 0; i < pigFields.length; i++) {
+ /* get user defined avro field schema */
+ Field inputField = isPartialSchema
+ ? ASCommons.getUDField(avroSchema, i)
+ : inFields.get(i);
+
+ /* get schema */
+ Schema fieldSchema = null;
+ if (inputField == null) {
+ /* convert pig schema (nullable) */
+ fieldSchema = convert(pigFields[i], true);
+ } else if (inputField.schema() == null) {
+ /* convert pig schema (not-null) */
+ fieldSchema = convert(pigFields[i], false);
+ } else {
+ /* validate pigFields[i] with given avro schema */
+ fieldSchema = validateAndConvert(inputField.schema(),
+ pigFields[i]);
+ }
+
+ /* get field name of output */
+ String outname = (isPartialSchema) ? pigFields[i].getName()
+ : inputField.name();
+ if (outname == null)
+ outname = FIELD_NAME + "_" + i; // field name cannot be null
+
+ /* get doc of output */
+ String doc = (isPartialSchema) ? pigFields[i].getDescription()
+ : inputField.doc();
+
+ outFields.add(new Field(outname, fieldSchema, doc, null));
+ }
+
+ outSchema.setFields(outFields);
+ return outSchema;
+
+ }
+
+ /**
+ * Check whether Avro type is compatible with Pig type
+ *
+ */
+ protected static boolean isCompatible(Schema avroSchema,
+ ResourceFieldSchema pigSchema)
+ throws IOException {
+
+ Schema.Type avroType = avroSchema.getType();
+ byte pigType = pigSchema.getType();
+
+ if (avroType.equals(Schema.Type.UNION)) {
+ return true;
+ } else if (pigType == DataType.TUPLE) {
+ /* a tuple is compatible with any type, because users may want to
+ get rid of the tuple wrapper */
+ return true;
+ }
+ /* parenthesize each branch fully: && binds tighter than ||, so the
+ original ungrouped clauses matched far too many type pairs */
+ return (avroType.equals(Schema.Type.ARRAY) && pigType == DataType.BAG)
+ || (avroType.equals(Schema.Type.MAP) && pigType == DataType.MAP)
+ || (avroType.equals(Schema.Type.STRING)
+ && (pigType == DataType.CHARARRAY
+ || pigType == DataType.BIGCHARARRAY))
+ || (avroType.equals(Schema.Type.ENUM)
+ && pigType == DataType.CHARARRAY)
+ || (avroType.equals(Schema.Type.BOOLEAN)
+ && (pigType == DataType.BOOLEAN
+ || pigType == DataType.INTEGER))
+ || (avroType.equals(Schema.Type.BYTES)
+ && pigType == DataType.BYTEARRAY)
+ || (avroType.equals(Schema.Type.DOUBLE)
+ && (pigType == DataType.DOUBLE
+ || pigType == DataType.FLOAT
+ || pigType == DataType.INTEGER
+ || pigType == DataType.LONG))
+ || (avroType.equals(Schema.Type.FLOAT)
+ && (pigType == DataType.FLOAT
+ || pigType == DataType.INTEGER
+ || pigType == DataType.LONG))
+ || (avroType.equals(Schema.Type.FIXED)
+ && pigType == DataType.BYTEARRAY)
+ || (avroType.equals(Schema.Type.INT)
+ && pigType == DataType.INTEGER)
+ || (avroType.equals(Schema.Type.LONG)
+ && (pigType == DataType.LONG
+ || pigType == DataType.INTEGER));
+
+ }
+
+}
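
To make the set-1 conversion concrete, a rough usage sketch follows (assumptions: the Pig schema is built with Utils.getSchemaFromString, and the record name TUPLE_0 comes from the static tupleIndex counter, so it depends on conversion order):

    import org.apache.avro.Schema;
    import org.apache.pig.ResourceSchema;
    import org.apache.pig.impl.util.Utils;
    import org.apache.pig.piggybank.storage.avro.PigSchema2Avro;

    public class SchemaSketch {
        public static void main(String[] args) throws Exception {
            // a two-field Pig schema: an int and a chararray
            ResourceSchema pigSchema = new ResourceSchema(
                    Utils.getSchemaFromString("a:int, b:chararray"));
            // nullable=true wraps each field type in a ["null", type] union
            Schema avroSchema = PigSchema2Avro.convert(pigSchema, true);
            // expected shape: record TUPLE_0 { a: ["null","int"], b: ["null","string"] }
            System.out.println(avroSchema);
        }
    }
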
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorage.java?rev=1065790&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorage.java (added)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorage.java Mon Jan 31 21:18:24 2011
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.piggybank.test.storage;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import junit.framework.Assert;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.piggybank.storage.avro.ASCommons;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestAvroStorage {
+
+ protected static final Log LOG = LogFactory.getLog(TestAvroStorage.class);
+ private static PigServer pigServerLocal = null;
+ private static String basedir;
+ private static String outbasedir;
+
+ public static final PathFilter hiddenPathFilter = new PathFilter() {
+ public boolean accept(Path p) {
+ String name = p.getName();
+ return !name.startsWith("_") && !name.startsWith(".");
+ }
+ };
+
+ @BeforeClass
+ public static void setup() throws ExecException {
+ if (pigServerLocal == null)
+ pigServerLocal = new PigServer(ExecType.LOCAL);
+
+ basedir = "src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/";
+ outbasedir = "/tmp/TestAvroStorage/";
+
+ deleteDirectory(new File (outbasedir));
+ }
+
+ @Test
+ public void testRecursiveRecord() throws IOException {
+ final String str1 =
+ "{ \"type\" : \"record\", " +
+ "\"name\": \"Node\" , " +
+ "\"fields\": [ { \"name\": \"value\", \"type\":\"int\"}, " +
+ "{ \"name\": \"next\", \"type\": [\"null\", \"Node\"] } ] }";
+ Schema s = Schema.parse(str1);
+ Assert.assertTrue(ASCommons.containRecursiveRecord(s));
+
+ final String str2 = "{\"type\": \"array\", \"items\": " + str1 + "}";
+ s = Schema.parse(str2);
+ Assert.assertTrue(ASCommons.containRecursiveRecord(s));
+
+ final String str3 ="[\"null\", " + str2 + "]";
+ s = Schema.parse(str3);
+ Assert.assertTrue(ASCommons.containRecursiveRecord(s));
+
+ final String str4 =
+ "{ \"type\" : \"record\", " +
+ "\"name\": \"Node\" , " +
+ "\"fields\": [ { \"name\": \"value\", \"type\":\"int\"}, " +
+ "{ \"name\": \"next\", \"type\": [\"null\", \"string\"] } ] }";
+ s = Schema.parse(str4);
+ Assert.assertFalse(ASCommons.containRecursiveRecord(s));
+ }
+
+ @Test
+ public void testGenericUnion() throws IOException {
+
+ final String str1 = "[ \"string\", \"int\", \"boolean\" ]";
+ Schema s = Schema.parse(str1);
+ Assert.assertTrue(ASCommons.containGenericUnion(s));
+
+ final String str2 = "[ \"string\", \"int\", \"null\" ]";
+ s = Schema.parse(str2);
+ Assert.assertTrue(ASCommons.containGenericUnion(s));
+
+ final String str3 = "[ \"string\", \"null\" ]";
+ s = Schema.parse(str3);
+ Assert.assertFalse(ASCommons.containGenericUnion(s));
+ Schema realSchema = ASCommons.getAcceptedType(s);
+ Assert.assertEquals(ASCommons.StringSchema, realSchema);
+
+ final String str4 = "{\"type\": \"array\", \"items\": " + str2 + "}";
+ s = Schema.parse(str4);
+ Assert.assertTrue(ASCommons.containGenericUnion(s));
+ try {
+ realSchema = ASCommons.getAcceptedType(s);
+ Assert.fail("Should throw a runtime exception when trying to " +
+ "get the accepted type from an unacceptable union");
+ } catch (Exception e) {
+ /* expected */
+ }
+
+ final String str5 =
+ "{ \"type\" : \"record\", " +
+ "\"name\": \"Node\" , " +
+ "\"fields\": [ { \"name\": \"value\", \"type\":\"int\"}, " +
+ "{ \"name\": \"next\", \"type\": [\"null\", \"int\"] } ] }";
+ s = Schema.parse(str5);
+ Assert.assertFalse(ASCommons.containGenericUnion(s));
+
+ final String str6 =
+ "{ \"type\" : \"record\", " +
+ "\"name\": \"Node\" , " +
+ "\"fields\": [ { \"name\": \"value\", \"type\":\"int\"}, " +
+ "{ \"name\": \"next\", \"type\": [\"string\", \"int\"] } ] }";
+ s = Schema.parse(str6);
+ Assert.assertTrue(ASCommons.containGenericUnion(s));
+
+ final String str7 = "[ \"string\" ]"; /*union with one type*/
+ s = Schema.parse(str7);
+ Assert.assertFalse(ASCommons.containGenericUnion(s));
+ realSchema = ASCommons.getAcceptedType(s);
+ Assert.assertEquals(ASCommons.StringSchema, realSchema);
+
+ final String str8 = "[ ]"; /*union with no type*/
+ s = Schema.parse(str8);
+ Assert.assertFalse(ASCommons.containGenericUnion(s));
+ realSchema = ASCommons.getAcceptedType(s);
+ Assert.assertNull(realSchema);
+ }
+
+ @Test
+ public void testArrayDefault() throws IOException {
+ String input = basedir + "test_array.avro";
+ String output= outbasedir + "testArrayDefault";
+ String expected = basedir + "expected_testArrayDefault.avro";
+
+ deleteDirectory(new File(output));
+
+ String [] queries = {
+ " in = LOAD '" + input + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+ " STORE in INTO '" + output + "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();"
+ };
+ testAvroStorage( queries);
+ verifyResults(output, expected);
+ }
+
+ @Test
+ public void testArrayWithSchema() throws IOException {
+ String input = basedir + "test_array.avro";
+ String output= outbasedir + "testArrayWithSchema";
+ String expected = basedir + "expected_testArrayWithSchema.avro";
+ deleteDirectory(new File(output));
+ String [] queries = {
+ " in = LOAD '" + input + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+ " STORE in INTO '" + output +
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ( " +
+ " 'schema', '{\"type\":\"array\",\"items\":\"float\"}' );"
+ };
+ testAvroStorage( queries);
+ verifyResults(output, expected);
+ }
+
+ @Test
+ public void testArrayWithNotNull() throws IOException {
+ String input = basedir + "test_array.avro";
+ String output= outbasedir + "testArrayWithNotNull";
+ String expected = basedir + "expected_testArrayWithSchema.avro";
+ deleteDirectory(new File(output));
+ String [] queries = {
+ " in = LOAD '" + input + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+ " STORE in INTO '" + output +
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ( " +
+ " '{\"nullable\": false }' );"
+ };
+ testAvroStorage( queries);
+ verifyResults(output, expected);
+ }
+
+ @Test
+ public void testArrayWithSame() throws IOException {
+
+ String input = basedir + "test_array.avro";
+ String output= outbasedir + "testArrayWithSame";
+ String expected = basedir + "expected_testArrayWithSchema.avro";
+ deleteDirectory(new File(output));
+ String [] queries = {
+ " in = LOAD '" + input + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+ " STORE in INTO '" + output +
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ( " +
+ " 'same', '" + input + "' );"
+ };
+ testAvroStorage( queries);
+ verifyResults(output, expected);
+ }
+
+ @Test
+ public void testRecordWithSplit() throws IOException {
+
+ String input = basedir + "test_record.avro";
+ String output1= outbasedir + "testRecordSplit1";
+ String output2= outbasedir + "testRecordSplit2";
+ String expected1 = basedir + "expected_testRecordSplit1.avro";
+ String expected2 = basedir + "expected_testRecordSplit2.avro";
+ deleteDirectory(new File(output1));
+ deleteDirectory(new File(output2));
+ String [] queries = {
+ " avro = LOAD '" + input + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+ " groups = GROUP avro BY member_id;",
+ " sc = FOREACH groups GENERATE group AS key, COUNT(avro) AS cnt;",
+ " STORE sc INTO '" + output1 + "' " +
+ " USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+ "'{\"index\": 1, " +
+ " \"schema\": {\"type\":\"record\", " +
+ " \"name\":\"result\", " +
+ " \"fields\":[ {\"name\":\"member_id\",\"type\":\"int\"}, " +
+ "{\"name\":\"count\", \"type\":\"long\"} " +
+ "]" +
+ "}" +
+ " }');",
+ " STORE sc INTO '" + output2 +
+ " 'USING org.apache.pig.piggybank.storage.avro.AvroStorage ('index', '2');"
+ };
+ testAvroStorage( queries);
+ verifyResults(output1, expected1);
+ verifyResults(output2, expected2);
+ }
+
+ @Test
+ public void testRecordWithFieldSchema() throws IOException {
+
+ String input = basedir + "test_record.avro";
+ String output= outbasedir + "testRecordWithFieldSchema";
+ String expected = basedir + "expected_testRecordWithFieldSchema.avro";
+ deleteDirectory(new File(output));
+ String [] queries = {
+ " avro = LOAD '" + input + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+ " avro1 = FILTER avro BY member_id > 1211;",
+ " avro2 = FOREACH avro1 GENERATE member_id, browser_id, tracking_time, act_content ;",
+ " STORE avro2 INTO '" + output + "' " +
+ " USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+ "'{\"data\": \"" + input + "\" ," +
+ " \"field0\": \"int\", " +
+ " \"field1\": \"def:browser_id\", " +
+ " \"field3\": \"def:act_content\" " +
+ " }');"
+ };
+ testAvroStorage( queries);
+ verifyResults(output, expected);
+ }
+
+ private static void deleteDirectory(File path) {
+ if (path.exists()) {
+ File[] files = path.listFiles();
+ for (File file : files) {
+ if (file.isDirectory())
+ deleteDirectory(file);
+ file.delete();
+ }
+ path.delete(); /* also remove the (now empty) directory itself */
+ }
+ }
+
+ private void testAvroStorage(String ...queries)
+ throws IOException {
+
+ PigServer pigServer = pigServerLocal;
+ pigServer.setBatchOn();
+ for (String query: queries){
+ if (query != null && query.length() > 0)
+ pigServer.registerQuery(query);
+ }
+
+ pigServer.executeBatch();
+ }
+
+ private void verifyResults(String outPath,
+ String expectedOutpath)
+ throws IOException {
+ FileSystem fs = FileSystem.getLocal(new Configuration()) ;
+
+ /* read in expected results*/
+ Set<Object> expected = getExpected (expectedOutpath);
+
+ /* read in output results and compare */
+ Path output = new Path(outPath);
+ Assert.assertTrue("Output dir does not exists!", fs.exists(output)
+ && fs.getFileStatus(output).isDir());
+
+ Path[] paths = FileUtil.stat2Paths(fs.listStatus(output, hiddenPathFilter));
+ Assert.assertTrue("Split field dirs not found!", paths != null);
+
+ for (Path path : paths) {
+ //String splitField = path.getName();
+ Path[] files = FileUtil.stat2Paths(fs.listStatus(path, hiddenPathFilter));
+ Assert.assertTrue("No files found for path: " + path.toUri().getPath(),
+ files != null);
+ for (Path filePath : files) {
+ Assert.assertTrue("This shouldn't be a directory", fs.isFile(filePath));
+
+ GenericDatumReader<Object> reader = new GenericDatumReader<Object>();
+
+ DataFileStream<Object> in = new DataFileStream<Object>(
+ fs.open(filePath), reader);
+
+ int count = 0;
+ while (in.hasNext()) {
+ Object obj = in.next();
+
+ Assert.assertTrue( expected.contains(obj));
+ count++;
+ }
+ in.close();
+ Assert.assertEquals(expected.size(), count);
+ }
+ }
+ }
+
+ private Set<Object> getExpected (String pathstr ) throws IOException {
+
+ Set<Object> ret = new HashSet<Object>();
+ FileSystem fs = FileSystem.getLocal(new Configuration()) ;
+
+ /* read in output results and compare */
+ Path output = new Path(pathstr);
+ Assert.assertTrue("Expected output does not exists!", fs.exists(output));
+
+ Path[] paths = FileUtil.stat2Paths(fs.listStatus(output, hiddenPathFilter));
+ Assert.assertTrue("Split field dirs not found!", paths != null);
+
+ for (Path path : paths) {
+ //String splitField = path.getName();
+ Path[] files = FileUtil.stat2Paths(fs.listStatus(path, hiddenPathFilter));
+ Assert.assertTrue("No files found for path: " + path.toUri().getPath(),
+ files != null);
+ for (Path filePath : files) {
+ Assert.assertTrue("This shouldn't be a directory", fs.isFile(filePath));
+
+ GenericDatumReader<Object> reader = new GenericDatumReader<Object>();
+
+ DataFileStream<Object> in = new DataFileStream<Object>(
+ fs.open(filePath), reader);
+
+ while (in.hasNext()) {
+ Object obj = in.next();
+ ret.add(obj);
+ }
+ in.close();
+ }
+ }
+ return ret;
+ }
+
+}
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/expected_testArrayDefault.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/expected_testArrayDefault.avro?rev=1065790&view=auto
==============================================================================
Binary file - no diff available.
Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/expected_testArrayDefault.avro
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/expected_testArrayWithSchema.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/expected_testArrayWithSchema.avro?rev=1065790&view=auto
==============================================================================
Binary file - no diff available.
Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/expected_testArrayWithSchema.avro
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/expected_testRecordSplit1.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/expected_testRecordSplit1.avro?rev=1065790&view=auto
==============================================================================
Binary file - no diff available.
Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/expected_testRecordSplit1.avro
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/expected_testRecordSplit2.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/expected_testRecordSplit2.avro?rev=1065790&view=auto
==============================================================================
Binary file - no diff available.
Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/expected_testRecordSplit2.avro
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/expected_testRecordWithFieldSchema.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/expected_testRecordWithFieldSchema.avro?rev=1065790&view=auto
==============================================================================
Binary file - no diff available.
Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/expected_testRecordWithFieldSchema.avro
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/test_array.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/test_array.avro?rev=1065790&view=auto
==============================================================================
Binary file - no diff available.
Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/test_array.avro
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/test_record.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/test_record.avro?rev=1065790&view=auto
==============================================================================
Binary file - no diff available.
Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestAvroStorageData/test_record.avro
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Modified: pig/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/pig/trunk/ivy.xml?rev=1065790&r1=1065789&r2=1065790&view=diff
==============================================================================
--- pig/trunk/ivy.xml (original)
+++ pig/trunk/ivy.xml Mon Jan 31 21:18:24 2011
@@ -65,6 +65,11 @@
<dependency org="commons-cli" name="commons-cli" rev="${commons-cli.version}"
conf="checkstyle->master"/>
+ <dependency org="org.apache.avro" name="avro" rev="${avro.version}"
+ conf="compile->master;checkstyle->master"/>
+ <dependency org="com.googlecode.json-simple" name="json-simple" rev="${json-simple.version}"
+ conf="compile->master;checkstyle->master"/>
+
<dependency org="jdiff" name="jdiff" rev="${jdiff.version}"
conf="jdiff->default"/>
<dependency org="xerces" name="xerces" rev="${xerces.version}"
Modified: pig/trunk/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/pig/trunk/ivy/libraries.properties?rev=1065790&r1=1065789&r2=1065790&view=diff
==============================================================================
--- pig/trunk/ivy/libraries.properties (original)
+++ pig/trunk/ivy/libraries.properties Mon Jan 31 21:18:24 2011
@@ -30,7 +30,7 @@ hadoop-core.version=0.20.2
hadoop-test.version=0.20.2
hbase.version=0.20.6
hsqldb.version=1.8.0.10
-jackson.version=1.0.1
+jackson.version=1.6.0
javacc.version=4.2
jdiff.version=1.0.9
jetty-util.version=6.1.14
@@ -45,3 +45,6 @@ slf4j-api.version=1.4.3
slf4j-log4j12.version=1.4.3
xerces.version=1.4.4
wagon-http.version=1.0-beta-2
+
+avro.version=1.4.0
+json-simple.version=1.1