You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2011/11/09 10:04:56 UTC

svn commit: r1199666 - in /pig/branches/branch-0.10: ./ src/org/apache/pig/ src/org/apache/pig/builtin/ test/e2e/pig/tests/ test/org/apache/pig/test/ test/org/apache/pig/test/data/

Author: daijy
Date: Wed Nov  9 09:04:56 2011
New Revision: 1199666

URL: http://svn.apache.org/viewvc?rev=1199666&view=rev
Log:
PIG-2332: JsonLoader/JsonStorage

Added:
    pig/branches/branch-0.10/src/org/apache/pig/builtin/JsonLoader.java
    pig/branches/branch-0.10/src/org/apache/pig/builtin/JsonStorage.java
    pig/branches/branch-0.10/test/org/apache/pig/test/TestJsonLoaderStorage.java
    pig/branches/branch-0.10/test/org/apache/pig/test/data/jsonStorage1.pig
    pig/branches/branch-0.10/test/org/apache/pig/test/data/jsonStorage1.result
    pig/branches/branch-0.10/test/org/apache/pig/test/data/jsonStorage1.txt
Modified:
    pig/branches/branch-0.10/CHANGES.txt
    pig/branches/branch-0.10/src/org/apache/pig/LoadFunc.java
    pig/branches/branch-0.10/src/org/apache/pig/StoreFunc.java
    pig/branches/branch-0.10/test/e2e/pig/tests/nightly.conf
    pig/branches/branch-0.10/test/org/apache/pig/test/Util.java

Modified: pig/branches/branch-0.10/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/CHANGES.txt?rev=1199666&r1=1199665&r2=1199666&view=diff
==============================================================================
--- pig/branches/branch-0.10/CHANGES.txt (original)
+++ pig/branches/branch-0.10/CHANGES.txt Wed Nov  9 09:04:56 2011
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-2332: JsonLoader/JsonStorage (daijy)
+
 PIG-2328: Add builtin UDFs for building and using bloom filters (gates)
 
 PIG-2334: Set default number of reducers for S3N filesystem (ddaniels888 via daijy)

Modified: pig/branches/branch-0.10/src/org/apache/pig/LoadFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/src/org/apache/pig/LoadFunc.java?rev=1199666&r1=1199665&r2=1199666&view=diff
==============================================================================
--- pig/branches/branch-0.10/src/org/apache/pig/LoadFunc.java (original)
+++ pig/branches/branch-0.10/src/org/apache/pig/LoadFunc.java Wed Nov  9 09:04:56 2011
@@ -25,6 +25,7 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -37,6 +38,7 @@ import org.apache.pig.builtin.Utf8Storag
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
 
 
 /**
@@ -292,5 +294,15 @@ public abstract class LoadFunc {
     public void setUDFContextSignature(String signature) {
         // default implementation is a no-op
     }
-       
+    
+    /**
+     * Issue a warning.  Warning messages are aggregated and reported to
+     * the user.
+     * @param msg String message of the warning
+     * @param warningEnum type of warning
+     */
+    public final void warn(String msg, Enum warningEnum) {
+        Counter counter = PigStatusReporter.getInstance().getCounter(warningEnum);
+        counter.increment(1);
+    }
 }

Modified: pig/branches/branch-0.10/src/org/apache/pig/StoreFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/src/org/apache/pig/StoreFunc.java?rev=1199666&r1=1199665&r2=1199666&view=diff
==============================================================================
--- pig/branches/branch-0.10/src/org/apache/pig/StoreFunc.java (original)
+++ pig/branches/branch-0.10/src/org/apache/pig/StoreFunc.java Wed Nov  9 09:04:56 2011
@@ -21,6 +21,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
@@ -29,6 +30,7 @@ import org.apache.pig.classification.Int
 import org.apache.pig.classification.InterfaceStability;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
 
 
 /**
@@ -173,4 +175,15 @@ public abstract class StoreFunc implemen
             fs.delete(path, true);
         }    
     }
+    
+    /**
+     * Issue a warning.  Warning messages are aggregated and reported to
+     * the user.
+     * @param msg String message of the warning
+     * @param warningEnum type of warning
+     */
+    public final void warn(String msg, Enum warningEnum) {
+        Counter counter = PigStatusReporter.getInstance().getCounter(warningEnum);
+        counter.increment(1);
+    }
 }

Added: pig/branches/branch-0.10/src/org/apache/pig/builtin/JsonLoader.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/src/org/apache/pig/builtin/JsonLoader.java?rev=1199666&view=auto
==============================================================================
--- pig/branches/branch-0.10/src/org/apache/pig/builtin/JsonLoader.java (added)
+++ pig/branches/branch-0.10/src/org/apache/pig/builtin/JsonLoader.java Wed Nov  9 09:04:56 2011
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.JsonToken;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+
+import org.apache.pig.Expression;
+import org.apache.pig.LoadCaster;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
+import org.apache.pig.PigWarning;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.parser.ParserException;
+
+/**
+ * A loader for data stored using {@link JsonStorage}.  This is not a generic
+ * JSON loader. It depends on the schema being stored with the data when
+ * conceivably you could write a loader that determines the schema from the
+ * JSON. 
+ */
+public class JsonLoader extends LoadFunc implements LoadMetadata {
+
+    protected RecordReader reader = null;
+    protected ResourceSchema schema = null;
+    
+    private String udfcSignature = null;
+    private JsonFactory jsonFactory = null;
+    private TupleFactory tupleFactory = TupleFactory.getInstance();
+    private BagFactory bagFactory = BagFactory.getInstance();
+    
+    private static final String SCHEMA_SIGNATURE = "pig.jsonloader.schema";
+    
+    public JsonLoader() {
+    }
+    
+    public JsonLoader(String schemaString) throws IOException {
+        schema = new ResourceSchema(Utils.parseSchema(schemaString));
+    }
+
+    public void setLocation(String location, Job job) throws IOException {
+        // Tell our input format where we will be reading from
+        FileInputFormat.setInputPaths(job, location);
+    }
+    
+    @SuppressWarnings("unchecked")
+    public InputFormat getInputFormat() throws IOException {
+        // We will use TextInputFormat, the default Hadoop input format for
+        // text.  It has a LongWritable key that we will ignore, and the value
+        // is a Text (a string writable) that the JSON data is in.
+        return new TextInputFormat();
+    }
+
+    public LoadCaster getLoadCaster() throws IOException {
+        // We do not expect to do casting of byte arrays, because we will be
+        // returning typed data.
+        return null;
+    }
+
+    @SuppressWarnings("unchecked")
+    public void prepareToRead(RecordReader reader, PigSplit split)
+    throws IOException {
+        this.reader = reader;
+        
+        // Get the schema string from the UDFContext object.
+        UDFContext udfc = UDFContext.getUDFContext();
+        Properties p =
+            udfc.getUDFProperties(this.getClass(), new String[]{udfcSignature});
+        String strSchema = p.getProperty(SCHEMA_SIGNATURE);
+        if (strSchema == null) {
+            throw new IOException("Could not find schema in UDF context");
+        }
+
+        // Parse the schema from the string stored in the properties object.
+        schema = new ResourceSchema(Utils.getSchemaFromString(strSchema));
+
+        jsonFactory = new JsonFactory();
+    }
+
+    public Tuple getNext() throws IOException {
+        Text val = null;
+        try {
+            // Read the next key value pair from the record reader.  If it's
+            // finished, return null
+            if (!reader.nextKeyValue()) return null;
+
+            // Get the current value.  We don't use the key.
+            val = (Text)reader.getCurrentValue();
+        } catch (InterruptedException ie) {
+            throw new IOException(ie);
+        }
+
+        // Create a parser specific for this input line.  This may not be the
+        // most efficient approach.
+        ByteArrayInputStream bais = new ByteArrayInputStream(val.getBytes());
+        JsonParser p = jsonFactory.createJsonParser(bais);
+
+        // Create the tuple we will be returning.  We create it with the right
+        // number of fields, as the Tuple object is optimized for this case.
+        ResourceFieldSchema[] fields = schema.getFields();
+        Tuple t = tupleFactory.newTuple(fields.length);
+
+        // Read the start object marker.  Throughout this file if the parsing
+        // isn't what we expect we return a tuple with null fields rather than
+        // throwing an exception.  That way a few mangled lines don't fail the
+        // job.
+        if (p.nextToken() != JsonToken.START_OBJECT) {
+            warn("Bad record, could not find start of record " +
+                val.toString(), PigWarning.UDF_WARNING_1);
+            return t;
+        }
+
+        // Read each field in the record
+        for (int i = 0; i < fields.length; i++) {
+            t.set(i, readField(p, fields[i], i));
+        }
+
+        if (p.nextToken() != JsonToken.END_OBJECT) {
+            warn("Bad record, could not find end of record " +
+                val.toString(), PigWarning.UDF_WARNING_1);
+            return t;
+        }
+        p.close();
+        return t;
+    }
+
+    private Object readField(JsonParser p,
+                             ResourceFieldSchema field,
+                             int fieldnum) throws IOException {
+        // Read the next token
+        JsonToken tok = p.nextToken();
+        if (tok == null) {
+            warn("Early termination of record, expected " + schema.getFields().length
+                + " fields bug found " + fieldnum, PigWarning.UDF_WARNING_1);
+            return null;
+        }
+
+        // Check to see if this value was null
+        if (tok == JsonToken.VALUE_NULL) return null;
+
+        // Read based on our expected type
+        switch (field.getType()) {
+        case DataType.INTEGER:
+            // Read the field name
+            tok = p.nextToken();
+            if (tok == JsonToken.VALUE_NULL) return null;
+            return p.getIntValue();
+
+        case DataType.LONG:
+            tok = p.nextToken();
+            if (tok == JsonToken.VALUE_NULL) return null;
+            return p.getLongValue();
+
+        case DataType.FLOAT:
+            tok = p.nextToken();
+            return p.getFloatValue();
+
+        case DataType.DOUBLE:
+            tok = p.nextToken();
+            if (tok == JsonToken.VALUE_NULL) return null;
+            return p.getDoubleValue();
+
+        case DataType.BYTEARRAY:
+            tok = p.nextToken();
+            if (tok == JsonToken.VALUE_NULL) return null;
+            byte[] b = p.getText().getBytes();
+            // Use the DBA constructor that copies the bytes so that we own
+            // the memory
+            return new DataByteArray(b, 0, b.length);
+
+        case DataType.CHARARRAY:
+            tok = p.nextToken();
+            if (tok == JsonToken.VALUE_NULL) return null;
+            return p.getText();
+
+        case DataType.MAP:
+            // Should be a start of the map object
+            if (p.nextToken() != JsonToken.START_OBJECT) {
+                warn("Bad map field, could not find start of object, field "
+                    + fieldnum, PigWarning.UDF_WARNING_1);
+                return null;
+            }
+            Map<String, String> m = new HashMap<String, String>();
+            while (p.nextToken() != JsonToken.END_OBJECT) {
+                String k = p.getCurrentName();
+                String v = p.getText();
+                m.put(k, v);
+            }
+            return m;
+
+        case DataType.TUPLE:
+            if (p.nextToken() != JsonToken.START_OBJECT) {
+                warn("Bad tuple field, could not find start of object, "
+                    + "field " + fieldnum, PigWarning.UDF_WARNING_1);
+                return null;
+            }
+
+            ResourceSchema s = field.getSchema();
+            ResourceFieldSchema[] fs = s.getFields();
+            Tuple t = tupleFactory.newTuple(fs.length);
+
+            for (int j = 0; j < fs.length; j++) {
+                t.set(j, readField(p, fs[j], j));
+            }
+
+            if (p.nextToken() != JsonToken.END_OBJECT) {
+                warn("Bad tuple field, could not find end of object, "
+                    + "field " + fieldnum, PigWarning.UDF_WARNING_1);
+                return null;
+            }
+            return t;
+
+        case DataType.BAG:
+            if (p.nextToken() != JsonToken.START_ARRAY) {
+                warn("Bad bag field, could not find start of array, "
+                    + "field " + fieldnum, PigWarning.UDF_WARNING_1);
+                return null;
+            }
+
+            s = field.getSchema();
+            fs = s.getFields();
+            // Drill down the next level to the tuple's schema.
+            s = fs[0].getSchema();
+            fs = s.getFields();
+
+            DataBag bag = bagFactory.newDefaultBag();
+
+            JsonToken innerTok;
+            while ((innerTok = p.nextToken()) != JsonToken.END_ARRAY) {
+                if (innerTok != JsonToken.START_OBJECT) {
+                    warn("Bad bag tuple field, could not find start of "
+                        + "object, field " + fieldnum, PigWarning.UDF_WARNING_1);
+                    return null;
+                }
+
+                t = tupleFactory.newTuple(fs.length);
+                for (int j = 0; j < fs.length; j++) {
+                    t.set(j, readField(p, fs[j], j));
+                }
+
+                if (p.nextToken() != JsonToken.END_OBJECT) {
+                    warn("Bad bag tuple field, could not find end of "
+                        + "object, field " + fieldnum, PigWarning.UDF_WARNING_1);
+                    return null;
+                }
+                bag.add(t);
+            }
+            return bag;
+        default:
+            throw new IOException("Unknown type in input schema: " +
+                field.getType());
+        }
+
+    }
+
+    //------------------------------------------------------------------------
+    
+    public void setUDFContextSignature(String signature) {
+        udfcSignature = signature;
+    }
+
+    public ResourceSchema getSchema(String location, Job job)
+    throws IOException {
+
+        ResourceSchema s;
+        if (schema!=null) {
+            s = schema;
+        } else {
+            // Parse the schema
+            s = (new JsonMetadata()).getSchema(location, job, true);
+    
+            if (s == null) {
+                throw new IOException("Unable to parse schema found in file in " + location);
+            }
+        }
+    
+        // Now that we have determined the schema, store it in our
+        // UDFContext properties object so we have it when we need it on the
+        // backend
+        UDFContext udfc = UDFContext.getUDFContext();
+        Properties p =
+            udfc.getUDFProperties(this.getClass(), new String[]{udfcSignature});
+        p.setProperty(SCHEMA_SIGNATURE, s.toString());
+
+        return s;
+    }
+
+    public ResourceStatistics getStatistics(String location, Job job) 
+    throws IOException {
+        // We don't implement this one.
+        return null;
+    }
+
+    public String[] getPartitionKeys(String location, Job job)
+    throws IOException {
+        // We don't have partitions
+        return null;
+    }
+
+    public void setPartitionFilter(Expression partitionFilter)
+    throws IOException {
+        // We don't have partitions
+    }
+}

Added: pig/branches/branch-0.10/src/org/apache/pig/builtin/JsonStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/src/org/apache/pig/builtin/JsonStorage.java?rev=1199666&view=auto
==============================================================================
--- pig/branches/branch-0.10/src/org/apache/pig/builtin/JsonStorage.java (added)
+++ pig/branches/branch-0.10/src/org/apache/pig/builtin/JsonStorage.java Wed Nov  9 09:04:56 2011
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+import org.codehaus.jackson.JsonEncoding;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.StoreMetadata;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.Utils;
+
+/**
+ * A JSON Pig store function.  Each Pig tuple is stored on one line (as one
+ * value for TextOutputFormat) so that it can be read easily using
+ * TextInputFormat.  Pig tuples are mapped to JSON objects.  Pig bags are
+ * mapped to JSON arrays.  Pig maps are also mapped to JSON objects.  Maps are
+ * assumed to be string to string.  A schema is stored in a side file to deal
+ * with mapping between JSON and Pig types. The schema file share the same format
+ * as the one we use in PigStorage.
+ */
+public class JsonStorage extends StoreFunc implements StoreMetadata {
+
+    protected RecordWriter writer = null;
+    protected ResourceSchema schema = null;
+
+    private String udfcSignature = null;
+    private JsonFactory jsonFactory = null;
+
+    // Default size for the byte buffer, should fit most tuples.
+    private static final int BUF_SIZE = 4 * 1024; 
+    
+    private static final String SCHEMA_SIGNATURE = "pig.jsonstorage.schema";
+
+    /*
+     * Methods called on the front end
+     */
+
+    @Override
+    public OutputFormat getOutputFormat() throws IOException {
+        // We will use TextOutputFormat, the default Hadoop output format for
+        // text.  The key is unused and the value will be a
+        // Text (a string writable type) that we store our JSON data in.
+        return new TextOutputFormat<LongWritable, Text>();
+    }
+
+    @Override
+    public void setStoreLocation(String location, Job job) throws IOException {
+        // FileOutputFormat has a utility method for setting up the output
+        // location.  
+        FileOutputFormat.setOutputPath(job, new Path(location));
+    }
+
+    @Override
+    public void setStoreFuncUDFContextSignature(String signature) {
+        // store the signature so we can use it later
+        udfcSignature = signature;
+    }
+
+    @Override
+    public void checkSchema(ResourceSchema s) throws IOException {
+        // We won't really check the schema here, we'll store it in our
+        // UDFContext properties object so we have it when we need it on the
+        // backend
+        
+        UDFContext udfc = UDFContext.getUDFContext();
+        Properties p =
+            udfc.getUDFProperties(this.getClass(), new String[]{udfcSignature});
+        p.setProperty(SCHEMA_SIGNATURE, s.toString());
+    }
+
+
+    /*
+     * Methods called on the back end
+     */
+
+    @Override
+    public void prepareToWrite(RecordWriter writer) throws IOException {
+        // Store the record writer reference so we can use it when it's time
+        // to write tuples
+        this.writer = writer;
+
+        // Get the schema string from the UDFContext object.
+        UDFContext udfc = UDFContext.getUDFContext();
+        Properties p =
+            udfc.getUDFProperties(this.getClass(), new String[]{udfcSignature});
+        String strSchema = p.getProperty(SCHEMA_SIGNATURE);
+        if (strSchema == null) {
+            throw new IOException("Could not find schema in UDF context");
+        }
+
+        // Parse the schema from the string stored in the properties object.
+        schema = new ResourceSchema(Utils.getSchemaFromString(strSchema));
+
+        // Build a Json factory
+        jsonFactory = new JsonFactory();
+    }
+
+    @SuppressWarnings("unchecked")
+    public void putNext(Tuple t) throws IOException {
+        // Build a ByteArrayOutputStream to write the JSON into
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(BUF_SIZE);
+        // Build the generator
+        JsonGenerator json =
+            jsonFactory.createJsonGenerator(baos, JsonEncoding.UTF8);
+
+        // Write the beginning of the top level tuple object
+        json.writeStartObject();
+        
+        ResourceFieldSchema[] fields = schema.getFields();
+        for (int i = 0; i < fields.length; i++) {
+            writeField(json, fields[i], t.get(i));
+        }
+        json.writeEndObject();
+        json.close();
+
+        // Hand a null key and our string to Hadoop
+        try {
+            writer.write(null, new Text(baos.toByteArray()));
+        } catch (InterruptedException ie) {
+            throw new IOException(ie);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void writeField(JsonGenerator json,
+                            ResourceFieldSchema field, 
+                            Object d) throws IOException {
+
+        // If the field is missing or the value is null, write a null
+        if (d == null) {
+            json.writeNullField(field.getName());
+            return;
+        }
+
+        // Based on the field's type, write it out
+        switch (field.getType()) {
+        case DataType.INTEGER:
+            json.writeNumberField(field.getName(), (Integer)d);
+            return;
+
+        case DataType.LONG:
+            json.writeNumberField(field.getName(), (Long)d);
+            return;
+
+        case DataType.FLOAT:
+            json.writeNumberField(field.getName(), (Float)d);
+            return;
+
+        case DataType.DOUBLE:
+            json.writeNumberField(field.getName(), (Double)d);
+            return;
+
+        case DataType.BYTEARRAY:
+            json.writeStringField(field.getName(), d.toString());
+            return;
+
+        case DataType.CHARARRAY:
+            json.writeStringField(field.getName(), (String)d);
+            return;
+
+        case DataType.MAP:
+            json.writeFieldName(field.getName());
+            json.writeStartObject();
+            for (Map.Entry<String, Object> e : ((Map<String, Object>)d).entrySet()) {
+                json.writeStringField(e.getKey(), e.getValue().toString());
+            }
+            json.writeEndObject();
+            return;
+
+        case DataType.TUPLE:
+            json.writeFieldName(field.getName());
+            json.writeStartObject();
+
+            ResourceSchema s = field.getSchema();
+            if (s == null) {
+                throw new IOException("Schemas must be fully specified to use "
+                    + "this storage function.  No schema found for field " +
+                    field.getName());
+            }
+            ResourceFieldSchema[] fs = s.getFields();
+
+            for (int j = 0; j < fs.length; j++) {
+                writeField(json, fs[j], ((Tuple)d).get(j));
+            }
+            json.writeEndObject();
+            return;
+
+        case DataType.BAG:
+            json.writeFieldName(field.getName());
+            json.writeStartArray();
+            s = field.getSchema();
+            if (s == null) {
+                throw new IOException("Schemas must be fully specified to use "
+                    + "this storage function.  No schema found for field " +
+                    field.getName());
+            }
+            fs = s.getFields();
+            if (fs.length != 1 || fs[0].getType() != DataType.TUPLE) {
+                throw new IOException("Found a bag without a tuple "
+                    + "inside!");
+            }
+            // Drill down the next level to the tuple's schema.
+            s = fs[0].getSchema();
+            if (s == null) {
+                throw new IOException("Schemas must be fully specified to use "
+                    + "this storage function.  No schema found for field " +
+                    field.getName());
+            }
+            fs = s.getFields();
+            for (Tuple t : (DataBag)d) {
+                json.writeStartObject();
+                for (int j = 0; j < fs.length; j++) {
+                    writeField(json, fs[j], t.get(j));
+                }
+                json.writeEndObject();
+            }
+            json.writeEndArray();
+            return;
+        }
+    }
+
+    public void storeStatistics(ResourceStatistics stats,
+                                String location,
+                                Job job) throws IOException {
+        // We don't implement this method
+    }
+
+    public void storeSchema(ResourceSchema schema, String location, Job job)
+    throws IOException {
+        // Store the schema in a side file in the same directory.  MapReduce
+        // does not include files starting with "_" when reading data for a job.
+        JsonMetadata metadataWriter = new JsonMetadata();
+        byte recordDel = '\n';
+        byte fieldDel = '\t';
+        metadataWriter.setFieldDel(fieldDel);
+        metadataWriter.setRecordDel(recordDel);
+        metadataWriter.storeSchema(schema, location, job);
+    }
+
+}
\ No newline at end of file

Modified: pig/branches/branch-0.10/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/test/e2e/pig/tests/nightly.conf?rev=1199666&r1=1199665&r2=1199666&view=diff
==============================================================================
--- pig/branches/branch-0.10/test/e2e/pig/tests/nightly.conf (original)
+++ pig/branches/branch-0.10/test/e2e/pig/tests/nightly.conf Wed Nov  9 09:04:56 2011
@@ -3950,6 +3950,37 @@ store E into ':OUTPATH:';\, 
                                  store D into ':OUTPATH:';",
                      }
                  ],
+             },{
+                 'name' => 'JsonLoaderStorage',
+                 'tests' => [
+                     {
+                         'num' => 1,
+                         'pig' => q?A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int, gpa:double);
+                                 store A into ':OUTPATH:.intermediate' using JsonStorage();
+                                 exec
+                                 A = LOAD ':OUTPATH:.intermediate' using JsonLoader();
+                                 store A into ':OUTPATH:';?,
+                         'notmq' => 1,
+                         'verify_pig_script' => q?A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int,gpa:double);
+                                 store A into ':OUTPATH:';?,
+                     }, {
+                         'num' => 2,
+                         'pig' => q?A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int, gpa:double);
+                                 store A into ':OUTPATH:.intermediate1' using JsonStorage();
+                                 B = LOAD ':INPATH:/singlefile/votertab10k' AS (name:chararray, age:int, registration:chararray, contributions:double);
+                                 store B into ':OUTPATH:.intermediate2' using JsonStorage();
+                                 exec
+                                 A = LOAD ':OUTPATH:.intermediate1' using JsonLoader();
+                                 B = LOAD ':OUTPATH:.intermediate2' using JsonLoader();
+                                 C = JOIN A by name, B by name;
+                                 store C into ':OUTPATH:';?,
+                         'notmq' => 1,
+                         'verify_pig_script' => q?A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int,gpa:double);
+                                 B = LOAD ':INPATH:/singlefile/votertab10k' AS (name:chararray, age:int, registration:chararray, contributions:double);
+                                 C = JOIN A by name, B by name;
+                                 store C into ':OUTPATH:';?,
+                     }
+                 ],
              }
         ],
     },

Added: pig/branches/branch-0.10/test/org/apache/pig/test/TestJsonLoaderStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/test/org/apache/pig/test/TestJsonLoaderStorage.java?rev=1199666&view=auto
==============================================================================
--- pig/branches/branch-0.10/test/org/apache/pig/test/TestJsonLoaderStorage.java (added)
+++ pig/branches/branch-0.10/test/org/apache/pig/test/TestJsonLoaderStorage.java Wed Nov  9 09:04:56 2011
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.PrintWriter;
+import java.util.Iterator;
+
+import junit.framework.Assert;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class TestJsonLoaderStorage {
+    private static PigServer pigServer;
+    File jsonFile;
+    
+    @BeforeClass
+    public static void setUp() throws Exception{
+        removeOutput();
+        pigServer = new PigServer(ExecType.LOCAL);
+    }
+    
+    private static void removeOutput() {
+        File outputDir = new File("jsonStorage1.json");
+        if (outputDir.exists()) {
+            for (File c : outputDir.listFiles())
+                c.delete();
+            outputDir.delete();
+        }
+    }
+    
+    @Test
+    public void testJsonStorage1() throws Exception{
+        removeOutput();
+        
+        pigServer.registerScript("test/org/apache/pig/test/data/jsonStorage1.pig");
+        
+        File resultFile = new File("jsonStorage1.json/part-m-00000");
+        
+        String result = Util.readFile(resultFile);
+        String expected = Util.readFile(new File("test/org/apache/pig/test/data/jsonStorage1.result"));
+        Assert.assertTrue(result.equals(expected));
+        
+        File schemaFile = new File("jsonStorage1.json/.pig_schema");
+        result = Util.readFile(schemaFile);
+        expected = Util.readFile(new File("test/org/apache/pig/test/data/jsonStorage1.schema"));
+        Assert.assertTrue(result.equals(expected));
+    }
+    
+    @Test
+    public void testJsonLoader1() throws Exception{
+        
+        File tmpFile = File.createTempFile("tmp", null);
+        tmpFile.delete();
+        
+        pigServer.registerQuery("a = load 'jsonStorage1.json' using JsonLoader();");
+        pigServer.store("a", tmpFile.getCanonicalPath());
+        
+        String result = Util.readFile(new File(tmpFile.getCanonicalPath()+"/part-m-00000"));
+        String expected = Util.readFile(new File("test/org/apache/pig/test/data/jsonStorage1.txt"));
+        Assert.assertTrue(result.equals(expected));
+    }
+    
+    @Test
+    public void testJsonLoader2() throws Exception{
+        
+        File tmpFile = File.createTempFile("tmp", null);
+        tmpFile.delete();
+        
+        File schemaFile = new File("test/org/apache/pig/test/data/jsonStorage1.schema");
+        schemaFile.delete();
+        
+        pigServer.registerQuery("a = load 'jsonStorage1.json' using" + 
+        		" JsonLoader('a0:int,a1:{(a10:int,a11:chararray)},a2:(a20:double,a21:bytearray),a3:[chararray]');");
+        pigServer.store("a", tmpFile.getCanonicalPath());
+        
+        String result = Util.readFile(new File(tmpFile.getCanonicalPath()+"/part-m-00000"));
+        String expected = Util.readFile(new File("test/org/apache/pig/test/data/jsonStorage1.txt"));
+        Assert.assertTrue(result.equals(expected));
+    }
+    
+    @Test
+    public void testJsonStorage2() throws Exception{
+        
+        File inputFile = File.createTempFile("tmp", null);
+        PrintWriter pw = new PrintWriter(new FileWriter(inputFile));
+        pw.println("\t\t\t");
+        pw.close();
+        
+        File interFile = File.createTempFile("tmp", null);
+        interFile.delete();
+        
+        pigServer.registerQuery("a = load '" + inputFile.getCanonicalPath() + "' as (a0:int, a1:chararray, a2, a3:(a30:int));");
+        pigServer.store("a", interFile.getCanonicalPath(), "JsonStorage");
+        
+        pigServer.registerQuery("b = load '" + interFile.getCanonicalPath() + "' using JsonLoader();");
+        Iterator<Tuple> iter = pigServer.openIterator("b");
+        
+        Tuple t = iter.next();
+        
+        Assert.assertTrue(t.size()==4);
+        Assert.assertTrue(t.get(0)==null);
+        Assert.assertTrue(t.get(1)==null);
+        Assert.assertTrue(t.get(2)==null);
+        Assert.assertTrue(t.get(3)==null);
+        
+        Assert.assertFalse(iter.hasNext());
+    }
+
+    @AfterClass
+    public static void tearDown() {
+        removeOutput();
+    }
+}
\ No newline at end of file

Modified: pig/branches/branch-0.10/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/test/org/apache/pig/test/Util.java?rev=1199666&r1=1199665&r2=1199666&view=diff
==============================================================================
--- pig/branches/branch-0.10/test/org/apache/pig/test/Util.java (original)
+++ pig/branches/branch-0.10/test/org/apache/pig/test/Util.java Wed Nov  9 09:04:56 2011
@@ -24,6 +24,7 @@ import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.FileReader;
 import java.io.FileWriter;
@@ -1129,6 +1130,15 @@ public class Util {
         Assert.assertEquals("Comparing actual and expected results. ",
                 expectedResList, actualResList);
     }
-
     
+    public static String readFile(File file) throws IOException {
+        BufferedReader reader = new BufferedReader(new FileReader(file));
+        String result = "";
+        String line;
+        while ((line=reader.readLine())!=null) {
+            result += line;
+            result += "\n";
+        }
+        return result;
+    }
 }

Added: pig/branches/branch-0.10/test/org/apache/pig/test/data/jsonStorage1.pig
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/test/org/apache/pig/test/data/jsonStorage1.pig?rev=1199666&view=auto
==============================================================================
--- pig/branches/branch-0.10/test/org/apache/pig/test/data/jsonStorage1.pig (added)
+++ pig/branches/branch-0.10/test/org/apache/pig/test/data/jsonStorage1.pig Wed Nov  9 09:04:56 2011
@@ -0,0 +1,2 @@
+a = load 'test/org/apache/pig/test/data/jsonStorage1.txt' as (a0:int, a1:{t:(a10:int, a11:chararray)},a2:(a20:double, a21), a3:map[chararray]);
+store a into 'jsonStorage1.json' using JsonStorage();

Added: pig/branches/branch-0.10/test/org/apache/pig/test/data/jsonStorage1.result
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/test/org/apache/pig/test/data/jsonStorage1.result?rev=1199666&view=auto
==============================================================================
--- pig/branches/branch-0.10/test/org/apache/pig/test/data/jsonStorage1.result (added)
+++ pig/branches/branch-0.10/test/org/apache/pig/test/data/jsonStorage1.result Wed Nov  9 09:04:56 2011
@@ -0,0 +1,2 @@
+{"a0":1,"a1":[{"a10":1,"a11":"tom"},{"a10":2,"a11":"jerry"}],"a2":{"a20":1.01,"a21":"sun"},"a3":{"key3":"c","key2":"b","key1":"a"}}
+{"a0":2,"a1":[{"a10":6,"a11":"cat"},{"a10":7,"a11":"dog"},{"a10":8,"a11":"pig"}],"a2":{"a20":2.3,"a21":"moon"},"a3":{"key4":"value4","key1":"value1"}}

Added: pig/branches/branch-0.10/test/org/apache/pig/test/data/jsonStorage1.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/test/org/apache/pig/test/data/jsonStorage1.txt?rev=1199666&view=auto
==============================================================================
--- pig/branches/branch-0.10/test/org/apache/pig/test/data/jsonStorage1.txt (added)
+++ pig/branches/branch-0.10/test/org/apache/pig/test/data/jsonStorage1.txt Wed Nov  9 09:04:56 2011
@@ -0,0 +1,2 @@
+1	{(1,tom),(2,jerry)}	(1.01,sun)	[key3#c,key2#b,key1#a]
+2	{(6,cat),(7,dog),(8,pig)}	(2.3,moon)	[key4#value4,key1#value1]