You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2011/02/01 02:40:59 UTC
svn commit: r1065886 [1/2] - in /pig/trunk: ./ .eclipse.templates/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/
contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/
contrib/piggybank/java/src/t...
Author: daijy
Date: Tue Feb 1 01:40:58 2011
New Revision: 1065886
URL: http://svn.apache.org/viewvc?rev=1065886&view=rev
Log:
PIG-1748: Add load/store function AvroStorage for avro data (again)
Added:
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchemaManager.java
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageInputStream.java
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageLog.java
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroDatumReader.java
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroDatumWriter.java
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroOutputFormat.java
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordWriter.java
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigSchema2Avro.java
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testArrayDefault.avro (with props)
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testArrayWithSchema.avro (with props)
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecordSplit1.avro (with props)
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecordSplit2.avro (with props)
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecordWithFieldSchema.avro (with props)
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_array.avro (with props)
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_record.avro (with props)
Modified:
pig/trunk/.eclipse.templates/.classpath
pig/trunk/CHANGES.txt
pig/trunk/ivy.xml
pig/trunk/ivy/libraries.properties
Modified: pig/trunk/.eclipse.templates/.classpath
URL: http://svn.apache.org/viewvc/pig/trunk/.eclipse.templates/.classpath?rev=1065886&r1=1065885&r2=1065886&view=diff
==============================================================================
--- pig/trunk/.eclipse.templates/.classpath (original)
+++ pig/trunk/.eclipse.templates/.classpath Tue Feb 1 01:40:58 2011
@@ -7,8 +7,8 @@
<classpathentry kind="src" path="test"/>
<classpathentry kind="src" path="tutorial/src"/>
<classpathentry exported="true" kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
- <classpathentry exported="true" kind="lib" path="build/ivy/lib/Pig/jackson-core-asl-1.0.1.jar"/>
- <classpathentry exported="true" kind="lib" path="build/ivy/lib/Pig/jackson-mapper-asl-1.0.1.jar"/>
+ <classpathentry exported="true" kind="lib" path="build/ivy/lib/Pig/jackson-core-asl-1.6.0.jar"/>
+ <classpathentry exported="true" kind="lib" path="build/ivy/lib/Pig/jackson-mapper-asl-1.6.0.jar"/>
<classpathentry exported="true" kind="lib" path="build/ivy/lib/Pig/javacc-4.2.jar"/>
<classpathentry exported="true" kind="lib" path="build/ivy/lib/Pig/javacc.jar"/>
<classpathentry exported="true" kind="lib" path="build/ivy/lib/Pig/jline-0.9.94.jar"/>
@@ -49,7 +49,9 @@
<classpathentry exported="true" kind="lib" path="build/ivy/lib/Pig/servlet-api-2.5-6.1.14.jar"/>
<classpathentry exported="true" kind="lib" path="build/ivy/lib/Pig/slf4j-api-1.5.2.jar"/>
<classpathentry exported="true" kind="lib" path="build/ivy/lib/Pig/slf4j-log4j12-1.4.3.jar"/>
- <classpathentry exported="true" kind="lib" path="build/ivy/lib/Pig/xmlenc-0.52.jar"/>
+ <classpathentry exported="true" kind="lib" path="build/ivy/lib/Pig/xmlenc-0.52.jar"/>
+ <classpathentry exported="true" kind="lib" path="build/ivy/lib/Pig/avro-1.4.1.jar" />
+ <classpathentry exported="true" kind="lib" path="build/ivy/lib/Pig/json-simple-1.1.jar" />
<classpathentry exported="true" kind="lib" path="lib/automaton.jar"/>
<classpathentry kind="output" path="build/classes"/>
</classpath>
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1065886&r1=1065885&r2=1065886&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Feb 1 01:40:58 2011
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-1748: Add load/store function AvroStorage for avro data (guolin2001, jghoman via daijy)
+
PIG-1769: Consistency for HBaseStorage (dvryaboy)
PIG-1786: Move describe/nested describe to new logical plan (daijy)
Added: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java?rev=1065886&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java (added)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java Tue Feb 1 01:40:58 2011
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.pig.piggybank.storage.avro;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.data.DataType;
+
+/**
+ * This class converts Avro schema to Pig schema
+ */
+public class AvroSchema2Pig {
+
+ public static String RECORD = "RECORD";
+ public static String FIELD = "FIELD";
+ public static String ARRAY_FIELD = "ARRAY_ELEM";
+ public static String MAP_VALUE_FIELD = "m_value";
+
+ /**
+ * Wrap a pig type to a field schema
+ */
+ public static ResourceFieldSchema getPigSchema(byte pigType, String fieldName) {
+ return new ResourceFieldSchema( new FieldSchema(fieldName, pigType));
+ }
+
+ /**
+ * Convert an Avro schema to a Pig schema
+ */
+ public static ResourceSchema convert(Schema schema) throws IOException {
+
+ if (AvroStorageUtils.containsGenericUnion(schema))
+ throw new IOException ("We don't accept schema containing generic unions.");
+
+ if (AvroStorageUtils.containsRecursiveRecord(schema))
+ throw new IOException ("We don't accept schema containing recursive records.");
+
+ ResourceFieldSchema inSchema = inconvert(schema, FIELD);
+
+ ResourceSchema tupleSchema;
+ if (inSchema.getType() == DataType.TUPLE) {
+ tupleSchema = inSchema.getSchema();
+ } else { // other typs
+ ResourceFieldSchema tupleWrapper = AvroStorageUtils.wrapAsTuple(inSchema);
+
+ ResourceSchema topSchema = new ResourceSchema();
+ topSchema.setFields(new ResourceFieldSchema[] { tupleWrapper });
+
+ tupleSchema = topSchema;
+
+ }
+ return tupleSchema;
+ }
+
+ /**
+ * Convert a schema with field name to a pig schema
+ */
+ private static ResourceFieldSchema inconvert(Schema in, String fieldName) throws IOException {
+
+ AvroStorageLog.details("InConvert avro schema with field name " + fieldName);
+
+ Schema.Type avroType = in.getType();
+ ResourceFieldSchema fieldSchema = new ResourceFieldSchema();
+ fieldSchema.setName(fieldName);
+
+ if (avroType.equals(Schema.Type.RECORD)) {
+
+ AvroStorageLog.details("convert to a pig tuple");
+
+ fieldSchema.setType(DataType.TUPLE);
+ ResourceSchema tupleSchema = new ResourceSchema();
+ List<Schema.Field> fields = in.getFields();
+ ResourceFieldSchema[] childFields = new ResourceFieldSchema[fields.size()];
+ int index = 0;
+ for (Schema.Field field : fields) {
+ childFields[index++] = inconvert(field.schema(), field.name());
+ }
+
+ tupleSchema.setFields(childFields);
+ fieldSchema.setSchema(tupleSchema);
+
+ } else if (avroType.equals(Schema.Type.ARRAY)) {
+
+ AvroStorageLog.details("convert array to a pig bag");
+ fieldSchema.setType(DataType.BAG);
+ Schema elemSchema = in.getElementType();
+ ResourceFieldSchema subFieldSchema = inconvert(elemSchema, ARRAY_FIELD);
+ add2BagSchema(fieldSchema, subFieldSchema);
+
+ } else if (avroType.equals(Schema.Type.MAP)) {
+
+ AvroStorageLog.details("convert map to a pig map");
+ fieldSchema.setType(DataType.MAP);
+
+ } else if (avroType.equals(Schema.Type.UNION)) {
+
+ if (AvroStorageUtils.isAcceptableUnion(in)) {
+ Schema acceptSchema = AvroStorageUtils.getAcceptedType(in);
+ ResourceFieldSchema realFieldSchema = inconvert(acceptSchema, null);
+ fieldSchema.setType(realFieldSchema.getType());
+ fieldSchema.setSchema(realFieldSchema.getSchema());
+ } else
+ throw new IOException("Do not support generic union:" + in);
+
+ } else if (avroType.equals(Schema.Type.FIXED)) {
+ fieldSchema.setType(DataType.BYTEARRAY);
+ } else if (avroType.equals(Schema.Type.BOOLEAN)) {
+ fieldSchema.setType(DataType.BOOLEAN);
+ } else if (avroType.equals(Schema.Type.BYTES)) {
+ fieldSchema.setType(DataType.BYTEARRAY);
+ } else if (avroType.equals(Schema.Type.DOUBLE)) {
+ fieldSchema.setType(DataType.DOUBLE);
+ } else if (avroType.equals(Schema.Type.ENUM)) {
+ fieldSchema.setType(DataType.CHARARRAY);
+ } else if (avroType.equals(Schema.Type.FLOAT)) {
+ fieldSchema.setType(DataType.FLOAT);
+ } else if (avroType.equals(Schema.Type.INT)) {
+ fieldSchema.setType(DataType.INTEGER);
+ } else if (avroType.equals(Schema.Type.LONG)) {
+ fieldSchema.setType(DataType.LONG);
+ } else if (avroType.equals(Schema.Type.STRING)) {
+ fieldSchema.setType(DataType.CHARARRAY);
+ } else if (avroType.equals(Schema.Type.NULL)) {
+ // value of NULL is always NULL
+ fieldSchema.setType(DataType.INTEGER);
+ } else {
+ throw new IOException("Unsupported avro type:" + avroType);
+ }
+ return fieldSchema;
+ }
+
+ /**
+ * Add a field schema to a bag schema
+ */
+ static protected void add2BagSchema(ResourceFieldSchema fieldSchema,
+ ResourceFieldSchema subFieldSchema)
+ throws IOException {
+
+ ResourceFieldSchema wrapped = (subFieldSchema.getType() == DataType.TUPLE)
+ ? subFieldSchema
+ : AvroStorageUtils.wrapAsTuple(subFieldSchema);
+
+ ResourceSchema listSchema = new ResourceSchema();
+ listSchema.setFields(new ResourceFieldSchema[] { wrapped });
+
+ fieldSchema.setSchema(listSchema);
+
+ }
+
+}
Added: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchemaManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchemaManager.java?rev=1065886&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchemaManager.java (added)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchemaManager.java Tue Feb 1 01:40:58 2011
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.pig.piggybank.storage.avro;
+
+import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+
+/**
+ * This class creates two maps out of a given Avro schema. And it supports
+ * looking up avro schemas using either type name or field name.
+ *
+ * 1. map[type name] = > avro schema
+ * 2. map[field name] => avro schema
+ *
+ */
+public class AvroSchemaManager {
+
+ /**map[field name] => schema */
+ Map<String, Schema> name2Schema = null;
+ /**map[type name]=> schema*/
+ Map<String, Schema> typeName2Schema = null;
+
+ /**
+ * Construct with a given schema
+ */
+ public AvroSchemaManager(Schema schema) {
+
+ name2Schema = new HashMap<String, Schema>();
+ typeName2Schema = new HashMap<String, Schema>();
+
+ init(null, schema, false);
+ }
+
+ private boolean isNamedSchema(Schema schema) {
+ Type type = schema.getType();
+ return type.equals(Type.RECORD) || type.equals(Type.ENUM) || type.equals(Type.FIXED);
+ }
+
+ /**
+ * Initialize given a schema
+ */
+ protected void init(String namespace, Schema schema,
+ boolean ignoreNameMap) {
+
+ /* put to map[type name]=>schema */
+ if (isNamedSchema(schema)) {
+ String typeName = schema.getName();
+ if (typeName2Schema.containsKey(typeName))
+ AvroStorageLog.warn("Duplicate schemas defined for type:"
+ + typeName
+ + ". will ignore the second one:"
+ + schema);
+ else {
+ AvroStorageLog.details("add " + schema.getName() + "=" + schema
+ + " to type2Schema");
+ typeName2Schema.put(schema.getName(), schema);
+ }
+ }
+
+ /* put field schema to map[field name]=>schema*/
+ if (schema.getType().equals(Type.RECORD)) {
+
+ List<Field> fields = schema.getFields();
+ for (Field field : fields) {
+
+ Schema fieldSchema = field.schema();
+ String name = (namespace == null) ? field.name() : namespace + "." + field.name();
+
+ if (!ignoreNameMap) {
+ if (name2Schema.containsKey(name))
+ AvroStorageLog.warn("Duplicate schemas defined for alias:" + name
+ + ". Will ignore the second one:"+ fieldSchema);
+ else {
+ AvroStorageLog.details("add " + name + "=" + fieldSchema + " to name2Schema");
+ name2Schema.put(name, fieldSchema);
+ }
+ }
+
+ init(name, fieldSchema, ignoreNameMap);
+ }
+ } else if (schema.getType().equals(Type.UNION)) {
+
+ if (AvroStorageUtils.isAcceptableUnion(schema)) {
+ Schema realSchema = AvroStorageUtils.getAcceptedType(schema);
+ init(namespace, realSchema, ignoreNameMap);
+ } else {
+ List<Schema> list = schema.getTypes();
+ for (Schema s : list) {
+ init(namespace, s, true);
+ }
+ }
+ } else if (schema.getType().equals(Type.ARRAY)) {
+ Schema elemSchema = schema.getElementType();
+ init(namespace, elemSchema, true);
+ } else if (schema.getType().equals(Type.MAP)) {
+ Schema valueSchema = schema.getValueType();
+ init(namespace, valueSchema, true);
+ }
+ }
+
+ /**
+ * Look up schema using type name or field name
+ */
+ public Schema getSchema(String name) {
+ Schema schema = typeName2Schema.get(name);
+ schema = (schema == null) ? name2Schema.get(name) : schema;
+ return schema;
+
+ }
+}
Added: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java?rev=1065886&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java (added)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java Tue Feb 1 01:40:58 2011
@@ -0,0 +1,576 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.pig.piggybank.storage.avro;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.pig.Expression;
+import org.apache.pig.FileInputLoadFunc;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.UDFContext;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+
+/**
+ * AvroStorage is used to load/store Avro data <br/>
+ * Document can be found <a href='http://snaprojects.jira.com/wiki/display/HTOOLS/AvroStorage+-+Pig+support+for+Avro+data'>here</a>
+ */
+public class AvroStorage extends FileInputLoadFunc implements StoreFuncInterface, LoadMetadata {
+
+ /* storeFunc parameters */
+ private static final String NOTNULL = "NOTNULL";
+ private static final String AVRO_OUTPUT_SCHEMA_PROPERTY = "avro_output_schema";
+ private static final String SCHEMA_DELIM = "#";
+ private static final String SCHEMA_KEYVALUE_DELIM = "@";
+
+ /* FIXME: we use this variable to distinguish schemas specified
+ * by different AvroStorage calls and will remove this once
+ * StoreFunc provides access to Pig output schema in backend.
+ * Default value is 0.
+ */
+ private int storeFuncIndex = 0;
+ private PigAvroRecordWriter writer = null; /* avro record writer */
+ private Schema outputAvroSchema = null; /* output avro schema */
+ /* indicate whether data is nullable */
+ private boolean nullable = true;
+
+ /* loadFunc parameters */
+ private PigAvroRecordReader reader = null; /* avro record reader */
+ private Schema inputAvroSchema = null; /* input avro schema */
+
+ private boolean checkSchema = true; /* whether to check that input sub-directories share the same schema */
+
+ /**
+  * No-argument constructor: no output schema is preset (it is derived
+  * from the pig schema), data is nullable, input schemas are checked
+  * and the debug level is reset to 0.
+  */
+ public AvroStorage() {
+     AvroStorageLog.setDebugLevel(0);
+     this.outputAvroSchema = null;
+     this.nullable = true;
+     this.checkSchema = true;
+ }
+
+ /**
+  * Constructor taking the quoted string arguments of the pig script.
+  * A single argument "no_schema_check" (case-insensitive) only disables
+  * the input schema check. Otherwise a single argument is parsed as a
+  * json object and several arguments as a name/value list, and the
+  * resulting property map is handed to init.
+  *
+  * @param parts quoted string list
+  * @throws IOException
+  * @throws ParseException
+  */
+ public AvroStorage(String[] parts) throws IOException, ParseException {
+
+     this.outputAvroSchema = null;
+     this.nullable = true;
+     this.checkSchema = true;
+
+     boolean skipSchemaCheck =
+             parts.length == 1 && parts[0].equalsIgnoreCase("no_schema_check");
+     if (skipSchemaCheck) {
+         this.checkSchema = false;
+         return;
+     }
+
+     /* parse input parameters */
+     Map<String, Object> params;
+     if (parts.length > 1) {
+         params = parseStringList(parts);
+     } else {
+         params = parseJsonString(parts[0]);
+     }
+
+     init(params); /* initialize */
+ }
+
+
+ /**
+ * Set input location and obtain input schema.
+ *
+ * FIXME: currently we assume all avro files under the same "location"
+ * share the same schema and will throw exception if not.
+ */
+ @Override
+ public void setLocation(String location, Job job) throws IOException {
+ if(AvroStorageUtils.addInputPaths(location, job) && inputAvroSchema == null) {
+ inputAvroSchema = getAvroSchema(location, job);
+ }
+ }
+
+ /**
+  * Get the avro schema of the data under the given location.
+  *
+  * The file system is resolved from the location's own URI, so a fully
+  * qualified location on a non-default file system is looked up on that
+  * file system; a location without a scheme still resolves to the
+  * default file system of the job configuration.
+  *
+  * @param location input location
+  * @param job job whose configuration is used to obtain the file system
+  * @return avro schema of the data, or null if none could be determined
+  * @throws IOException
+  */
+ protected Schema getAvroSchema(String location, Job job) throws IOException {
+     Configuration conf = job.getConfiguration();
+     Path path = new Path(location);
+     FileSystem fs = FileSystem.get(path.toUri(), conf);
+     return getAvroSchema(path, fs);
+ }
+
+ /**
+  * Get avro schema of input path. There are three cases:
+  * 1. if path is a file, then return its avro schema;
+  * 2. if path is a first-level directory (no sub-directories), then
+  *    return the avro schema of one underlying file;
+  * 3. if path contains sub-directories, then recursively check
+  *    whether all of them share the same schema and return it
+  *    if so or throw an exception if not. Entries that yield no
+  *    schema (e.g. empty sub-directories) are skipped. When schema
+  *    checking is disabled, the first schema found is returned.
+  *
+  * @param path input path
+  * @param fs file system
+  * @return avro schema of data, or null if the path does not exist, is
+  *         rejected by the path filter, or contains no schema
+  * @throws IOException if underlying sub-directories do not share the same schema
+  */
+ @SuppressWarnings("deprecation")
+ protected Schema getAvroSchema(Path path, FileSystem fs) throws IOException {
+     if (!fs.exists(path) || !AvroStorageUtils.PATH_FILTER.accept(path))
+         return null;
+
+     /* if path is first level directory or is a file */
+     if (!fs.isDirectory(path)) {
+         return getSchema(path, fs);
+     }
+
+     FileStatus[] ss = fs.listStatus(path, AvroStorageUtils.PATH_FILTER);
+     Schema schema = null;
+     if (ss.length > 0) {
+         if (AvroStorageUtils.noDir(ss))
+             return getSchema(path, fs);
+
+         /*otherwise, check whether schemas of underlying directories are the same */
+         for (FileStatus s : ss) {
+             Schema newSchema = getAvroSchema(s.getPath(), fs);
+             if (newSchema == null) {
+                 /* nothing readable under this entry; it cannot conflict */
+                 continue;
+             }
+             if (schema == null) {
+                 schema = newSchema;
+                 if(!checkSchema) {
+                     System.out.println("Do not check shema; use schema of " + s.getPath());
+                     return schema;
+                 }
+             } else if (!schema.equals(newSchema)) {
+                 throw new IOException( "Input path is " + path + ". Sub-direcotry " + s.getPath()
+                         + " contains different schema " + newSchema + " than " + schema);
+             }
+         }
+     }
+
+     if (schema == null)
+         System.err.println("Cannot get avro schema! Input path " + path + " might be empty.");
+     return schema;
+ }
+
+ /**
+  * This method is called by {@link #getAvroSchema}. The default implementation
+  * returns the schema of an avro file; or the schema of the last file in a first-level
+  * directory (it does not contain sub-directories).
+  *
+  * The underlying stream is closed even if reading the avro header fails.
+  *
+  * @param path path of a file or first level directory
+  * @param fs file system
+  * @return avro schema
+  * @throws IOException
+  */
+ protected Schema getSchema(Path path, FileSystem fs) throws IOException {
+     /* get path of the last file */
+     Path lastFile = AvroStorageUtils.getLast(path, fs);
+
+     /* read in file and obtain schema */
+     GenericDatumReader<Object> avroReader = new GenericDatumReader<Object>();
+     InputStream hdfsInputStream = fs.open(lastFile);
+     DataFileStream<Object> avroDataStream = null;
+     try {
+         avroDataStream = new DataFileStream<Object>(hdfsInputStream, avroReader);
+         return avroDataStream.getSchema();
+     } finally {
+         if (avroDataStream != null) {
+             /* closing the avro stream also closes the wrapped input stream */
+             avroDataStream.close();
+         } else {
+             /* the avro stream was never created; release the raw stream */
+             hdfsInputStream.close();
+         }
+     }
+ }
+
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public InputFormat getInputFormat() throws IOException {
+ AvroStorageLog.funcCall("getInputFormat");
+ if(inputAvroSchema != null)
+ return new PigAvroInputFormat(inputAvroSchema);
+ else
+ return new TextInputFormat();
+ }
+
+ /**
+  * Remember the record reader that getNext will pull tuples from.
+  * The reader is cast to PigAvroRecordReader.
+  */
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void prepareToRead(RecordReader reader, PigSplit split)
+         throws IOException {
+     AvroStorageLog.funcCall("prepareToRead");
+     PigAvroRecordReader avroRecordReader = (PigAvroRecordReader) reader;
+     this.reader = avroRecordReader;
+ }
+
+ @Override
+ public Tuple getNext() throws IOException {
+ try {
+ if (!reader.nextKeyValue()) {
+ return null;
+ }
+ return (Tuple) this.reader.getCurrentValue();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+
+ /* implement LoadMetadata */
+
+ /**
+  * Determine the avro schema of "location" (unless already known) and
+  * return it converted to a pig schema.
+  *
+  * @return the pig schema, or null if no avro schema could be obtained
+  */
+ @Override
+ public ResourceSchema getSchema(String location, Job job)
+         throws IOException {
+
+     AvroStorageLog.funcCall("getSchema");
+
+     /* get avro schema */
+     if (inputAvroSchema == null) {
+         inputAvroSchema = getAvroSchema(location, job);
+     }
+     if (inputAvroSchema == null) {
+         return null;
+     }
+     AvroStorageLog.details( "avro input schema:" + inputAvroSchema);
+
+     /* convert to pig schema */
+     ResourceSchema converted = AvroSchema2Pig.convert(inputAvroSchema);
+     AvroStorageLog.details("pig input schema:" + converted);
+     return converted;
+ }
+
+ /**
+ * No statistics are provided by this loader.
+ *
+ * @return always null
+ */
+ @Override
+ public ResourceStatistics getStatistics(String location, Job job) throws IOException {
+ return null; // Nothing to do
+ }
+
+ /**
+ * No partition keys are reported by this loader.
+ *
+ * @return always null
+ */
+ @Override
+ public String[] getPartitionKeys(String location, Job job) throws IOException {
+ return null; // Nothing to do
+ }
+
+ /**
+ * The partition filter is ignored; this method does nothing.
+ */
+ @Override
+ public void setPartitionFilter(Expression partitionFilter) throws IOException {
+ // Nothing to do
+ }
+
+ /**
+  * Parse a json object into a property map. Values of "debug" and
+  * "index" are converted from long to integer; values of "schema" and
+  * "fieldN" are replaced by their trimmed string form. Other entries
+  * are left as parsed. The parsed json object itself is returned.
+  *
+  * @param jsonString json object in string format
+  * @return a property map
+  * @throws ParseException
+  */
+ @SuppressWarnings("unchecked")
+ protected Map<String, Object> parseJsonString(String jsonString) throws ParseException {
+
+     /* parse the json object */
+     JSONObject obj = (JSONObject) new JSONParser().parse(jsonString);
+
+     /* values are replaced under existing keys only, so iterating the entry set is safe */
+     Set<Entry<String, Object>> entries = obj.entrySet();
+     for (Entry<String, Object> entry : entries) {
+
+         String key = entry.getKey();
+         Object value = entry.getValue();
+
+         if (key.equalsIgnoreCase("debug") || key.equalsIgnoreCase("index")) {
+             /* convert long values to integer */
+             Long asLong = (Long) value;
+             int asInt = asLong.intValue();
+             obj.put(key, asInt);
+         }
+         else if (key.equalsIgnoreCase("schema") || key.matches("field\\d+")) {
+             /* convert avro schema (as json object) to string */
+             String schemaText = value.toString().trim();
+             obj.put(key, schemaText);
+         }
+
+     }
+     return obj;
+ }
+
+ /**
+  * Build a property map from a list of alternating names and values
+  * (name1, value1, name2, value2, ...). A trailing name without a value
+  * is ignored. Values are stored as:
+  * - Integer for "debug" and "index";
+  * - String for "data", "same", "schema" and "fieldN";
+  * - Boolean for "nullable" (true only for the text "true", ignoring case).
+  *
+  * @param parts input string list
+  * @return a property map
+  * @throws IOException if a name is not one of the recognized parameters
+  */
+ protected Map<String, Object> parseStringList(String[] parts) throws IOException {
+
+     Map<String, Object> map = new HashMap<String, Object>();
+
+     for (int i = 0; i < parts.length - 1; i += 2) {
+         String name = parts[i].trim();
+         String value = parts[i+1].trim();
+         if (name.equalsIgnoreCase("debug")
+                 || name.equalsIgnoreCase("index")) {
+             /* store value as integer */
+             map.put(name, Integer.parseInt(value));
+         } else if (name.equalsIgnoreCase("data")
+                 || name.equalsIgnoreCase("same")
+                 || name.equalsIgnoreCase("schema")
+                 || name.matches("field\\d+")) {
+             /* store value as string */
+             map.put(name, value);
+         } else if (name.equalsIgnoreCase("nullable")) {
+             /* store value as boolean; parseBoolean parses the text itself,
+              * whereas getBoolean would look up a system property of that name */
+             map.put(name, Boolean.parseBoolean(value));
+         } else
+             throw new IOException("Invalid parameter:" + name);
+     }
+     return map;
+ }
+
+ /**
+  * Initialize output avro schema using input property map.
+  *
+  * Recognized keys: "debug" (debug level), "data" (path whose avro schema
+  * provides definitions for "def:" field values), "index" (index of this
+  * store function), "same" (path whose schema becomes the output schema),
+  * "nullable", "schema" (output schema as json) and "fieldN" (schema of
+  * the Nth field: NOTNULL, "def:alias", or an avro schema).
+  *
+  * @param inputs property map
+  * @throws IOException on an unknown key, when no avro schema can be read
+  *                     from the "data" path, when a "def:" value is used
+  *                     without "data", or when its alias cannot be resolved
+  */
+ protected void init(Map<String, Object> inputs) throws IOException {
+
+     /*used to store field schemas */
+     List<Field> fields = null;
+
+     /* set debug level */
+     if (inputs.containsKey("debug")) {
+         AvroStorageLog.setDebugLevel((Integer) inputs.get("debug"));
+     }
+
+     /* initialize schema manager, if any */
+     AvroSchemaManager schemaManager = null;
+     if (inputs.containsKey("data")) {
+         Path path = new Path((String) inputs.get("data"));
+         AvroStorageLog.details("data path=" + path.toUri().toString());
+         FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
+         Schema schema = getAvroSchema(path, fs);
+         /* getAvroSchema returns null for a missing or empty path; fail
+          * with a clear message instead of a NullPointerException */
+         if (schema == null)
+             throw new IOException("Cannot get avro schema from data path: " + path);
+         schemaManager = new AvroSchemaManager(schema);
+     }
+
+     /* iterate input property map */
+     for (Entry<String, Object> entry : inputs.entrySet()) {
+         String name = entry.getKey().trim();
+         Object value = entry.getValue();
+
+         if (name.equalsIgnoreCase("index")) {
+             /* set index of store function */
+             storeFuncIndex = (Integer) value;
+         } else if (name.equalsIgnoreCase("same")) {
+             /* use schema in the specified path as output schema */
+             Path path = new Path( ((String) value).trim());
+             AvroStorageLog.details("data path=" + path.toUri().toString());
+             FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
+             outputAvroSchema = getAvroSchema(path, fs);
+         } else if (name.equalsIgnoreCase("nullable")) {
+             nullable = (Boolean) value;
+         } else if (name.equalsIgnoreCase("schema")) {
+             outputAvroSchema = Schema.parse((String) value);
+         } else if (name.matches("field\\d+")) {
+             /*set schema of dth field */
+             if (fields == null)
+                 fields = new ArrayList<Field>();
+
+             int index = Integer.parseInt(name.substring("field".length()));
+             String content = ((String) value).trim();
+             Field field = null;
+             if (content.equalsIgnoreCase(NOTNULL)) {
+                 /* null means deriving avro schema from pig schema but not null*/
+                 field = AvroStorageUtils.createUDField(index, null);
+             } else if (content.startsWith("def:")) {
+                 if (schemaManager == null)
+                     throw new IOException("Please specify data parameter (using \"data\") before this one.");
+
+                 String alias = content.substring("def:".length());
+                 Schema s = schemaManager.getSchema(alias);
+                 if (s == null)
+                     throw new IOException("Cannot find matching schema for alias:" + alias);
+                 /* use pre-defined schema*/
+                 field = AvroStorageUtils.createUDField(index, s);
+
+                 AvroStorageLog.details("Use pre-defined schema(" + alias + "): " + s + " for field " + index);
+             } else {
+                 Schema schema = null;
+                 try {
+                     schema = Schema.parse(content);
+                 } catch (RuntimeException e) {
+                     /* might be primary schema like int or long */
+                     schema = Schema.parse("\"" + content + "\"");
+                 }
+
+                 field = AvroStorageUtils.createUDField(index, schema);
+             }
+
+             fields.add(field);
+         } else if (!name.equalsIgnoreCase("data")
+                 && !name.equalsIgnoreCase("debug")) {
+             throw new IOException("Invalid parameter:" + name);
+         }
+     }
+
+     /* if schemas of some fields are set */
+     if (fields != null && outputAvroSchema == null) {
+         outputAvroSchema = AvroStorageUtils.createUDPartialRecordSchema();
+         outputAvroSchema.setFields(fields);
+     }
+
+     /* print warning if both output and nullable are specified;
+      * and nullable will be ignored.*/
+     if (outputAvroSchema != null) {
+         if (!nullable) {
+             AvroStorageLog.warn("Invalid parameter--nullable cannot be false while "
+                     + "output schema is not null. Will ignore nullable.\n\n");
+             nullable = true;
+         }
+     }
+
+ }
+
+ /**
+  * Resolves a store location against the current directory by delegating
+  * to {@code LoadFunc.getAbsolutePath}.
+  *
+  * @param location store location as given by the caller
+  * @param curDir directory the location is resolved against
+  * @return the value returned by {@code LoadFunc.getAbsolutePath}
+  * @throws IOException if the delegate fails
+  */
+ @Override
+ public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
+     final String absoluteLocation = LoadFunc.getAbsolutePath(location, curDir);
+     return absoluteLocation;
+ }
+
+ /**
+  * Logs the output location and registers it as the output path of the job.
+  *
+  * @param location output location
+  * @param job job whose output path is set
+  * @throws IOException declared by the overridden method
+  */
+ @Override
+ public void setStoreLocation(String location, Job job) throws IOException {
+     AvroStorageLog.details("output location=" + location);
+     final Path outputPath = new Path(location);
+     FileOutputFormat.setOutputPath(job, outputPath);
+ }
+
+ /**
+  * Converts the Pig schema to an Avro output schema and appends it, under
+  * this store function's key, to the schema list kept in the UDF properties.
+  * If the key is already present the new schema is ignored with a warning.
+  *
+  * @param s Pig schema of the data to be stored
+  * @throws IOException if the stored schema list cannot be parsed or the
+  *         conversion fails
+  */
+ @Override
+ public void checkSchema(ResourceSchema s) throws IOException {
+     AvroStorageLog.funcCall("Check schema");
+     Properties property = UDFContext.getUDFContext().getUDFProperties(ResourceSchema.class);
+     String prevSchemaStr = property.getProperty(AVRO_OUTPUT_SCHEMA_PROPERTY);
+     AvroStorageLog.details("Previously defined schemas=" + prevSchemaStr);
+
+     String key = getSchemaKey();
+     final boolean hasPrevious = (prevSchemaStr != null);
+
+     /* a schema registered earlier under the same key wins */
+     if (hasPrevious && parseSchemaMap(prevSchemaStr).containsKey(key)) {
+         AvroStorageLog.warn("Duplicate value for key-" + key + ". Will ignore the new schema.");
+         return;
+     }
+
+     /* validate against the user supplied schema, or derive one from pig */
+     Schema schema;
+     if (outputAvroSchema == null) {
+         schema = PigSchema2Avro.convert(s, nullable);
+     } else {
+         schema = PigSchema2Avro.validateAndConvert(outputAvroSchema, s);
+     }
+
+     AvroStorageLog.info("key=" + key + " outputSchema=" + schema);
+
+     String append = key + SCHEMA_KEYVALUE_DELIM + schema.toString();
+     String newSchemaStr = append;
+     if (hasPrevious) {
+         newSchemaStr = prevSchemaStr + SCHEMA_DELIM + append;
+     }
+
+     property.setProperty(AVRO_OUTPUT_SCHEMA_PROPERTY, newSchemaStr);
+     AvroStorageLog.details("New schemas=" + newSchemaStr);
+ }
+
+ /** Returns the store function index rendered as a decimal string; used as the schema map key. */
+ private String getSchemaKey() {
+     final int index = storeFuncIndex;
+     return Integer.toString(index);
+ }
+
+ /**
+  * Parses the serialized schema list into a key-to-schema map. Entries are
+  * separated by {@code SCHEMA_DELIM}; key and value inside an entry are
+  * separated by {@code SCHEMA_KEYVALUE_DELIM}. Empty entries are skipped.
+  *
+  * @param input serialized schema list
+  * @return map from key to schema string
+  * @throws IOException if an entry does not split into exactly two parts
+  */
+ private Map<String, String> parseSchemaMap(String input) throws IOException {
+     AvroStorageLog.details("Parse schema map from " + input);
+     Map<String, String> map = new HashMap<String, String>();
+
+     for (String entry : input.split(SCHEMA_DELIM)) {
+         AvroStorageLog.details("Entry = " + entry);
+         if (entry.length() > 0) {
+             String[] parts = entry.split(SCHEMA_KEYVALUE_DELIM);
+             if (parts.length != 2) {
+                 throw new IOException("Expect 2 fields in " + entry);
+             }
+             map.put(parts[0], parts[1]);
+         }
+     }
+     return map;
+ }
+
+ /**
+  * Builds the output format for this store function. The schema registered
+  * under this function's key in the UDF properties is used when present;
+  * otherwise the schema configured on this instance is used.
+  *
+  * @return a {@code PigAvroOutputFormat} created with the selected schema
+  * @throws IOException if no schema is available or the stored schema list
+  *         cannot be parsed
+  */
+ @SuppressWarnings("rawtypes")
+ @Override
+ public OutputFormat getOutputFormat() throws IOException {
+     AvroStorageLog.funcCall("getOutputFormat");
+
+     Properties property = UDFContext.getUDFContext().getUDFProperties(ResourceSchema.class);
+     String allSchemaStr = property.getProperty(AVRO_OUTPUT_SCHEMA_PROPERTY);
+
+     Map<String, String> registered = null;
+     if (allSchemaStr != null) {
+         registered = parseSchemaMap(allSchemaStr);
+     }
+
+     String key = getSchemaKey();
+     Schema schema;
+     if (registered != null && registered.containsKey(key)) {
+         schema = Schema.parse(registered.get(key));
+     } else {
+         schema = outputAvroSchema;
+     }
+
+     if (schema == null) {
+         throw new IOException("Output schema is null!");
+     }
+     AvroStorageLog.details("Output schema=" + schema);
+
+     return new PigAvroOutputFormat(schema);
+ }
+
+ /**
+  * Stores the record writer used by {@code putNext}.
+  *
+  * @param writer record writer; cast to {@code PigAvroRecordWriter}
+  * @throws IOException declared by the overridden method
+  */
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void prepareToWrite(RecordWriter writer) throws IOException {
+     final PigAvroRecordWriter avroWriter = (PigAvroRecordWriter) writer;
+     this.writer = avroWriter;
+ }
+
+ /**
+  * Accepts the UDF context signature; this implementation ignores it.
+  *
+  * @param signature signature passed by the caller (unused)
+  */
+ @Override
+ public void setStoreFuncUDFContextSignature(String signature) {
+ // Nothing to do
+ }
+
+ /**
+  * Cleans up after a failed store by delegating to
+  * {@code StoreFunc.cleanupOnFailureImpl}.
+  *
+  * @param location store location passed through to the delegate
+  * @param job job passed through to the delegate
+  * @throws IOException if the delegate fails
+  */
+ @Override
+ public void cleanupOnFailure(String location, Job job) throws IOException {
+ StoreFunc.cleanupOnFailureImpl(location, job);
+ }
+
+ /**
+  * Writes one tuple through the record writer set in {@code prepareToWrite}.
+  *
+  * @param t tuple to write
+  * @throws IOException if the write fails or the thread is interrupted
+  *         while writing
+  */
+ @Override
+ public void putNext(Tuple t) throws IOException {
+     try {
+         this.writer.write(NullWritable.get(), t);
+     } catch (InterruptedException e) {
+         /* Do not report an interrupted write as success: restore the
+          * interrupt flag and surface the failure to the caller. */
+         Thread.currentThread().interrupt();
+         throw new IOException("Interrupted while writing tuple to Avro output", e);
+     }
+ }
+
+}
Added: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageInputStream.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageInputStream.java?rev=1065886&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageInputStream.java (added)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageInputStream.java Tue Feb 1 01:40:58 2011
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.pig.piggybank.storage.avro;
+
+import java.io.Closeable;
+import java.io.IOException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.avro.file.SeekableInput;
+
+/**
+ * Adapts an {@link FSDataInputStream} to {@link SeekableInput}. The length
+ * of the file is read once, at construction time.
+ */
+public class AvroStorageInputStream implements Closeable, SeekableInput {
+    private final FSDataInputStream stream;
+    private final long len;
+
+    /**
+     * Opens the file at the given path using the configuration of the task
+     * context.
+     *
+     * @param path file to read
+     * @param context task context supplying the configuration
+     * @throws IOException if the file status cannot be read or the file
+     *         cannot be opened
+     */
+    public AvroStorageInputStream(Path path, TaskAttemptContext context) throws IOException {
+        /* Resolve the file system once. The length is read before the
+         * stream is opened so that a failing status lookup cannot leave an
+         * open stream behind. */
+        org.apache.hadoop.fs.FileSystem fs = path.getFileSystem(context.getConfiguration());
+        this.len = fs.getFileStatus(path).getLen();
+        this.stream = fs.open(path);
+    }
+
+    /** Returns the file length recorded at construction time. */
+    @Override
+    public long length() {
+        return len;
+    }
+
+    /** Reads up to {@code len} bytes into {@code b} starting at {@code off}. */
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        return stream.read(b, off, len);
+    }
+
+    /** Moves the read position of the underlying stream to {@code p}. */
+    @Override
+    public void seek(long p) throws IOException {
+        stream.seek(p);
+    }
+
+    /** Returns the current position of the underlying stream. */
+    @Override
+    public long tell() throws IOException {
+        return stream.getPos();
+    }
+
+    /** Closes the underlying stream. */
+    @Override
+    public void close() throws IOException {
+        stream.close();
+    }
+}
Added: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageLog.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageLog.java?rev=1065886&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageLog.java (added)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageLog.java Tue Feb 1 01:40:58 2011
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.pig.piggybank.storage.avro;
+
+/**
+ * Minimal console logger for this package. Messages are printed when the
+ * configured debug level is at least the level of the message; warnings and
+ * errors are always printed.
+ */
+public class AvroStorageLog {
+
+    /* current verbosity; -1 suppresses every levelled message */
+    static private int debugLevel = -1;
+
+    final static public int INFO = 2;
+    final static public int FUNC_CALL = 3;
+    final static public int DETAILS = 5;
+
+    /** Sets the verbosity threshold. */
+    static void setDebugLevel(int l) {
+        debugLevel = l;
+    }
+
+    /** Prints an "INFO:" line to stdout when the level is at least {@link #INFO}. */
+    static void info(String msg) {
+        if (debugLevel < INFO) {
+            return;
+        }
+        System.out.println("INFO:" + msg);
+    }
+
+    /** Prints a "DEBUG:" line when the level is at least {@link #FUNC_CALL}. */
+    static void funcCall(String msg) {
+        debug(FUNC_CALL, msg);
+    }
+
+    /** Prints a "DEBUG:" line when the level is at least {@link #DETAILS}. */
+    static void details(String msg) {
+        debug(DETAILS, msg);
+    }
+
+    /** Prints a "DEBUG:" line to stdout when the level is at least {@code level}. */
+    static void debug(int level, String msg) {
+        if (debugLevel < level) {
+            return;
+        }
+        System.out.println("DEBUG:" + msg);
+    }
+
+    /** Always prints a "WARNING:" line to stdout. */
+    static void warn(String msg) {
+        System.out.println("WARNING:" + msg);
+    }
+
+    /** Always prints an "ERROR:" line to stderr. */
+    static void error(String msg) {
+        System.err.println("ERROR:" + msg);
+    }
+
+}
+
Added: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java?rev=1065886&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java (added)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java Tue Feb 1 01:40:58 2011
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.pig.piggybank.storage.avro;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.data.DataType;
+
+/**
+ * Static helpers shared by the Avro storage classes of this package:
+ * placeholder field and record schemas, input path discovery, and checks on
+ * unions and recursive records.
+ */
+public class AvroStorageUtils {
+
+    public static Schema BooleanSchema = Schema.create(Schema.Type.BOOLEAN);
+    public static Schema LongSchema = Schema.create(Schema.Type.LONG);
+    public static Schema FloatSchema = Schema.create(Schema.Type.FLOAT);
+    public static Schema DoubleSchema = Schema.create(Schema.Type.DOUBLE);
+    public static Schema IntSchema = Schema.create(Schema.Type.INT);
+    public static Schema StringSchema = Schema.create(Schema.Type.STRING);
+    public static Schema BytesSchema = Schema.create(Schema.Type.BYTES);
+    public static Schema NullSchema = Schema.create(Schema.Type.NULL);
+
+    /* name of the placeholder record and prefix of placeholder field names */
+    private static final String NONAME = "NONAME";
+    /* field name that marks a tuple created by wrapAsTuple */
+    private static final String PIG_TUPLE_WRAPPER = "PIG_WRAPPER";
+
+    /** ignore hdfs files with prefix "_" and "." */
+    public static PathFilter PATH_FILTER = new PathFilter() {
+        @Override
+        public boolean accept(Path path) {
+            String name = path.getName();
+            return !name.startsWith("_") && !name.startsWith(".");
+        }
+    };
+
+    /** build the placeholder field name for the given index, e.g. NONAME_3 */
+    static String getDummyFieldName(int index) {
+        return NONAME + "_" + index;
+    }
+
+    /** create an avro field named after the index, using the given schema */
+    public static Field createUDField(int index, Schema s) {
+        return new Field(getDummyFieldName(index), s, null, null);
+    }
+
+    /** create an empty record schema that serves as a place holder */
+    public static Schema createUDPartialRecordSchema() {
+        return Schema.createRecord(NONAME, null, null, false);
+    }
+
+    /** check whether a schema is a place holder (using the schema name) */
+    public static boolean isUDPartialRecordSchema(Schema s) {
+        /* constant first, so a null name yields false instead of an NPE */
+        return NONAME.equals(s.getName());
+    }
+
+    /** get the placeholder field of a schema given its index number */
+    public static Field getUDField(Schema s, int index) {
+        return s.getField(getDummyFieldName(index));
+    }
+
+    /**
+     * Add the given path to the job's input paths. Files are added as they
+     * are; a directory is added itself when it contains no sub-directories,
+     * otherwise its children are examined in turn. Entries rejected by
+     * {@link #PATH_FILTER} are skipped.
+     *
+     * @param pathString file or directory to add
+     * @param job job receiving the input paths
+     * @return true if an input path was added or a non-empty directory was
+     *         expanded
+     * @throws IOException if the file system cannot be accessed
+     */
+    public static boolean addInputPaths(String pathString, Job job)
+            throws IOException {
+        Configuration conf = job.getConfiguration();
+        Path path = new Path(pathString);
+        /* take the file system from the path itself, so that a location
+         * qualified with another scheme than the default one is found */
+        FileSystem fs = path.getFileSystem(conf);
+        FileStatus pathStatus = fs.getFileStatus(path);
+
+        List<FileStatus> input = new LinkedList<FileStatus>();
+        if (PATH_FILTER.accept(path)) { // remove input path with leading "." or "_"
+            input.add(pathStatus);
+        }
+
+        boolean ret = false;
+        while (!input.isEmpty()) {
+            FileStatus status = input.remove(0);
+            Path p = status.getPath();
+
+            if (!status.isDir()) {
+                AvroStorageLog.details("Add input path:" + p);
+                FileInputFormat.addInputPath(job, p);
+                ret = true;
+                continue;
+            }
+
+            /* list all accepted children of the directory */
+            FileStatus[] ss = fs.listStatus(p, PATH_FILTER);
+            if (ss.length == 0) {
+                continue;
+            }
+
+            if (noDir(ss)) {
+                /* only files inside: the directory itself is the input */
+                AvroStorageLog.details("Add input path:" + p);
+                FileInputFormat.addInputPath(job, p);
+            } else {
+                input.addAll(Arrays.asList(ss));
+            }
+            ret = true;
+        }
+
+        return ret;
+    }
+
+    /** check whether there is NO directory in the input file (status) list */
+    public static boolean noDir(FileStatus[] ss) {
+        for (FileStatus s : ss) {
+            if (s.isDir())
+                return false;
+        }
+        return true;
+    }
+
+    /** get last file of a hdfs path if it is a directory;
+     *  or return the file itself if path is a file
+     */
+    public static Path getLast(String path, FileSystem fs) throws IOException {
+        return getLast(new Path(path), fs);
+    }
+
+    /**
+     * List the given path with {@link #PATH_FILTER} and return the last
+     * entry in the natural ordering of {@link FileStatus}; if the listing
+     * is empty the path itself is returned.
+     */
+    public static Path getLast(Path path, FileSystem fs) throws IOException {
+
+        FileStatus[] statuses = fs.listStatus(path, PATH_FILTER);
+
+        if (statuses.length == 0) {
+            return path;
+        }
+        Arrays.sort(statuses);
+        return statuses[statuses.length - 1].getPath();
+    }
+
+    /**
+     * Wrap an avro schema as a nullable union if needed.
+     * For instance, wrap schema "int" as ["null", "int"]
+     */
+    public static Schema wrapAsUnion(Schema schema, boolean nullable) {
+        if (!nullable) {
+            /* do not wrap it if not nullable */
+            return schema;
+        }
+        /* if schema is an acceptable union, then return itself */
+        if (schema.getType().equals(Schema.Type.UNION) && isAcceptableUnion(schema)) {
+            return schema;
+        }
+        return Schema.createUnion(Arrays.asList(NullSchema, schema));
+    }
+
+    /** determine whether the input schema contains recursive records */
+    public static boolean containsRecursiveRecord(Schema s) {
+        /* initialize empty set of record names on the current path */
+        Set<String> set = new HashSet<String>();
+        return containsRecursiveRecord(s, set);
+    }
+
+    /**
+     * Called by {@link #containsRecursiveRecord(Schema)} and it recursively checks
+     * whether the input schema contains recursive records.
+     *
+     * @param s schema to check
+     * @param definedRecordNames names of the records enclosing {@code s};
+     *        unchanged on return when the result is false
+     */
+    protected static boolean containsRecursiveRecord(Schema s, Set<String> definedRecordNames) {
+        switch (s.getType()) {
+        case RECORD: {
+            String name = s.getName();
+            if (definedRecordNames.contains(name))
+                return true;
+
+            /* add its own name while its fields are being checked */
+            definedRecordNames.add(name);
+            for (Field field : s.getFields()) {
+                if (containsRecursiveRecord(field.schema(), definedRecordNames))
+                    return true;
+            }
+            /* Remove the name again: a record is recursive only when it
+             * shows up inside itself. Without this, a record type used by
+             * two sibling fields would be reported as recursive. */
+            definedRecordNames.remove(name);
+            return false;
+        }
+        case ARRAY:
+            /* check the element type */
+            return containsRecursiveRecord(s.getElementType(), definedRecordNames);
+        case MAP:
+            /* check the value type */
+            return containsRecursiveRecord(s.getValueType(), definedRecordNames);
+        case UNION:
+            /* check all possible types */
+            for (Schema type : s.getTypes()) {
+                if (containsRecursiveRecord(type, definedRecordNames))
+                    return true;
+            }
+            return false;
+        default:
+            return false;
+        }
+    }
+
+    /** determine whether the input schema contains generic unions */
+    public static boolean containsGenericUnion(Schema s) {
+        switch (s.getType()) {
+        case RECORD:
+            /* check all fields */
+            for (Field field : s.getFields()) {
+                if (containsGenericUnion(field.schema()))
+                    return true;
+            }
+            return false;
+        case ARRAY:
+            return containsGenericUnion(s.getElementType());
+        case MAP:
+            return containsGenericUnion(s.getValueType());
+        case UNION:
+            /* check all possible types, then the union itself */
+            for (Schema type : s.getTypes()) {
+                if (containsGenericUnion(type))
+                    return true;
+            }
+            return !isAcceptableUnion(s);
+        default:
+            return false;
+        }
+    }
+
+    /** determine whether a union is a nullable union;
+     * note that this function doesn't check containing
+     * types of the input union recursively. */
+    public static boolean isAcceptableUnion(Schema in) {
+        if (!in.getType().equals(Schema.Type.UNION))
+            return false;
+
+        List<Schema> types = in.getTypes();
+        if (types.size() <= 1) {
+            return true;
+        }
+        if (types.size() > 2) {
+            return false; /* contains more than 2 types */
+        }
+        /* one of two types is NULL */
+        return types.get(0).getType().equals(Schema.Type.NULL)
+                || types.get(1).getType().equals(Schema.Type.NULL);
+    }
+
+    /** wrap a pig schema as tuple */
+    public static ResourceFieldSchema wrapAsTuple(ResourceFieldSchema subFieldSchema) throws IOException {
+        ResourceSchema listSchema = new ResourceSchema();
+        listSchema.setFields(new ResourceFieldSchema[] { subFieldSchema });
+
+        ResourceFieldSchema tupleWrapper = new ResourceFieldSchema();
+        tupleWrapper.setType(DataType.TUPLE);
+        tupleWrapper.setName(PIG_TUPLE_WRAPPER);
+        tupleWrapper.setSchema(listSchema);
+
+        return tupleWrapper;
+    }
+
+    /** check whether it is just a wrapped tuple */
+    public static boolean isTupleWrapper(ResourceFieldSchema pigSchema) {
+        /* constant first: a field without a name is not a wrapper and
+         * must not raise a NullPointerException */
+        return pigSchema.getType() == DataType.TUPLE
+                && PIG_TUPLE_WRAPPER.equals(pigSchema.getName());
+    }
+
+    /**
+     * extract schema from a nullable union
+     *
+     * @return the non-null member of a two-type union, the only member of a
+     *         one-type union, or null for an empty union
+     * @throws RuntimeException if the input is not an acceptable union
+     */
+    public static Schema getAcceptedType(Schema in) {
+        if (!isAcceptableUnion(in))
+            throw new RuntimeException("Cannot call this function on a unacceptable union");
+
+        List<Schema> types = in.getTypes();
+        switch (types.size()) {
+        case 0:
+            return null; /* union with no type */
+        case 1:
+            return types.get(0); /* union with one type */
+        case 2:
+            return (types.get(0).getType().equals(Schema.Type.NULL))
+                    ? types.get(1)
+                    : types.get(0);
+        default:
+            return null;
+        }
+    }
+
+}
Added: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroDatumReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroDatumReader.java?rev=1065886&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroDatumReader.java (added)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroDatumReader.java Tue Feb 1 01:40:58 2011
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.pig.piggybank.storage.avro;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.ResolvingDecoder;
+import org.apache.avro.util.Utf8;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+/**
+ * An avro GenericDatumReader which reads in avro data and
+ * converts them to pig data: records become tuples, arrays become bags,
+ * strings and enum symbols become Java strings, bytes and fixed values
+ * become pig byte arrays.
+ */
+public class PigAvroDatumReader extends GenericDatumReader<Object> {
+
+    /**
+     * Construct where the writer's and reader's schemas are the same.
+     */
+    public PigAvroDatumReader(Schema schema) {
+        super(schema);
+    }
+
+    /**
+     * Construct given writer's and reader's schema.
+     */
+    public PigAvroDatumReader(Schema writer, Schema reader) throws IOException {
+        super(writer, reader);
+    }
+
+    /**
+     * Called to read a record instance. Overridden to read a pig tuple:
+     * every field, in the order reported by the decoder, is appended to a
+     * new tuple.
+     */
+    @Override
+    protected Object readRecord(Object old, Schema expected, ResolvingDecoder in) throws IOException {
+
+        /* create an empty tuple */
+        Tuple tuple = (Tuple) newRecord(old, expected);
+
+        for (Field f : in.readFieldOrder()) {
+            tuple.append(read(null, f.schema(), in));
+        }
+
+        return tuple;
+    }
+
+    /**
+     * Called to read a map instance. Overridden to read a pig map: keys are
+     * read with {@link #readString} and values with the map's value schema.
+     */
+    protected Object readMap(Object old, Schema expected, ResolvingDecoder in) throws IOException {
+        Schema eValue = expected.getValueType();
+        long l = in.readMapStart();
+        Object map = newMap(old, (int) l);
+        if (l > 0) {
+            /* the map arrives in blocks; mapNext() gives the next block size */
+            do {
+                for (int i = 0; i < l; i++) {
+                    addToMap(map, readString(null, AvroStorageUtils.StringSchema, in),
+                            read(null, eValue, in));
+                }
+            } while ((l = in.mapNext()) > 0);
+        }
+        return map;
+    }
+
+    /**
+     * Called to create an enum value. Overridden to create a pig string.
+     */
+    @Override
+    protected Object createEnum(String symbol, Schema schema) {
+        return symbol;
+    }
+
+    /**
+     * Called by the default implementation of {@link #readArray} to retrieve a
+     * value from a reused instance. Always null: elements are never reused.
+     */
+    @Override
+    protected Object peekArray(Object array) {
+        return null;
+    }
+
+    /**
+     * Called by the default implementation of {@link #readArray} to add a
+     * value. Overridden to append to pig bag; an element that is not a
+     * tuple is first wrapped in a single-field tuple.
+     */
+    @Override
+    protected void addToArray(Object array, long pos, Object e) {
+        DataBag bag = (DataBag) array;
+        if (e instanceof Tuple) {
+            bag.add((Tuple) e);
+        } else {
+            Tuple t = new DefaultTuple();
+            t.append(e);
+            bag.add(t);
+        }
+    }
+
+    /**
+     * Called to read a fixed value. Overridden to read a pig byte array.
+     */
+    @Override
+    protected Object readFixed(Object old, Schema expected, Decoder in) throws IOException {
+        GenericFixed fixed = (GenericFixed) super.readFixed(old, expected, in);
+        return new DataByteArray(fixed.bytes());
+    }
+
+    /**
+     * Called to create new record instances. Overridden to return a new tuple.
+     */
+    @Override
+    protected Object newRecord(Object old, Schema schema) {
+        return TupleFactory.getInstance().newTuple();
+    }
+
+    /**
+     * Called to create new array instances. Overridden to return a new bag.
+     */
+    @Override
+    protected Object newArray(Object old, int size, Schema schema) {
+        return BagFactory.getInstance().newDefaultBag();
+    }
+
+    /**
+     * Called to read strings. Overridden to return a pig string.
+     */
+    @Override
+    protected Object readString(Object old, Schema expected, Decoder in) throws IOException {
+        /* toString() instead of a cast to Utf8: same result for Utf8, and
+         * no ClassCastException for any other character sequence */
+        return super.readString(old, expected, in).toString();
+    }
+
+    /**
+     * Called to read byte arrays. Overridden to return a pig byte array
+     * holding exactly the bytes between the buffer's position and limit.
+     */
+    @Override
+    protected Object readBytes(Object old, Decoder in) throws IOException {
+        ByteBuffer buf = (ByteBuffer) super.readBytes(old, in);
+
+        /* the backing array can be used as it is only when the buffer
+         * covers it completely */
+        if (buf.hasArray() && buf.arrayOffset() == 0 && buf.position() == 0
+                && buf.remaining() == buf.array().length) {
+            return new DataByteArray(buf.array());
+        }
+
+        /* otherwise copy the readable part; array() would ignore position,
+         * limit and offset, and fails for buffers without a backing array */
+        byte[] bytes = new byte[buf.remaining()];
+        buf.duplicate().get(bytes);
+        return new DataByteArray(bytes);
+    }
+
+}
Added: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroDatumWriter.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroDatumWriter.java?rev=1065886&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroDatumWriter.java (added)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroDatumWriter.java Tue Feb 1 01:40:58 2011
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.pig.piggybank.storage.avro;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+
+/**
+ * An avro GenericDatumWriter to write pig data as Avro data.
+ *
+ */
+public class PigAvroDatumWriter extends GenericDatumWriter<Object> {
+
+ /**
+  * Creates a writer bound to the given output schema.
+  *
+  * @param schema avro schema of the data to be written
+  */
+ public PigAvroDatumWriter(Schema schema) {
+     super(schema);
+ }
+
+ @Override
+ protected void write(Schema schema, Object datum, Encoder out)
+ throws IOException {
+ try {
+
+ /**
+ * In case users want to get rid of Pig tuple wrapper
+ */
+ if (!schema.getType().equals(Schema.Type.RECORD)
+ && !schema.getType().equals(Schema.Type.UNION)
+ && datum instanceof Tuple
+ && unwrappedInstanceOf(schema, datum)) {
+ Tuple t = (Tuple) datum;
+ if (t.size() > 1)
+ throw new IOException("Incompatible schema:" + schema+ " \n for data " + datum);
+ write(schema, t.get(0), out);
+ return;
+ }
+
+ switch (schema.getType()) {
+ case FIXED:
+ writeFixed(schema, datum, out);
+ break;
+ case ENUM:
+ writeEnum(schema, datum, out);
+ break;
+ case STRING:
+ writeString(schema, datum, out);
+ break;
+ case BYTES:
+ writeBytes(datum, out);
+ break;
+ case BOOLEAN:
+ writeBoolean(datum, out);
+ break;
+ case UNION:
+ writeUnion(schema, datum, out);
+ break;
+ case LONG:
+ writeLong(datum, out);
+ break;
+ case FLOAT:
+ writeFloat(datum, out);
+ break;
+ case DOUBLE:
+ writeDouble(datum, out);
+ break;
+ case ARRAY: /*falls through*/
+ case MAP: /*falls through*/
+ case RECORD: /*falls through*/
+ case INT: /*falls through*/
+ case NULL:/*falls through*/
+ default:
+ super.write(schema, datum, out);
+ }
+ } catch (NullPointerException e) {
+ throw npe(e, " of " + schema.getName());
+ }
+ }
+
+ /**
+ * Called to write union.
+ */
+ protected void writeUnion(Schema schema, Object datum, Encoder out)
+ throws IOException {
+ int index = resolveUnion(schema, datum);
+ out.writeIndex(index);
+ write(schema.getTypes().get(index), datum, out);
+ }
+
+ /**
+ * Called to resolve union.
+ */
+ protected int resolveUnion(Schema union, Object datum) throws IOException {
+ int i = 0;
+ for (Schema type : union.getTypes()) {
+ if (type.getType().equals(Schema.Type.UNION))
+ throw new IOException("A union cannot immediately contain other unions.");
+ if (instanceOf(type, datum))
+ return i;
+ i++;
+ }
+ throw new RuntimeException("Datum " + datum + " is not in union " + union);
+ }
+
+ /**
+  * Recursively checks whether "datum" can be written with "schema". Called
+  * by {@link #resolveUnion(Schema,Object)} and
+  * {@link #unwrappedInstanceOf(Schema,Object)}.
+  *
+  * Besides the natural pig type of each avro type, the numeric widenings
+  * supported by the writeXxx methods are accepted (int for long; int and
+  * long for float; int, long and float for double; int for boolean), and
+  * every non-record type also accepts a single-field tuple wrapping a
+  * matching value.
+  *
+  * For a union schema the result is true if any branch matches and false
+  * otherwise, so that a mismatch nested inside a record lets an enclosing
+  * union go on to try its remaining branches.
+  *
+  * @param schema avro schema to test against
+  * @param datum pig value to test, may be null
+  * @return true if the datum matches the schema
+  * @throws IOException if a union directly contains another union, or if
+  *         the tuple-unwrapping check fails
+  * @throws RuntimeException if a tuple field cannot be accessed or the
+  *         schema type is not known
+  */
+ protected boolean instanceOf(Schema schema, Object datum)
+         throws IOException {
+
+     try {
+         switch (schema.getType()) {
+         case RECORD: {
+             if (!(datum instanceof Tuple)) {
+                 return false;
+             }
+             Tuple tuple = (Tuple) datum;
+             List<Field> fields = schema.getFields();
+             if (fields.size() != tuple.size()) {
+                 return false;
+             }
+             for (int i = 0; i < fields.size(); i++) {
+                 if (!instanceOf(fields.get(i).schema(), tuple.get(i))) {
+                     return false;
+                 }
+             }
+             return true;
+         }
+         case UNION: {
+             // Do not delegate to resolveUnion here: it throws when nothing
+             // matches, which would abort the caller's search instead of
+             // reporting "not an instance".
+             for (Schema branch : schema.getTypes()) {
+                 if (branch.getType() == Schema.Type.UNION) {
+                     throw new IOException("A union cannot immediately contain other unions.");
+                 }
+                 if (instanceOf(branch, datum)) {
+                     return true;
+                 }
+             }
+             return false;
+         }
+         case ENUM:
+             return datum instanceof String && schema.hasEnumSymbol(((String) datum))
+                     || unwrappedInstanceOf(schema, datum);
+         case ARRAY:
+             return datum instanceof DataBag
+                     || unwrappedInstanceOf(schema, datum);
+         case MAP:
+             return datum instanceof Map
+                     || unwrappedInstanceOf(schema, datum);
+         case FIXED:
+             return datum instanceof DataByteArray && ((DataByteArray) datum).size() == schema.getFixedSize()
+                     || unwrappedInstanceOf(schema, datum);
+         case STRING:
+             return datum instanceof String
+                     || unwrappedInstanceOf(schema, datum);
+         case BYTES:
+             return datum instanceof DataByteArray
+                     || unwrappedInstanceOf(schema, datum);
+         case INT:
+             return datum instanceof Integer
+                     || unwrappedInstanceOf(schema, datum);
+         case LONG:
+             return datum instanceof Long
+                     || datum instanceof Integer
+                     || unwrappedInstanceOf(schema, datum);
+         case FLOAT:
+             return datum instanceof Float
+                     || datum instanceof Integer
+                     || datum instanceof Long
+                     || unwrappedInstanceOf(schema, datum);
+         case DOUBLE:
+             return datum instanceof Double
+                     || datum instanceof Float
+                     || datum instanceof Integer
+                     || datum instanceof Long
+                     || unwrappedInstanceOf(schema, datum);
+         case BOOLEAN:
+             return datum instanceof Boolean
+                     || datum instanceof Integer
+                     || unwrappedInstanceOf(schema, datum);
+         case NULL:
+             return datum == null;
+         default:
+             throw new RuntimeException("Unexpected type: " + schema);
+         }
+     } catch (ExecException e) {
+         // The cause travels with the exception; no separate stderr dump.
+         throw new RuntimeException(e);
+     }
+ }
+
+ /**
+ * Check whether "datum" is an instance of "schema" after stripping
+ * the tuple wrapper.
+ */
+ private boolean unwrappedInstanceOf(Schema schema, Object datum)
+ throws IOException {
+ try {
+ if (!(datum instanceof Tuple))
+ return false;
+
+ Tuple tuple = (Tuple) datum;
+ if (tuple.size() != 1)
+ return false;
+
+ switch (schema.getType()) {
+ case ENUM:
+ case ARRAY:
+ case MAP:
+ case FIXED:
+ case STRING:
+ case BYTES:
+ case INT:
+ case LONG:
+ case FLOAT:
+ case DOUBLE:
+ case BOOLEAN:
+ return instanceOf(schema, tuple.get(0));
+ default:
+ throw new IOException("Invalid type:" + schema.getType());
+ }
+ } catch (Exception e) {
+ e.printStackTrace(System.err);
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Write double. Users can cast long, float and integer to double.
+ *
+ */
+ protected void writeDouble(Object datum, Encoder out) throws IOException {
+ double num;
+ if (datum instanceof Integer) {
+ num = ((Integer) datum).doubleValue();
+ } else if (datum instanceof Long) {
+ num = ((Long) datum).doubleValue();
+ } else if (datum instanceof Float) {
+ num = ((Float) datum).doubleValue();
+ } else if (datum instanceof Double) {
+ num = (Double) datum;
+ } else
+ throw new IOException("Cannot convert to double:" + datum.getClass());
+
+ out.writeDouble(num);
+ }
+
+ /**
+ * Write float. Users can cast long and integer into float.
+ *
+ */
+ protected void writeFloat(Object datum, Encoder out) throws IOException {
+ float num;
+ if (datum instanceof Integer) {
+ num = ((Integer) datum).floatValue();
+ } else if (datum instanceof Long) {
+ num = ((Long) datum).floatValue();
+ } else if (datum instanceof Float) {
+ num = (Float) datum;
+ } else
+ throw new IOException("Cannot convert to float:" + datum.getClass());
+
+ out.writeFloat(num);
+
+ }
+
+ /**
+ * Write long. Users can cast integer into long.
+ *
+ */
+ protected void writeLong(Object datum, Encoder out) throws IOException {
+ long num;
+ if (datum instanceof Integer) {
+ num = ((Integer) datum).longValue();
+ } else if (datum instanceof Long) {
+ num = (Long) datum;
+ } else
+ throw new IOException("Cannot convert to long:" + datum.getClass());
+
+ out.writeLong(num);
+ }
+
+ /**
+ * Write boolean. Users can cast an integer into boolean.
+ *
+ */
+ protected void writeBoolean(Object datum, Encoder out) throws IOException {
+
+ if (datum instanceof Boolean) {
+ out.writeBoolean((Boolean) datum);
+ } else if (datum instanceof Integer) {
+ out.writeBoolean(((Integer) datum) != 0);
+ } else
+ throw new RuntimeException("Unsupported type boolean:" + datum.getClass());
+
+ }
+
+ private NullPointerException npe(NullPointerException e, String s) {
+ NullPointerException result = new NullPointerException(e.getMessage()
+ + s);
+ result.initCause(e.getCause() == null ? e : e.getCause());
+ return result;
+ }
+
+ /**
+ * Called to write a bytes.
+ */
+ @Override
+ protected void writeBytes(Object datum, org.apache.avro.io.Encoder out)
+ throws IOException {
+ if (datum instanceof DataByteArray) {
+ out.writeBytes(((DataByteArray) datum).get());
+ } else
+ throw new RuntimeException("Unsupported type bytes:" + datum.getClass());
+ }
+
+ /**
+  * Called to write a fixed value taken from a pig {@link DataByteArray}.
+  *
+  * @param schema the fixed schema; its declared size must equal the number
+  *        of bytes in the datum
+  * @param datum the value to write, expected to be a DataByteArray
+  * @param out encoder to write to
+  * @throws IOException if the byte count does not match the schema's
+  *         fixed size, or if the write fails
+  * @throws RuntimeException if the datum is not a DataByteArray
+  */
+ @Override
+ protected void writeFixed(Schema schema, Object datum,
+         org.apache.avro.io.Encoder out)
+         throws IOException {
+     if (!(datum instanceof DataByteArray)) {
+         throw new RuntimeException("Unsupported type fixed:" + datum.getClass());
+     }
+
+     final byte[] bytes = ((DataByteArray) datum).get();
+     final int expected = schema.getFixedSize();
+     // A fixed value carries no length prefix on the wire, so writing any
+     // other number of bytes would silently corrupt everything after it.
+     if (bytes.length != expected) {
+         throw new IOException("Incompatible fixed size: schema " + schema.getName()
+                 + " expects " + expected + " bytes but data has " + bytes.length);
+     }
+     out.writeFixed(bytes, 0, expected);
+ }
+
+ /**
+  * Called by the implementation of {@link #writeRecord} to retrieve
+  * a record field value. Records are pig tuples, so the field is looked
+  * up by position; the name is only used for error reporting.
+  *
+  * @param record the record being written, expected to be a {@link Tuple}
+  * @param name name of the field in the avro schema
+  * @param pos position of the field
+  * @return the value stored at that position of the tuple
+  * @throws RuntimeException if the record is not a tuple or the field
+  *         cannot be accessed
+  */
+ @Override
+ protected Object getField(Object record, String name, int pos) {
+     if (!(record instanceof Tuple)) {
+         throw new RuntimeException("Unsupported type in record:" + record.getClass());
+     }
+     try {
+         return ((Tuple) record).get(pos);
+     } catch (ExecException e) {
+         // Say which field failed; the cause is kept, so no stderr dump.
+         throw new RuntimeException("Cannot read field " + name
+                 + " at position " + pos, e);
+     }
+ }
+
+ /**
+ * Called by the implementation of {@link #writeArray} to get the
+ * size of an array.
+ */
+ @Override
+ protected long getArraySize(Object array) {
+ if (array instanceof DataBag) {
+ return ((DataBag) array).size();
+ } else
+ throw new RuntimeException("Unsupported type in array:" + array.getClass());
+ }
+
+ /**
+ * Called by the implementation of {@link #writeArray} to enumerate
+ * array elements.
+ */
+ @Override
+ protected Iterator<? extends Object> getArrayElements(Object array) {
+ if (array instanceof DataBag) {
+ return ((DataBag) array).iterator();
+ } else
+ throw new RuntimeException("Unsupported type in array:" + array.getClass());
+ }
+
+
+}
Added: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java?rev=1065886&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java (added)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java Tue Feb 1 01:40:58 2011
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.pig.piggybank.storage.avro;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+/**
+ * The InputFormat for avro data.
+ *
+ */
+public class PigAvroInputFormat extends FileInputFormat<NullWritable, Writable> {
+
+ private Schema schema = null; /* avro schema */
+
+ /**
+ * empty constructor
+ */
+ public PigAvroInputFormat() {
+ }
+
+ /**
+ * constructor called by AvroStorage to pass in schema
+ * @param s input data schema
+ */
+ public PigAvroInputFormat(Schema s) {
+ schema = s;
+ }
+
+ /**
+ * Ignore files not ending with ".avro"
+ */
+ @Override
+ protected List<FileStatus> listStatus(JobContext context) throws IOException {
+ List<FileStatus> result = new ArrayList<FileStatus>();
+ for (FileStatus file : super.listStatus(context))
+ if (file.getPath().getName().endsWith(PigAvroOutputFormat.EXT))
+ result.add(file);
+ return result;
+ }
+
+ /**
+ * Create and return an avro record reader.
+ * It uses the input schema passed in to the
+ * constructor.
+ */
+ @Override
+ public RecordReader<NullWritable, Writable>
+ createRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ context.setStatus(split.toString());
+ return new PigAvroRecordReader(context, (FileSplit) split, schema);
+ }
+
+}
Added: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroOutputFormat.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroOutputFormat.java?rev=1065886&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroOutputFormat.java (added)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroOutputFormat.java Tue Feb 1 01:40:58 2011
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.pig.piggybank.storage.avro;
+
+import java.io.IOException;
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+ * The OutputFormat for avro data.
+ *
+ */
+public class PigAvroOutputFormat extends FileOutputFormat<NullWritable, Object> {
+
+ /** The file name extension for avro data files. */
+ public final static String EXT = ".avro";
+
+ /** The configuration key for Avro deflate level. */
+ public static final String DEFLATE_LEVEL_KEY = "avro.mapred.deflate.level";
+
+ /** The default deflate level. */
+ public static final int DEFAULT_DEFLATE_LEVEL = 1;
+
+ /* avro schema of output data */
+ private Schema schema = null;
+
+ /**
+ * default constructor
+ */
+ public PigAvroOutputFormat() {
+ }
+
+ /**
+ * construct with specified output schema
+ * @param s output schema
+ */
+ public PigAvroOutputFormat(Schema s) {
+ schema = s;
+ }
+
+ /**
+ * Enable output compression using the deflate codec and
+ * specify its level.
+ */
+ public static void setDeflateLevel(Job job, int level) {
+ FileOutputFormat.setCompressOutput(job, true);
+ job.getConfiguration().setInt(DEFLATE_LEVEL_KEY, level);
+
+ }
+
+ @Override
+ public RecordWriter<NullWritable, Object> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+
+ if (schema == null)
+ throw new IOException("Must provide a schema");
+
+ Configuration conf = context.getConfiguration();
+
+ DataFileWriter<Object> writer = new DataFileWriter<Object>(new PigAvroDatumWriter(schema));
+
+ if (FileOutputFormat.getCompressOutput(context)) {
+ int level = context.getConfiguration().getInt(DEFLATE_LEVEL_KEY, DEFAULT_DEFLATE_LEVEL);
+ writer.setCodec(CodecFactory.deflateCodec(level));
+ }
+
+ Path path = getDefaultWorkFile(context, EXT);
+ writer.create(schema, path.getFileSystem(conf).create(path));
+ return new PigAvroRecordWriter(writer);
+ }
+
+}