You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2013/03/02 00:58:49 UTC

svn commit: r1451783 - in /pig/trunk: ./ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/ contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/

Author: cheolsoo
Date: Fri Mar  1 23:58:48 2013
New Revision: 1451783

URL: http://svn.apache.org/r1451783
Log:
PIG-3142: [piggybank] Fixed-width load and store functions for the Piggybank (jpacker via cheolsoo)

Added:
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/FixedWidthLoader.java
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/FixedWidthStorer.java
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestFixedWidthLoader.java
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestFixedWidthStorer.java
Modified:
    pig/trunk/CHANGES.txt

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1451783&r1=1451782&r2=1451783&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Mar  1 23:58:48 2013
@@ -28,6 +28,8 @@ PIG-3174:  Remove rpm and deb artifacts 
 
 IMPROVEMENTS
 
+PIG-3142: [piggybank] Fixed-width load and store functions for the Piggybank (jpacker via cheolsoo)
+
 PIG-3162: PigTest.assertOutput doesn't allow non-default delimiter (dreambird via cheolsoo)
 
 PIG-3002: Pig client should handle CountersExceededException (jarcec via billgraham)

Added: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/FixedWidthLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/FixedWidthLoader.java?rev=1451783&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/FixedWidthLoader.java (added)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/FixedWidthLoader.java Fri Mar  1 23:58:48 2013
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.piggybank.storage;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+
+import org.apache.pig.Expression;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
+import org.apache.pig.LoadPushDown;
+import org.apache.pig.PigWarning;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.parser.ParserException;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+
+/**
+ * A fixed-width file loader. 
+ * 
+ * Takes a string argument specifying the ranges of each column in a unix 'cut'-like format.
+ * Ex: '-5, 10-12, 14, 20-'
+ * Ranges are comma-separated, 1-indexed (for ease of use with 1-indexed text editors), and inclusive.
+ * A single-column field at position n may be specified as either 'n-n' or simply 'n'.
+ *
+ * A second optional argument specifies whether to skip the first row of the input file,
+ * assuming it to be a header. As Pig may combine multiple input files each with their own header
+ * into a single split, FixedWidthLoader makes sure to skip any duplicate headers as well.
+ * 'SKIP_HEADER' skips the row; anything else and the default behavior ('USE_HEADER') is not to skip it.
+ *
+ * A third optional argument specifies a Pig schema to load the data with. Automatically
+ * trims whitespace from numeric fields. Note that if fewer fields are specified in the
+ * schema than are specified in the column spec, only the fields in the schema will
+ * be used.
+ *
+ * Warning: fields loaded as char/byte arrays will trim all leading and trailing whitespace
+ * from the field value as it is indistinguishable from the spaces that separate different fields.
+ *
+ * All datetimes are converted to UTC when loaded.
+ *
+ * Column spec idea and syntax parser borrowed from Russ Lankenau's implementation
+ * at https://github.com/rlankenau/fixed-width-pig-loader 
+ */
+public class FixedWidthLoader extends LoadFunc implements LoadMetadata, LoadPushDown {
+    
+    /**
+     * Character range of one fixed-width column within a line.
+     * As produced by {@code parseColumnSpec}, {@code start} is 0-indexed and inclusive
+     * and {@code end} is exclusive.
+     */
+    public static class FixedWidthField {
+        int start, end;
+
+        FixedWidthField(int start, int end) {
+            this.start = start;
+            this.end = end;
+        }
+    }
+
+    private TupleFactory tupleFactory = TupleFactory.getInstance();
+
+    private RecordReader reader = null;
+    
+    private ArrayList<FixedWidthField> columns;
+
+    private ResourceSchema schema = null;
+    private ResourceFieldSchema[] fields;
+
+    private boolean loadingFirstRecord = true;
+    private boolean skipHeader = false;
+    private String header = null;
+    private int splitIndex;
+
+    private boolean[] requiredFields = null;
+    private int numRequiredFields;
+
+    private String udfContextSignature = null;
+    private static final String SCHEMA_SIGNATURE = "pig.fixedwidthloader.schema";
+    private static final String REQUIRED_FIELDS_SIGNATURE = "pig.fixedwidthloader.required_fields";
+    private static final Log log = LogFactory.getLog(FixedWidthLoader.class);
+
+    /*
+     * Constructors and helper methods
+     */
+
+    public FixedWidthLoader() {
+        throw new IllegalArgumentException(
+            "Usage: org.apache.pig.piggybank.storage.FixedWidthLoader(" +
+            "'<column spec>'[, { 'USE_HEADER' | 'SKIP_HEADER' }[, '<schema>']]" +
+            ")"
+        );
+    }
+
+    /**
+     * Creates a loader that reads every column in the spec as a bytearray
+     * named f0, f1, ... in spec order.
+     *
+     * @param columnSpec comma-separated, 1-indexed, inclusive column ranges (see class comment)
+     * @throws IllegalArgumentException if the column spec is invalid or the generated schema cannot be parsed
+     */
+    public FixedWidthLoader(String columnSpec) {
+        try {
+            columns = parseColumnSpec(columnSpec);
+            String schemaStr = generateDefaultSchemaString();
+            schema = new ResourceSchema(Utils.getSchemaFromString(schemaStr));
+            fields = schema.getFields();
+        } catch (ParserException e) {
+            // Chain the parser exception so its stack trace is not lost
+            throw new IllegalArgumentException("Invalid schema format: " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Creates a loader with the default all-bytearray schema and an optional header skip.
+     *
+     * @param columnSpec comma-separated, 1-indexed, inclusive column ranges (see class comment)
+     * @param skipHeaderStr 'SKIP_HEADER' (case-insensitive) to skip the header row;
+     *                      any other value, including null, keeps it
+     */
+    public FixedWidthLoader(String columnSpec, String skipHeaderStr) {
+        this(columnSpec);
+        // Constant-first comparison so a null flag means "do not skip" instead of an NPE
+        if ("SKIP_HEADER".equalsIgnoreCase(skipHeaderStr))
+            skipHeader = true;
+    }
+
+    /**
+     * Creates a loader with a user-supplied schema.
+     *
+     * Map, tuple and bag fields are rejected. If the schema has fewer fields than the
+     * column spec has columns, a warning is issued and only the schema's fields are loaded;
+     * a schema with more fields than columns is an error.
+     *
+     * @param columnSpec comma-separated, 1-indexed, inclusive column ranges (see class comment)
+     * @param skipHeaderStr 'SKIP_HEADER' (case-insensitive) to skip the header row;
+     *                      any other value, including null, keeps it
+     * @param schemaStr Pig schema string; all whitespace is stripped before parsing
+     * @throws IllegalArgumentException if the column spec or schema is invalid
+     */
+    public FixedWidthLoader(String columnSpec, String skipHeaderStr, String schemaStr) {
+        try {
+            columns = parseColumnSpec(columnSpec);
+            String compactSchema = schemaStr.replaceAll("[\\s\\r\\n]", "");
+            schema = new ResourceSchema(Utils.getSchemaFromString(compactSchema));
+            fields = schema.getFields();
+
+            for (ResourceFieldSchema field : fields) {
+                byte fieldType = field.getType();
+                if (fieldType == DataType.MAP || fieldType == DataType.TUPLE || fieldType == DataType.BAG) {
+                    throw new IllegalArgumentException(
+                        "Field \"" + field.getName() + "\" is an object type (map, tuple, or bag). " + 
+                        "Object types are not supported by FixedWidthLoader."
+                    );
+                }
+            }
+
+            if (fields.length < columns.size())
+                warn("More columns specified in column spec than fields specified in schema. Only loading fields specified in schema.",
+                     PigWarning.UDF_WARNING_2);
+            else if (fields.length > columns.size())
+                throw new IllegalArgumentException("More fields specified in schema than columns specified in column spec.");
+        } catch (ParserException e) {
+            // Chain the parser exception so its stack trace is not lost
+            throw new IllegalArgumentException("Invalid schema format: " + e.getMessage(), e);
+        }
+
+        // Constant-first comparison so a null flag means "do not skip" instead of an NPE
+        if ("SKIP_HEADER".equalsIgnoreCase(skipHeaderStr))
+            skipHeader = true;
+    }
+
+    public static ArrayList<FixedWidthField> parseColumnSpec(String spec) {
+        ArrayList<FixedWidthField> columns = new ArrayList<FixedWidthField>();
+        String[] ranges = spec.split(",");
+
+        for (String range : ranges) {
+
+            // Ranges are 1-indexed and inclusive-inclusive [] in spec,
+            // but we convert to 0-indexing and inclusive-exclusive [) internally
+            
+            if (range.indexOf("-") != -1) {
+                int start, end;
+
+                String offsets[] = range.split("-", 2);
+                offsets[0] = offsets[0].trim();
+                offsets[1] = offsets[1].trim();
+                
+                if (offsets[0].equals(""))
+                    start = 0;
+                else
+                    start = Integer.parseInt(offsets[0]) - 1;
+
+                if (offsets[1].equals(""))
+                    end = Integer.MAX_VALUE;
+                else
+                    end = Integer.parseInt(offsets[1]);
+
+                if (start + 1 < 1)
+                    throw new IllegalArgumentException("Illegal column spec '" + range + "': start value must be at least 1");
+                if (start + 1 > end)
+                    throw new IllegalArgumentException("Illegal column spec '" + range + "': start value must be less than end value"); 
+
+                columns.add(new FixedWidthField(start, end));
+            } else {
+                int offset = Integer.parseInt(range.trim()) - 1;
+                columns.add(new FixedWidthField(offset, offset + 1));
+            }
+        }
+
+        return columns;
+    }
+
+    /**
+     * Builds the fallback schema text: one bytearray field per parsed column,
+     * named f0, f1, ... and separated by ", ".
+     */
+    private String generateDefaultSchemaString() {
+        StringBuilder schemaText = new StringBuilder();
+        int numColumns = columns.size();
+        for (int idx = 0; idx < numColumns; idx++) {
+            if (idx > 0) {
+                schemaText.append(", ");
+            }
+            schemaText.append("f").append(idx).append(": bytearray");
+        }
+        return schemaText.toString();
+    }
+
+    /*
+     * Methods called on the frontend
+     */
+
+    /** Returns a new {@code TextInputFormat}; {@code getNext()} casts each record value to {@code Text}. */
+    @Override
+    public InputFormat getInputFormat() throws IOException {
+        return new TextInputFormat();
+    }
+
+    /** Registers {@code location} as the job's input path(s). */
+    @Override
+    public void setLocation(String location, Job job) throws IOException {
+        FileInputFormat.setInputPaths(job, location);
+    }
+
+    /** Stores the signature used as the key when looking up this loader's UDFContext properties. */
+    @Override
+    public void setUDFContextSignature( String signature ) {
+        udfContextSignature = signature;
+    }
+
+    /**
+     * Returns the schema built by the constructor and stores its string form in the
+     * UDFContext properties under SCHEMA_SIGNATURE, where prepareToRead() looks it up.
+     *
+     * @param location input location (unused)
+     * @param job job being configured (unused)
+     * @return the user-specified schema, or the generated all-bytearray default
+     * @throws IllegalArgumentException if no schema was ever created
+     */
+    @Override
+    public ResourceSchema getSchema(String location, Job job)
+            throws IOException {
+
+        if (schema == null) {
+            // Should never get here: every usable constructor sets a schema
+            throw new IllegalArgumentException(
+                "No schema found: default schema was never created and no user-specified schema was found."
+            );
+        }
+
+        // Send schema to backend
+        UDFContext udfc = UDFContext.getUDFContext();
+        Properties p = udfc.getUDFProperties(this.getClass(), new String[]{ udfContextSignature });
+        p.setProperty(SCHEMA_SIGNATURE, schema.toString());
+
+        return schema;
+    }
+
+    /*
+     * Methods called on the backend
+     */
+
+    @Override
+    public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
+        // Save reader to use in getNext()
+        this.reader = reader;
+
+        splitIndex = split.getSplitIndex();
+
+        // Get schema from front-end
+        UDFContext udfc = UDFContext.getUDFContext();
+        Properties p = udfc.getUDFProperties(this.getClass(), new String[] { udfContextSignature });
+
+        String strSchema = p.getProperty(SCHEMA_SIGNATURE);
+        if (strSchema == null) {
+            throw new IOException("Could not find schema in UDF context");
+        }
+        schema = new ResourceSchema(Utils.getSchemaFromString(strSchema));
+
+        requiredFields = (boolean[]) ObjectSerializer.deserialize(p.getProperty(REQUIRED_FIELDS_SIGNATURE));
+        if (requiredFields != null) {
+            numRequiredFields = 0;
+            for (int i = 0; i < requiredFields.length; i++) {
+                if (requiredFields[i])
+                    numRequiredFields++;
+            }
+        }
+    }
+    
+    /**
+     * Reads the next line and converts it to a tuple according to the schema.
+     *
+     * When 'SKIP_HEADER' is set, the first line of split 0 is remembered as the header
+     * and dropped; any later line identical to it is dropped too, since several files
+     * with their own headers may be combined into one split. A field that fails to
+     * parse is left null and reported as a warning instead of failing the record.
+     *
+     * @return the next tuple, or null when the input is exhausted
+     * @throws IOException if the underlying reader fails
+     */
+    @Override
+    public Tuple getNext() throws IOException {
+        if (loadingFirstRecord && skipHeader && splitIndex == 0) {
+            header = nextLine();
+            if (header == null)
+                return null;
+        }
+        loadingFirstRecord = false;
+
+        String line = nextLine();
+        // Loop rather than test once: two header lines can follow each other when a
+        // header-only file is combined with another file. header stays null unless
+        // 'SKIP_HEADER' captured one, so this never fires otherwise.
+        while (line != null && line.equals(header)) {
+            line = nextLine();
+        }
+        if (line == null)
+            return null;
+
+        boolean projecting = requiredFields != null;
+        Tuple t = tupleFactory.newTuple(projecting ? numRequiredFields : fields.length);
+
+        int position = 0;
+        for (int i = 0; i < fields.length; i++) {
+            if (projecting && !requiredFields[i])
+                continue;
+
+            try {
+                t.set(position, readField(line, fields[i], columns.get(i)));
+            } catch (Exception e) {
+                warn("Exception when parsing field \"" + fields[i].getName() + "\" " +
+                     "in record " + line + ": " + e.toString(),
+                     PigWarning.UDF_WARNING_1);
+            }
+            // Advance even after a failure so later fields keep their slots
+            position++;
+        }
+
+        return t;
+    }
+
+    /** Returns the next line from the reader as a String, or null at end of input. */
+    private String nextLine() throws IOException {
+        try {
+            if (!reader.nextKeyValue())
+                return null;
+            return ((Text) reader.getCurrentValue()).toString();
+        } catch (IOException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+    }
+
+    private Object readField(String line, ResourceFieldSchema field, FixedWidthField column) 
+                             throws IOException, IllegalArgumentException {
+
+        int start = column.start;
+        int end = Math.min(column.end, line.length());
+
+        if (start > line.length())
+            return null;
+
+        if (end <= start)
+            return null;
+
+        String s  = line.substring(start, end);
+        String sTrim = s.trim();
+
+        switch (field.getType()) {
+            case DataType.UNKNOWN:
+            case DataType.BYTEARRAY:
+            case DataType.CHARARRAY:
+                if (s.trim().length() == 0)
+                    return null;
+                return s.trim();
+
+            case DataType.BOOLEAN:
+                return Boolean.parseBoolean(sTrim);
+
+            case DataType.INTEGER:
+                return Integer.parseInt(sTrim);
+
+            case DataType.LONG:
+                return Long.parseLong(sTrim);
+
+            case DataType.FLOAT:
+                return Float.parseFloat(sTrim);
+            
+            case DataType.DOUBLE:
+                return Double.parseDouble(sTrim);
+
+            case DataType.DATETIME:
+                return (new DateTime(sTrim)).toDateTime(DateTimeZone.UTC);
+
+            case DataType.MAP:
+            case DataType.TUPLE:
+            case DataType.BAG:
+                throw new IllegalArgumentException("Object types (map, tuple, bag) are not supported by FixedWidthLoader");
+            
+            default:
+                throw new IllegalArgumentException(
+                    "Unknown type in input schema: " + field.getType());
+        }
+    }
+
+    /**
+     * Records which fields the script actually uses and stores that mask in the
+     * UDFContext properties, where prepareToRead() picks it up.
+     *
+     * @param requiredFieldList the fields requested by Pig; may be null
+     * @return null if {@code requiredFieldList} is null, otherwise a response
+     *         constructed with {@code true}
+     * @throws RuntimeException if the mask cannot be serialized
+     */
+    @Override
+    public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList) throws FrontendException {
+        if (requiredFieldList == null)
+            return null;
+
+        if (fields != null && requiredFieldList.getFields() != null)
+        {
+            requiredFields = new boolean[fields.length];
+
+            for (RequiredField f : requiredFieldList.getFields()) {
+                requiredFields[f.getIndex()] = true;
+            }
+
+            UDFContext udfc = UDFContext.getUDFContext();
+            Properties p = udfc.getUDFProperties(this.getClass(), new String[]{ udfContextSignature });
+            try {
+                p.setProperty(REQUIRED_FIELDS_SIGNATURE, ObjectSerializer.serialize(requiredFields));
+            } catch (Exception e) {
+                // Chain the original exception so the reason for the failure is not lost
+                throw new RuntimeException("Cannot serialize requiredFields for pushProjection", e);
+            }
+        }
+
+        return new RequiredFieldResponse(true);
+    }
+
+    /** Advertises projection as the only push-down this loader supports. */
+    @Override
+    public List<OperatorSet> getFeatures() {
+        return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION);
+    }
+
+    /** Not implemented; always returns null. */
+    @Override
+    public ResourceStatistics getStatistics(String location, Job job)
+            throws IOException {
+        // Not implemented
+        return null;
+    }
+
+    /** Not implemented; always returns null. */
+    @Override
+    public String[] getPartitionKeys(String location, Job job)
+            throws IOException {
+        // Not implemented
+        return null;
+    }
+
+    /** Not implemented; the filter is ignored. */
+    @Override
+    public void setPartitionFilter(Expression partitionFilter)
+            throws IOException {
+        // Not implemented
+    }
+}
\ No newline at end of file

Added: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/FixedWidthStorer.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/FixedWidthStorer.java?rev=1451783&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/FixedWidthStorer.java (added)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/FixedWidthStorer.java Fri Mar  1 23:58:48 2013
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.piggybank.storage;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+import org.apache.pig.Expression;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
+import org.apache.pig.PigWarning;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.parser.ParserException;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+
+/**
+ * Stores Pig records in a fixed-width file format. 
+ * 
+ * Takes a string argument specifying the ranges of each column in a unix 'cut'-like format.
+ * Ex: '-5, 10-12, 14, 20-'
+ * Ranges are comma-separated, 1-indexed (for ease of use with 1-indexed text editors), and inclusive.
+ * A single-column field at position n may be specified as either 'n-n' or simply 'n'.
+ *
+ * A second optional argument specifies whether to write a header record
+ * with the names of each field. 'WRITE_HEADER' writes a header record;
+ * 'NO_HEADER' and the default does not write one.
+ *
+ * All datetimes are stored in UTC.
+ *
+ * Column spec idea and syntax parser borrowed from Russ Lankenau's FixedWidthLoader implementation
+ * at https://github.com/rlankenau/fixed-width-pig-loader 
+ */
+public class FixedWidthStorer extends StoreFunc {
+
+    private TupleFactory tupleFactory = TupleFactory.getInstance();
+
+    private RecordWriter writer = null;
+    
+    private ArrayList<FixedWidthLoader.FixedWidthField> columns;
+
+    private ResourceSchema schema = null;
+    private ResourceFieldSchema[] fields;
+
+    private boolean writingFirstRecord = true;
+    private boolean writeHeader = false;
+
+    private String udfContextSignature = null;
+    private static final String SCHEMA_SIGNATURE = "pig.fixedwidthloader.schema";
+    private static final Log log = LogFactory.getLog(FixedWidthStorer.class);
+
+    /*
+     * Constructors and constructor helper methods
+     */
+
+    public FixedWidthStorer() {
+        throw new IllegalArgumentException(
+            "Usage: org.apache.pig.piggybank.storage.FixedWidthStorer(" +
+            "'<column spec>'[, { 'WRITE_HEADER' | 'NO_HEADER' }]" +
+            ")"
+        );
+    }
+
+    /**
+     * Creates a storer that writes no header record.
+     *
+     * @param columnSpec column ranges, parsed by {@code FixedWidthLoader.parseColumnSpec}
+     */
+    public FixedWidthStorer(String columnSpec) {
+        columns = FixedWidthLoader.parseColumnSpec(columnSpec);
+    }
+
+    /**
+     * Creates a storer that optionally writes a header record of field names.
+     *
+     * @param columnSpec column ranges, parsed by {@code FixedWidthLoader.parseColumnSpec}
+     * @param headerStr 'WRITE_HEADER' (case-insensitive) to write a header;
+     *                  any other value, including null, writes none
+     */
+    public FixedWidthStorer(String columnSpec, String headerStr) {
+        this(columnSpec);
+
+        // Constant-first comparison so a null flag means "no header" instead of an NPE
+        if ("WRITE_HEADER".equalsIgnoreCase(headerStr))
+            writeHeader = true;
+    }
+
+    /*
+     * Methods called on the frontend
+     */
+
+    /** Returns a new {@code TextOutputFormat}; putNext() writes each record as a {@code Text} value with a null key. */
+    @Override
+    public OutputFormat getOutputFormat() throws IOException {
+        // Key is unused, Text is where the data is stored in
+        return new TextOutputFormat<LongWritable, Text>();
+    }
+
+    /** Registers {@code location} as the job's output path. */
+    @Override
+    public void setStoreLocation(String location, Job job) throws IOException {
+        FileOutputFormat.setOutputPath(job, new Path(location));
+    }
+
+    /** Stores the signature used as the key when looking up this storer's UDFContext properties. */
+    @Override
+    public void setStoreFuncUDFContextSignature(String signature) {
+        udfContextSignature = signature;
+    }
+
+    /**
+     * Performs no validation; only saves the schema's string form in the UDFContext
+     * properties under SCHEMA_SIGNATURE, where prepareToWrite() looks it up.
+     */
+    @Override
+    public void checkSchema(ResourceSchema s) throws IOException {
+        final String[] contextKey = new String[]{ udfContextSignature };
+        Properties udfProps = UDFContext.getUDFContext().getUDFProperties(this.getClass(), contextKey);
+        udfProps.setProperty(SCHEMA_SIGNATURE, s.toString());
+    }
+
+    /*
+     * Methods called on the backend
+     */
+
+    /**
+     * Keeps the record writer for putNext() and rebuilds the schema and its field
+     * array from the string saved by checkSchema().
+     *
+     * @throws IOException if no schema is found in the UDFContext
+     */
+    @Override
+    public void prepareToWrite(RecordWriter writer) throws IOException {
+        this.writer = writer;
+
+        Properties udfProps = UDFContext.getUDFContext()
+            .getUDFProperties(this.getClass(), new String[]{ udfContextSignature });
+
+        String serializedSchema = udfProps.getProperty(SCHEMA_SIGNATURE);
+        if (serializedSchema == null) {
+            throw new IOException("Could not find schema in UDF context");
+        }
+
+        schema = new ResourceSchema(Utils.getSchemaFromString(serializedSchema));
+        fields = schema.getFields();
+    }
+    
+    /**
+     * Writes one tuple as a fixed-width line. If 'WRITE_HEADER' was requested, a line
+     * holding the field names is written before the first tuple.
+     *
+     * @param t tuple whose first {@code fields.length} values are written
+     * @throws IOException if the writer fails or is interrupted
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public void putNext(Tuple t) throws IOException {
+        final int numFields = fields.length;
+
+        if (writingFirstRecord && writeHeader) {
+            StringBuilder headerLine = new StringBuilder();
+            int cursor = 0;
+            for (int idx = 0; idx < numFields; idx++) {
+                FixedWidthLoader.FixedWidthField col = columns.get(idx);
+                headerLine.append(writeFieldAsString(fields[idx], col, cursor, fields[idx].getName()));
+                cursor = col.end;
+            }
+            writeLine(headerLine.toString());
+        }
+        writingFirstRecord = false;
+
+        StringBuilder record = new StringBuilder();
+        int cursor = 0;
+        for (int idx = 0; idx < numFields; idx++) {
+            FixedWidthLoader.FixedWidthField col = columns.get(idx);
+            record.append(writeFieldAsString(fields[idx], col, cursor, t.get(idx)));
+            // The next field pads from where this column ended
+            cursor = col.end;
+        }
+        writeLine(record.toString());
+    }
+
+    /** Hands one finished line to the record writer with a null key. */
+    @SuppressWarnings("unchecked")
+    private void writeLine(String line) throws IOException {
+        try {
+            writer.write(null, new Text(line));
+        } catch (InterruptedException ie) {
+            throw new IOException(ie);
+        }
+    }
+
+    /**
+     * Renders one value as the text for its column, preceded by the spaces needed to
+     * bridge the gap between the previous column's end and this column's start.
+     *
+     * Values are right-justified. Nulls become all spaces. Datetimes are written in UTC.
+     * A float/double that is too wide is rounded to the most decimals that fit (at least
+     * one); any other value that is too wide is replaced by spaces. Both cases raise a
+     * warning. An open-ended column (end == Integer.MAX_VALUE, from a spec such as '20-')
+     * has no fixed width, so its value is written as-is without padding.
+     *
+     * @param field schema of the field, used to decide whether rounding is allowed
+     * @param column 0-indexed [start, end) range the value must occupy
+     * @param offset position in the line reached so far (the previous column's end)
+     * @param d the value to write; may be null, and is a field name for header rows
+     * @return gap padding followed by the column text
+     */
+    @SuppressWarnings("unchecked")
+    private String writeFieldAsString(ResourceFieldSchema field,
+                                      FixedWidthLoader.FixedWidthField column,
+                                      int offset,
+                                      Object d) throws IOException {
+
+        StringBuilder sb = new StringBuilder();
+        appendSpaces(sb, column.start - offset);
+
+        String fieldStr = null;
+        if (d != null) {
+            if (DataType.findType(d) == DataType.DATETIME)
+                fieldStr = ((DateTime) d).toDateTime(DateTimeZone.UTC).toString();
+            else
+                fieldStr = d.toString();
+        }
+
+        // Padding an open-ended column to its nominal width would mean appending
+        // roughly 2^31 spaces, so such a column is written at its natural length.
+        if (column.end == Integer.MAX_VALUE) {
+            if (fieldStr != null)
+                sb.append(fieldStr);
+            return sb.toString();
+        }
+
+        int width = column.end - column.start;
+
+        // write nulls as spaces
+        if (fieldStr == null) {
+            appendSpaces(sb, width);
+            return sb.toString();
+        }
+
+        if (fieldStr.length() > width) {
+            String original = fieldStr;
+            byte fieldType = field.getType();
+
+            // Only real numbers can be rounded. A header row passes the field NAME here
+            // for a float/double field, which must not be cast to Number.
+            String rounded = null;
+            if ((fieldType == DataType.FLOAT || fieldType == DataType.DOUBLE) && d instanceof Number)
+                rounded = roundToWidth(((Number) d).doubleValue(), width);
+
+            if (rounded == null) {
+                warn("Cannot fit " + original + " in field starting at column " + 
+                     column.start + " and ending at column " + (column.end - 1) + ". " +
+                     "Writing null (all spaces) instead.",
+                     PigWarning.UDF_WARNING_2);
+                appendSpaces(sb, width);
+                return sb.toString();
+            }
+
+            warn("Cannot fit " + original + " in field starting at column " + 
+                 column.start + " and ending at column " + (column.end - 1) + ". " +
+                 "Since the field is a decimal type, truncating it to " + rounded + " " +
+                 "to fit in the column.",
+                 PigWarning.UDF_WARNING_1);
+            fieldStr = rounded;
+        }
+
+        // Field fits (possibly after rounding). Right-justify it.
+        appendSpaces(sb, width - fieldStr.length());
+        sb.append(fieldStr);
+
+        return sb.toString();
+    }
+
+    /**
+     * Rounds a value to the largest number of decimals (at least one) whose text fits
+     * in {@code width} characters. Checking the formatted length accounts for the sign,
+     * for zero, and for rounding that adds an integer digit (9.99 -> 10.0).
+     *
+     * @return the rounded text, or null if the value is NaN/infinite or cannot fit
+     */
+    private static String roundToWidth(double value, int width) {
+        if (Double.isNaN(value) || Double.isInfinite(value))
+            return null;
+
+        // Shortest possible form is "d.d", so at most width - 2 decimals can fit
+        for (int decimals = width - 2; decimals >= 1; decimals--) {
+            // Locale.ROOT keeps '.' as the decimal separator, matching Double.toString()
+            String candidate = String.format(java.util.Locale.ROOT, "%." + decimals + "f", value);
+            if (candidate.length() <= width)
+                return candidate;
+        }
+        return null;
+    }
+
+    /** Appends {@code count} spaces; does nothing when {@code count} is zero or negative. */
+    private static void appendSpaces(StringBuilder sb, int count) {
+        for (int i = 0; i < count; i++) {
+            sb.append(' ');
+        }
+    }
+
+    /**
+     * Not implemented; always returns null.
+     * NOTE(review): carries no @Override and this class only extends StoreFunc - confirm anything calls it.
+     */
+    public ResourceStatistics getStatistics(String location, Job job)
+            throws IOException {
+        // Not implemented
+        return null;
+    }
+
+    /**
+     * Not implemented; does nothing.
+     * NOTE(review): carries no @Override and this class only extends StoreFunc - confirm anything calls it.
+     */
+    public void storeStatistics(ResourceStatistics stats, String location, Job job) 
+                                throws IOException {
+        // Not implemented
+    }
+
+    /**
+     * Not implemented; always returns null.
+     * NOTE(review): carries no @Override and this class only extends StoreFunc - confirm anything calls it.
+     */
+    public String[] getPartitionKeys(String location, Job job)
+            throws IOException {
+        // Not implemented
+        return null;
+    }
+
+    /**
+     * Not implemented; the filter is ignored.
+     * NOTE(review): carries no @Override and this class only extends StoreFunc - confirm anything calls it.
+     */
+    public void setPartitionFilter(Expression partitionFilter)
+            throws IOException {
+        // Not implemented
+    }
+}
\ No newline at end of file

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestFixedWidthLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestFixedWidthLoader.java?rev=1451783&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestFixedWidthLoader.java (added)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestFixedWidthLoader.java Fri Mar  1 23:58:48 2013
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.piggybank.test.storage;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.lang.StringUtils;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.pigunit.pig.PigServer;
+import org.apache.pig.test.Util;
+import org.apache.pig.tools.parameters.ParseException;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/** Exercises FixedWidthLoader against a small fixed-width fixture file written to a scratch directory. */
+public class TestFixedWidthLoader {
+
+    private static final String dataDir = "build/test/tmpdata/";
+    private static final String testFile = "fixed_width_data";
+
+    private PigServer pig;
+
+    /** Recreates the scratch directory and writes the fixture; any I/O failure propagates so a broken fixture fails at once. */
+    @Before
+    public void setup() throws IOException {
+        pig = new PigServer(ExecType.LOCAL);
+
+        Util.deleteDirectory(new File(dataDir));
+        pig.mkdirs(dataDir);
+
+        Util.createLocalInputFile(dataDir + testFile,
+            new String[] {
+                "  int            long   float     double bit boolean              datetime  string  string   extra",
+                "12345   1234567890000   2.718   3.141593   0    true  2007-04-05T14:30:10Z   avertwolowolo",
+                "12345   1234567890000   2.718   3.141593   1   false  2007-04-05T14:30:10Z   avertwolowolo   moose",
+                "        1234567890000           3.141593        true                         avert        ",
+                "        1234567890000           3.141593       false",
+                "        1234567890000           cerulean        true"
+        });
+    }
+
+    @After
+    public void cleanup() throws IOException {
+        Util.deleteDirectory(new File(dataDir));
+        pig.shutdown();
+    }
+
+    @Test
+    public void defaultSchema() throws IOException, ParseException {
+        pig.registerQuery(
+            "data = load '" + dataDir + testFile + "' " +
+            "using org.apache.pig.piggybank.storage.FixedWidthLoader('-5, 9-21, 25-29 , 33 - 40, 44-44, 48-52, 55-74, 78-82, 83-90', 'USE_HEADER');"
+        );
+        // No schema is given, so the header row comes back as the first tuple and nothing is type-checked.
+        Iterator<Tuple> data = pig.openIterator("data");
+        String[] expected = {
+            "(int,long,float,double,t,olean,datetime,tring,string)", // "bit", "boolean", and first "string" fields cut off properly
+            "(12345,1234567890000,2.718,3.141593,0,true,2007-04-05T14:30:10Z,avert,wolowolo)",
+            "(12345,1234567890000,2.718,3.141593,1,false,2007-04-05T14:30:10Z,avert,wolowolo)",
+            "(,1234567890000,,3.141593,,true,,avert,)",
+            "(,1234567890000,,3.141593,,false,,,)",
+            "(,1234567890000,,cerulean,,true,,,)"                   // no problem with this since loaded as a bytearray
+        };
+
+        Assert.assertEquals(StringUtils.join(expected, "\n"), StringUtils.join(data, "\n"));
+    }
+
+    @Test
+    public void userSchema() throws IOException, ParseException {
+        pig.registerQuery(
+            "data = load '" + dataDir + testFile + "' " +
+            "using org.apache.pig.piggybank.storage.FixedWidthLoader(" +
+                "'-5, 9-21, 25-29, 33-40, 44, 48-52, 55-74, 78-82, 83-90', " +
+                "'SKIP_HEADER', " +
+                "'i: int, l: long, f: float, d: double, bit: int, b: boolean, dt: datetime, c_arr: chararray, b_arr: bytearray'" +
+            ");"
+        );
+        // With a typed schema the header row is skipped and each column is converted to its declared type.
+        Iterator<Tuple> data = pig.openIterator("data");
+        String[] expected = {
+            // Header skipped
+            "(12345,1234567890000,2.718,3.141593,0,true,2007-04-05T14:30:10.000Z,avert,wolowolo)",  // scalar types
+            "(12345,1234567890000,2.718,3.141593,1,false,2007-04-05T14:30:10.000Z,avert,wolowolo)", // ignore extra field "moose" after wolowolo
+            "(,1234567890000,,3.141593,,true,,avert,)",                                             // nulls fields (all spaces)
+            "(,1234567890000,,3.141593,,false,,,)",                                                 // missing fields (line break earlier than expected)
+            "(,1234567890000,,,,true,,,)"                                                           // invalid double field "cerulean" turns to null
+        };
+
+        Assert.assertEquals(StringUtils.join(expected, "\n"), StringUtils.join(data, "\n"));
+    }
+
+    @Test
+    public void userSchemaFewerFieldsThanColumns() throws IOException, ParseException {
+        pig.registerQuery(
+            "data = load '" + dataDir + testFile + "' " +
+            "using org.apache.pig.piggybank.storage.FixedWidthLoader(" +
+                "'-5, 9-21, 25-29, 33-40, 44, 48-52, 55-74, 78-82, 83-90', " +
+                "'SKIP_HEADER', " +
+                "'i: int, l: long, f: float, d: double'" +
+            ");"
+        );
+        // Nine columns but only four schema fields: each tuple is expected to hold just those four values.
+        Iterator<Tuple> data = pig.openIterator("data");
+        String[] expected = {
+            "(12345,1234567890000,2.718,3.141593)",
+            "(12345,1234567890000,2.718,3.141593)",
+            "(,1234567890000,,3.141593)",
+            "(,1234567890000,,3.141593)",
+            "(,1234567890000,,)"
+        };
+
+        Assert.assertEquals(StringUtils.join(expected, "\n"), StringUtils.join(data, "\n"));
+    }
+
+    @Test(expected=FrontendException.class)
+    public void doesNotSupportObjectTypes() throws IOException, ParseException {
+        pig.registerQuery(
+            "data = load '" + dataDir + testFile + "' " +
+            "using org.apache.pig.piggybank.storage.FixedWidthLoader(" +
+                "'-5, 9-21, 25-29, 33-40, 44, 48-52, 55-74, 78-82, 83-90', 'SKIP_HEADER', 'i: (j: int, k: int)'" +
+            ");"
+        );
+        // A tuple-typed schema field is expected to be rejected with a FrontendException.
+        pig.openIterator("data");
+    }
+
+    @Test(expected=FrontendException.class)
+    public void fewerColumnsThanSchemaFields() throws IOException, ParseException {
+        pig.registerQuery(
+            "data = load '" + dataDir + testFile + "' " +
+            "using org.apache.pig.piggybank.storage.FixedWidthLoader(" +
+                "'1-5, 9-21, 25-29, 33-40', 'SKIP_HEADER', 'i: int, l: long, f: float, d: double, bit: int, c: chararray, b: bytearray'" +
+            ");"
+        );
+        // The query must be well-formed so the exception comes from the column/field mismatch, not from the parser.
+        pig.openIterator("data");
+    }
+
+    @Test(expected=FrontendException.class)
+    public void columnStartsAtZero() throws IOException, ParseException {
+        pig.registerQuery(
+            "data = load '" + dataDir + testFile + "' " +
+            "using org.apache.pig.piggybank.storage.FixedWidthLoader(" +
+                "'0-5', 'SKIP_HEADER', 'i: int'" +
+            ");"
+        );
+        // A column spec that starts at 0 is expected to be rejected.
+        pig.openIterator("data");
+    }
+
+    @Test(expected=FrontendException.class)
+    public void columnEndLessThanStart() throws IOException, ParseException {
+        pig.registerQuery(
+            "data = load '" + dataDir + testFile + "' " +
+            "using org.apache.pig.piggybank.storage.FixedWidthLoader(" +
+                "'5-0', 'SKIP_HEADER', 'i: int'" +
+            ");"
+        );
+        // A column whose end precedes its start is expected to be rejected.
+        pig.openIterator("data");
+    }
+
+    @Test
+    public void pushProjection() throws IOException, ParseException {
+        pig.registerQuery(
+            "data = load '" + dataDir + testFile + "' " +
+            "using org.apache.pig.piggybank.storage.FixedWidthLoader(" +
+                "'-5, 9-21, 25-29 , 33 - 40, 44-44, 48-52, 55-74, 78-82, 83-90', " +
+                "'SKIP_HEADER', " +
+                "'i: int, l: long, f: float, d: double, bit: int, b: boolean, dt: datetime, c_arr: chararray, b_arr: bytearray'" +
+            ");"
+        );
+        // Keep only fields $1, $3 and $7 (l, d and c_arr) of the nine loaded fields.
+        pig.registerQuery(
+            "projection = foreach data generate $1, $3, $7;"
+        );
+
+        Iterator<Tuple> projection = pig.openIterator("projection");
+        String[] expected = {
+            "(1234567890000,3.141593,avert)",
+            "(1234567890000,3.141593,avert)",
+            "(1234567890000,3.141593,avert)",
+            "(1234567890000,3.141593,)",
+            "(1234567890000,,)"
+        };
+
+        Assert.assertEquals(StringUtils.join(expected, "\n"), StringUtils.join(projection, "\n"));
+    }
+}
\ No newline at end of file

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestFixedWidthStorer.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestFixedWidthStorer.java?rev=1451783&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestFixedWidthStorer.java (added)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestFixedWidthStorer.java Fri Mar  1 23:58:48 2013
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.piggybank.test.storage;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+
+import junit.framework.Assert;
+
+import org.apache.commons.lang.StringUtils;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.pigunit.pig.PigServer;
+import org.apache.pig.tools.parameters.ParseException;
+import org.apache.pig.test.Util;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/** Verifies the text that FixedWidthStorer writes for scalar fields, including the header line. */
+public class TestFixedWidthStorer {
+
+    private static final String dataDir = "build/test/tmpdata/";
+
+    private PigServer pig;
+
+    /** Starts a local PigServer and recreates the scratch directory; any I/O failure propagates and fails the test. */
+    @Before
+    public void setup() throws IOException {
+        pig = new PigServer(ExecType.LOCAL);
+
+        Util.deleteDirectory(new File(dataDir));
+        pig.mkdirs(dataDir);
+    }
+
+    /** Removes the scratch directory and shuts the PigServer down. */
+    @After
+    public void cleanup() throws IOException {
+        Util.deleteDirectory(new File(dataDir));
+        pig.shutdown();
+    }
+
+    /** Loads '|'-delimited rows with PigStorage, stores them with FixedWidthStorer, and checks the stored lines. */
+    @Test
+    public void storeScalarTypes() throws IOException, ParseException {
+        String input = "pig_storage_scalar_data";
+        String schema = "i: int, l: long, f: float, d: double, " +
+                        "b: boolean, dt: datetime, c_arr: chararray, b_arr: bytearray";
+        String output = "fixed_width_storage_scalar_data";
+
+        Util.createLocalInputFile(dataDir + input,
+            new String[] {
+                "1|10|2.718|3.14159|true|2007-04-05T14:30:10Z|aardvark|wooooolololo",
+                "1|100|1.234||false||cerulean skies|"
+        });
+
+        // Load input using PigStorage and store it using FixedWidthStorer
+        pig.registerQuery(
+            "data = load '" + dataDir + input + "' " +
+            "using PigStorage('|') as (" + schema + ");"
+        );
+        // Per the expected output below, ranges are 1-based and inclusive, and '-5' covers columns 1 through 5.
+        pig.store("data", dataDir + output,
+                  "org.apache.pig.piggybank.storage.FixedWidthStorer('-5, 8-12, 15-19, 22-27, 29-33, 35-58, 62-69, 70-81', 'WRITE_HEADER')");
+
+        // Load the output and see if it is what it ought to be
+        pig.registerQuery(
+            "data = load '" + dataDir + output + "' " +
+            "using TextLoader() as (line: chararray);"
+        );
+
+        Iterator<Tuple> data = pig.openIterator("data");
+        String[] expected = {
+            // All columns right-aligned
+            "(    i      l      f       d     b                       dt      c_arr       b_arr)", // Header written
+            "(    1     10  2.718  3.1416  true 2007-04-05T14:30:10.000Z   aardvarkwooooolololo)", // 3.14159 rounded to fit in column
+            "(    1    100  1.234         false                                                )"  // "cerulean skies" does not fit, so a null is written (spaces)
+        };
+
+        Assert.assertEquals(StringUtils.join(expected, "\n"), StringUtils.join(data, "\n"));
+    }
+}
\ No newline at end of file