You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2011/11/09 10:08:40 UTC
svn commit: r1199668 - in /pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/builtin/ test/e2e/pig/tests/ test/org/apache/pig/test/
test/org/apache/pig/test/data/
Author: daijy
Date: Wed Nov 9 09:08:39 2011
New Revision: 1199668
URL: http://svn.apache.org/viewvc?rev=1199668&view=rev
Log:
PIG-2332: JsonLoader/JsonStorage
Added:
pig/trunk/src/org/apache/pig/builtin/JsonLoader.java
pig/trunk/src/org/apache/pig/builtin/JsonStorage.java
pig/trunk/test/org/apache/pig/test/TestJsonLoaderStorage.java
pig/trunk/test/org/apache/pig/test/data/jsonStorage1.pig
pig/trunk/test/org/apache/pig/test/data/jsonStorage1.result
pig/trunk/test/org/apache/pig/test/data/jsonStorage1.txt
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/LoadFunc.java
pig/trunk/src/org/apache/pig/StoreFunc.java
pig/trunk/test/e2e/pig/tests/nightly.conf
pig/trunk/test/org/apache/pig/test/Util.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1199668&r1=1199667&r2=1199668&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Nov 9 09:08:39 2011
@@ -44,6 +44,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-2332: JsonLoader/JsonStorage (daijy)
+
PIG-2334: Set default number of reducers for S3N filesystem (ddaniels888 via daijy)
PIG-1387: Syntactical Sugar for PIG-1385 (azaroth)
Modified: pig/trunk/src/org/apache/pig/LoadFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/LoadFunc.java?rev=1199668&r1=1199667&r2=1199668&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/LoadFunc.java (original)
+++ pig/trunk/src/org/apache/pig/LoadFunc.java Wed Nov 9 09:08:39 2011
@@ -25,6 +25,7 @@ import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
@@ -37,6 +38,7 @@ import org.apache.pig.builtin.Utf8Storag
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
/**
@@ -292,5 +294,15 @@ public abstract class LoadFunc {
public void setUDFContextSignature(String signature) {
// default implementation is a no-op
}
-
+
+    /**
+     * Issue a warning. Occurrences are counted per {@code warningEnum} in a
+     * counter so that they can be aggregated and reported to the user.
+     * Only the counter is updated; the message text is not recorded.
+     * NOTE(review): {@code PigStatusReporter.getCounter} may return null
+     * (presumably when no task context has been set); in that case the
+     * warning is dropped instead of failing with a NullPointerException.
+     * @param msg String message of the warning (currently not recorded)
+     * @param warningEnum type of warning
+     */
+    public final void warn(String msg, Enum warningEnum) {
+        Counter counter = PigStatusReporter.getInstance().getCounter(warningEnum);
+        if (counter != null) {
+            counter.increment(1);
+        }
+    }
}
Modified: pig/trunk/src/org/apache/pig/StoreFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/StoreFunc.java?rev=1199668&r1=1199667&r2=1199668&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/StoreFunc.java (original)
+++ pig/trunk/src/org/apache/pig/StoreFunc.java Wed Nov 9 09:08:39 2011
@@ -21,6 +21,7 @@ import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
@@ -29,6 +30,7 @@ import org.apache.pig.classification.Int
import org.apache.pig.classification.InterfaceStability;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
/**
@@ -173,4 +175,15 @@ public abstract class StoreFunc implemen
fs.delete(path, true);
}
}
+
+    /**
+     * Issue a warning. Occurrences are counted per {@code warningEnum} in a
+     * counter so that they can be aggregated and reported to the user.
+     * Only the counter is updated; the message text is not recorded.
+     * NOTE(review): {@code PigStatusReporter.getCounter} may return null
+     * (presumably when no task context has been set); in that case the
+     * warning is dropped instead of failing with a NullPointerException.
+     * @param msg String message of the warning (currently not recorded)
+     * @param warningEnum type of warning
+     */
+    public final void warn(String msg, Enum warningEnum) {
+        Counter counter = PigStatusReporter.getInstance().getCounter(warningEnum);
+        if (counter != null) {
+            counter.increment(1);
+        }
+    }
}
Added: pig/trunk/src/org/apache/pig/builtin/JsonLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/JsonLoader.java?rev=1199668&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/JsonLoader.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/JsonLoader.java Wed Nov 9 09:08:39 2011
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.JsonToken;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+
+import org.apache.pig.Expression;
+import org.apache.pig.LoadCaster;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
+import org.apache.pig.PigWarning;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.parser.ParserException;
+
+/**
+ * A loader for data stored using {@link JsonStorage}. This is not a generic
+ * JSON loader: each input line must hold one JSON object whose fields appear
+ * in schema order, and values are read by position rather than matched by
+ * field name. The schema comes from the constructor argument when one is
+ * given; otherwise it is read from the schema side file that
+ * {@link JsonStorage} writes next to the data.
+ */
+public class JsonLoader extends LoadFunc implements LoadMetadata {
+
+    protected RecordReader reader = null;
+    protected ResourceSchema schema = null;
+
+    private String udfcSignature = null;
+    private JsonFactory jsonFactory = null;
+    private TupleFactory tupleFactory = TupleFactory.getInstance();
+    private BagFactory bagFactory = BagFactory.getInstance();
+
+    private static final String SCHEMA_SIGNATURE = "pig.jsonloader.schema";
+
+    /** Creates a loader that takes its schema from the JsonStorage side file. */
+    public JsonLoader() {
+    }
+
+    /**
+     * Creates a loader that uses the given schema instead of the side file.
+     *
+     * @param schemaString Pig schema of the data, for example
+     *        {@code "a0:int,a1:chararray"}
+     * @throws IOException if the schema string cannot be parsed
+     */
+    public JsonLoader(String schemaString) throws IOException {
+        schema = new ResourceSchema(Utils.parseSchema(schemaString));
+    }
+
+    @Override
+    public void setLocation(String location, Job job) throws IOException {
+        // Tell our input format where we will be reading from
+        FileInputFormat.setInputPaths(job, location);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public InputFormat getInputFormat() throws IOException {
+        // We will use TextInputFormat, the default Hadoop input format for
+        // text. It has a LongWritable key that we will ignore, and the value
+        // is a Text (a string writable) that the JSON data is in.
+        return new TextInputFormat();
+    }
+
+    @Override
+    public LoadCaster getLoadCaster() throws IOException {
+        // We do not expect to do casting of byte arrays, because we will be
+        // returning typed data.
+        return null;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void prepareToRead(RecordReader reader, PigSplit split)
+    throws IOException {
+        this.reader = reader;
+
+        // Get the schema string from the UDFContext object.
+        UDFContext udfc = UDFContext.getUDFContext();
+        Properties p =
+            udfc.getUDFProperties(this.getClass(), new String[]{udfcSignature});
+        String strSchema = p.getProperty(SCHEMA_SIGNATURE);
+        if (strSchema == null) {
+            throw new IOException("Could not find schema in UDF context");
+        }
+
+        // Parse the schema from the string stored in the properties object.
+        schema = new ResourceSchema(Utils.getSchemaFromString(strSchema));
+
+        jsonFactory = new JsonFactory();
+    }
+
+    /**
+     * Reads the next line and converts its JSON object into a tuple.
+     *
+     * @return the next tuple, or null when the input is exhausted. A line
+     *         whose object structure does not match the schema yields a
+     *         tuple with the unreadable fields left null, plus a warning.
+     * @throws IOException if the record reader fails or the line is not
+     *         syntactically valid JSON (reported by the Jackson parser)
+     */
+    @Override
+    public Tuple getNext() throws IOException {
+        Text val = null;
+        try {
+            // Read the next key value pair from the record reader. If it's
+            // finished, return null
+            if (!reader.nextKeyValue()) return null;
+
+            // Get the current value. We don't use the key.
+            val = (Text)reader.getCurrentValue();
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw new IOException(ie);
+        }
+
+        // Only the first getLength() bytes of Text.getBytes() are valid; the
+        // backing array may be longer and hold bytes from an earlier line.
+        // Creating a parser per line may not be the most efficient approach.
+        ByteArrayInputStream bais =
+            new ByteArrayInputStream(val.getBytes(), 0, val.getLength());
+        JsonParser p = jsonFactory.createJsonParser(bais);
+        try {
+            ResourceFieldSchema[] fields = schema.getFields();
+
+            // Throughout this file, if the token structure isn't what we
+            // expect we warn and return nulls rather than throwing, so that
+            // a few mangled lines don't fail the job.
+            if (p.nextToken() != JsonToken.START_OBJECT) {
+                warn("Bad record, could not find start of record " +
+                    val.toString(), PigWarning.UDF_WARNING_1);
+                // A tuple of the schema's width with every field null.
+                return tupleFactory.newTuple(fields.length);
+            }
+
+            Tuple t = readFields(p, fields);
+
+            if (p.nextToken() != JsonToken.END_OBJECT) {
+                warn("Bad record, could not find end of record " +
+                    val.toString(), PigWarning.UDF_WARNING_1);
+            }
+            return t;
+        } finally {
+            // Close on every path, including the early return above.
+            p.close();
+        }
+    }
+
+    /**
+     * Reads one field per entry of {@code fs}, in schema order, from the JSON
+     * object the parser is currently inside. The caller handles the object's
+     * START_OBJECT and END_OBJECT tokens.
+     */
+    private Tuple readFields(JsonParser p, ResourceFieldSchema[] fs)
+    throws IOException {
+        Tuple t = tupleFactory.newTuple(fs.length);
+        for (int i = 0; i < fs.length; i++) {
+            t.set(i, readField(p, fs[i], i));
+        }
+        return t;
+    }
+
+    /**
+     * Returns the fields of the schema nested in a tuple or bag field.
+     *
+     * @throws IOException if the field has no nested schema
+     */
+    private static ResourceFieldSchema[] nestedFields(ResourceFieldSchema field)
+    throws IOException {
+        ResourceSchema s = field.getSchema();
+        if (s == null) {
+            throw new IOException("Schemas must be fully specified to use "
+                + "this load function. No schema found for field " +
+                field.getName());
+        }
+        return s.getFields();
+    }
+
+    /**
+     * Reads one {@code "name": value} pair from the current JSON object and
+     * converts the value to the Pig type declared by {@code field}. The name
+     * itself is not checked; fields are matched by position.
+     *
+     * @param p parser positioned just before the field's name token
+     * @param field expected schema of the field
+     * @param fieldnum position of the field, used only in warning messages
+     * @return the converted value, or null if the JSON value is null or does
+     *         not have the expected structure (the latter with a warning)
+     * @throws IOException on parser errors, an incomplete nested schema, or a
+     *         type this loader does not support
+     */
+    private Object readField(JsonParser p,
+                             ResourceFieldSchema field,
+                             int fieldnum) throws IOException {
+        // Read the field name token.
+        JsonToken tok = p.nextToken();
+        if (tok == null) {
+            warn("Early termination of record, expected " + schema.getFields().length
+                + " fields but found " + fieldnum, PigWarning.UDF_WARNING_1);
+            return null;
+        }
+
+        // Check to see if this value was null
+        if (tok == JsonToken.VALUE_NULL) return null;
+
+        // Read the value token. JSON null maps to a Pig null for every type,
+        // including maps, tuples and bags, which JsonStorage writes as null
+        // when the field value is null.
+        tok = p.nextToken();
+        if (tok == null || tok == JsonToken.VALUE_NULL) return null;
+
+        // Convert based on our expected type
+        switch (field.getType()) {
+        case DataType.INTEGER:
+            return p.getIntValue();
+
+        case DataType.LONG:
+            return p.getLongValue();
+
+        case DataType.FLOAT:
+            return p.getFloatValue();
+
+        case DataType.DOUBLE:
+            return p.getDoubleValue();
+
+        case DataType.BYTEARRAY: {
+            byte[] b = p.getText().getBytes();
+            // Use the DBA constructor that copies the bytes so that we own
+            // the memory
+            return new DataByteArray(b, 0, b.length);
+        }
+
+        case DataType.CHARARRAY:
+            return p.getText();
+
+        case DataType.MAP: {
+            // Should be a start of the map object
+            if (tok != JsonToken.START_OBJECT) {
+                warn("Bad map field, could not find start of object, field "
+                    + fieldnum, PigWarning.UDF_WARNING_1);
+                return null;
+            }
+            Map<String, String> m = new HashMap<String, String>();
+            // Each entry is a FIELD_NAME token followed by its value token.
+            while ((tok = p.nextToken()) == JsonToken.FIELD_NAME) {
+                String k = p.getCurrentName();
+                JsonToken valueTok = p.nextToken();
+                m.put(k, valueTok == JsonToken.VALUE_NULL ? null : p.getText());
+            }
+            if (tok != JsonToken.END_OBJECT) {
+                warn("Bad map field, could not find end of object, field "
+                    + fieldnum, PigWarning.UDF_WARNING_1);
+                return null;
+            }
+            return m;
+        }
+
+        case DataType.TUPLE: {
+            if (tok != JsonToken.START_OBJECT) {
+                warn("Bad tuple field, could not find start of object, "
+                    + "field " + fieldnum, PigWarning.UDF_WARNING_1);
+                return null;
+            }
+
+            Tuple t = readFields(p, nestedFields(field));
+
+            if (p.nextToken() != JsonToken.END_OBJECT) {
+                warn("Bad tuple field, could not find end of object, "
+                    + "field " + fieldnum, PigWarning.UDF_WARNING_1);
+                return null;
+            }
+            return t;
+        }
+
+        case DataType.BAG: {
+            if (tok != JsonToken.START_ARRAY) {
+                warn("Bad bag field, could not find start of array, "
+                    + "field " + fieldnum, PigWarning.UDF_WARNING_1);
+                return null;
+            }
+
+            ResourceFieldSchema[] bagFields = nestedFields(field);
+            if (bagFields.length == 0) {
+                throw new IOException("Found a bag without a tuple inside!");
+            }
+            // Drill down the next level to the tuple's schema.
+            ResourceFieldSchema[] tupleFields = nestedFields(bagFields[0]);
+
+            DataBag bag = bagFactory.newDefaultBag();
+
+            JsonToken innerTok;
+            while ((innerTok = p.nextToken()) != JsonToken.END_ARRAY) {
+                if (innerTok != JsonToken.START_OBJECT) {
+                    warn("Bad bag tuple field, could not find start of "
+                        + "object, field " + fieldnum, PigWarning.UDF_WARNING_1);
+                    return null;
+                }
+
+                Tuple t = readFields(p, tupleFields);
+
+                if (p.nextToken() != JsonToken.END_OBJECT) {
+                    warn("Bad bag tuple field, could not find end of "
+                        + "object, field " + fieldnum, PigWarning.UDF_WARNING_1);
+                    return null;
+                }
+                bag.add(t);
+            }
+            return bag;
+        }
+
+        default:
+            throw new IOException("Unknown type in input schema: " +
+                field.getType());
+        }
+    }
+
+    //------------------------------------------------------------------------
+
+    @Override
+    public void setUDFContextSignature(String signature) {
+        udfcSignature = signature;
+    }
+
+    /**
+     * Returns the constructor-supplied schema if there is one, otherwise the
+     * schema read from the side file at {@code location}. The schema is also
+     * saved in the UDFContext so that prepareToRead can find it on the
+     * backend.
+     */
+    public ResourceSchema getSchema(String location, Job job)
+    throws IOException {
+
+        ResourceSchema s;
+        if (schema!=null) {
+            s = schema;
+        } else {
+            // Parse the schema
+            s = (new JsonMetadata()).getSchema(location, job, true);
+
+            if (s == null) {
+                throw new IOException("Unable to parse schema found in file in " + location);
+            }
+        }
+
+        // Now that we have determined the schema, store it in our
+        // UDFContext properties object so we have it when we need it on the
+        // backend
+        UDFContext udfc = UDFContext.getUDFContext();
+        Properties p =
+            udfc.getUDFProperties(this.getClass(), new String[]{udfcSignature});
+        p.setProperty(SCHEMA_SIGNATURE, s.toString());
+
+        return s;
+    }
+
+    public ResourceStatistics getStatistics(String location, Job job)
+    throws IOException {
+        // We don't implement this one.
+        return null;
+    }
+
+    public String[] getPartitionKeys(String location, Job job)
+    throws IOException {
+        // We don't have partitions
+        return null;
+    }
+
+    public void setPartitionFilter(Expression partitionFilter)
+    throws IOException {
+        // We don't have partitions
+    }
+}
Added: pig/trunk/src/org/apache/pig/builtin/JsonStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/JsonStorage.java?rev=1199668&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/JsonStorage.java (added)
+++ pig/trunk/src/org/apache/pig/builtin/JsonStorage.java Wed Nov 9 09:08:39 2011
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+import org.codehaus.jackson.JsonEncoding;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerator;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.StoreMetadata;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.Utils;
+
+/**
+ * A JSON Pig store function. Each Pig tuple is stored on one line (as one
+ * value for TextOutputFormat) so that it can be read easily using
+ * TextInputFormat. Pig tuples are mapped to JSON objects. Pig bags are
+ * mapped to JSON arrays. Pig maps are also mapped to JSON objects. Maps are
+ * assumed to be string to string. Null values, including null map values,
+ * are written as JSON null. A schema is stored in a side file to deal
+ * with mapping between JSON and Pig types. The schema file shares the same
+ * format as the one we use in PigStorage.
+ */
+public class JsonStorage extends StoreFunc implements StoreMetadata {
+
+    protected RecordWriter writer = null;
+    protected ResourceSchema schema = null;
+
+    private String udfcSignature = null;
+    private JsonFactory jsonFactory = null;
+
+    // Default size for the byte buffer, should fit most tuples.
+    private static final int BUF_SIZE = 4 * 1024;
+
+    private static final String SCHEMA_SIGNATURE = "pig.jsonstorage.schema";
+
+    /*
+     * Methods called on the front end
+     */
+
+    @Override
+    public OutputFormat getOutputFormat() throws IOException {
+        // We will use TextOutputFormat, the default Hadoop output format for
+        // text. The key is unused and the value will be a
+        // Text (a string writable type) that we store our JSON data in.
+        return new TextOutputFormat<LongWritable, Text>();
+    }
+
+    @Override
+    public void setStoreLocation(String location, Job job) throws IOException {
+        // FileOutputFormat has a utility method for setting up the output
+        // location.
+        FileOutputFormat.setOutputPath(job, new Path(location));
+    }
+
+    @Override
+    public void setStoreFuncUDFContextSignature(String signature) {
+        // store the signature so we can use it later
+        udfcSignature = signature;
+    }
+
+    @Override
+    public void checkSchema(ResourceSchema s) throws IOException {
+        // We won't really check the schema here, we'll store it in our
+        // UDFContext properties object so we have it when we need it on the
+        // backend
+
+        UDFContext udfc = UDFContext.getUDFContext();
+        Properties p =
+            udfc.getUDFProperties(this.getClass(), new String[]{udfcSignature});
+        p.setProperty(SCHEMA_SIGNATURE, s.toString());
+    }
+
+
+    /*
+     * Methods called on the back end
+     */
+
+    @Override
+    public void prepareToWrite(RecordWriter writer) throws IOException {
+        // Store the record writer reference so we can use it when it's time
+        // to write tuples
+        this.writer = writer;
+
+        // Get the schema string from the UDFContext object.
+        UDFContext udfc = UDFContext.getUDFContext();
+        Properties p =
+            udfc.getUDFProperties(this.getClass(), new String[]{udfcSignature});
+        String strSchema = p.getProperty(SCHEMA_SIGNATURE);
+        if (strSchema == null) {
+            throw new IOException("Could not find schema in UDF context");
+        }
+
+        // Parse the schema from the string stored in the properties object.
+        schema = new ResourceSchema(Utils.getSchemaFromString(strSchema));
+
+        // Build a Json factory
+        jsonFactory = new JsonFactory();
+    }
+
+    /**
+     * Writes one tuple as a single-line, UTF-8 encoded JSON object whose keys
+     * are the field names of the stored schema.
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public void putNext(Tuple t) throws IOException {
+        // Build a ByteArrayOutputStream to write the JSON into
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(BUF_SIZE);
+        // Build the generator
+        JsonGenerator json =
+            jsonFactory.createJsonGenerator(baos, JsonEncoding.UTF8);
+
+        // Write the beginning of the top level tuple object
+        json.writeStartObject();
+
+        ResourceFieldSchema[] fields = schema.getFields();
+        for (int i = 0; i < fields.length; i++) {
+            writeField(json, fields[i], t.get(i));
+        }
+        json.writeEndObject();
+        json.close();
+
+        // Hand a null key and our string to Hadoop
+        try {
+            writer.write(null, new Text(baos.toByteArray()));
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw new IOException(ie);
+        }
+    }
+
+    /**
+     * Writes {@code d} as the JSON field named {@code field.getName()},
+     * according to the field's declared Pig type. Nested tuples and bags are
+     * written recursively and require a fully specified schema.
+     *
+     * @throws IOException if a nested schema is missing, a bag has no inner
+     *         tuple schema, or the type is not supported by this function
+     */
+    @SuppressWarnings("unchecked")
+    private void writeField(JsonGenerator json,
+                            ResourceFieldSchema field,
+                            Object d) throws IOException {
+
+        // If the value is null, write a null
+        if (d == null) {
+            json.writeNullField(field.getName());
+            return;
+        }
+
+        // Based on the field's type, write it out
+        switch (field.getType()) {
+        case DataType.INTEGER:
+            json.writeNumberField(field.getName(), (Integer)d);
+            return;
+
+        case DataType.LONG:
+            json.writeNumberField(field.getName(), (Long)d);
+            return;
+
+        case DataType.FLOAT:
+            json.writeNumberField(field.getName(), (Float)d);
+            return;
+
+        case DataType.DOUBLE:
+            json.writeNumberField(field.getName(), (Double)d);
+            return;
+
+        case DataType.BYTEARRAY:
+            json.writeStringField(field.getName(), d.toString());
+            return;
+
+        case DataType.CHARARRAY:
+            json.writeStringField(field.getName(), (String)d);
+            return;
+
+        case DataType.MAP:
+            json.writeFieldName(field.getName());
+            json.writeStartObject();
+            for (Map.Entry<String, Object> e : ((Map<String, Object>)d).entrySet()) {
+                Object v = e.getValue();
+                if (v == null) {
+                    // Calling toString() on a null value would throw.
+                    json.writeNullField(e.getKey());
+                } else {
+                    json.writeStringField(e.getKey(), v.toString());
+                }
+            }
+            json.writeEndObject();
+            return;
+
+        case DataType.TUPLE: {
+            json.writeFieldName(field.getName());
+            json.writeStartObject();
+
+            ResourceSchema s = field.getSchema();
+            if (s == null) {
+                throw new IOException("Schemas must be fully specified to use "
+                    + "this storage function. No schema found for field " +
+                    field.getName());
+            }
+            ResourceFieldSchema[] fs = s.getFields();
+
+            for (int j = 0; j < fs.length; j++) {
+                writeField(json, fs[j], ((Tuple)d).get(j));
+            }
+            json.writeEndObject();
+            return;
+        }
+
+        case DataType.BAG: {
+            json.writeFieldName(field.getName());
+            json.writeStartArray();
+            ResourceSchema s = field.getSchema();
+            if (s == null) {
+                throw new IOException("Schemas must be fully specified to use "
+                    + "this storage function. No schema found for field " +
+                    field.getName());
+            }
+            ResourceFieldSchema[] fs = s.getFields();
+            if (fs.length != 1 || fs[0].getType() != DataType.TUPLE) {
+                throw new IOException("Found a bag without a tuple "
+                    + "inside!");
+            }
+            // Drill down the next level to the tuple's schema.
+            s = fs[0].getSchema();
+            if (s == null) {
+                throw new IOException("Schemas must be fully specified to use "
+                    + "this storage function. No schema found for field " +
+                    field.getName());
+            }
+            fs = s.getFields();
+            for (Tuple t : (DataBag)d) {
+                json.writeStartObject();
+                for (int j = 0; j < fs.length; j++) {
+                    writeField(json, fs[j], t.get(j));
+                }
+                json.writeEndObject();
+            }
+            json.writeEndArray();
+            return;
+        }
+
+        default:
+            // Skipping the field silently would leave the record with fewer
+            // fields than the stored schema describes, and JsonLoader reads
+            // fields by position.
+            throw new IOException("Unsupported type in output schema: "
+                + field.getType() + " for field " + field.getName());
+        }
+    }
+
+    public void storeStatistics(ResourceStatistics stats,
+                                String location,
+                                Job job) throws IOException {
+        // We don't implement this method
+    }
+
+    public void storeSchema(ResourceSchema s, String location, Job job)
+    throws IOException {
+        // Store the schema in a side file in the output directory. Hadoop's
+        // FileInputFormat skips files whose names start with "_" or ".", so
+        // a side file named that way (e.g. .pig_schema) is not read as data.
+        JsonMetadata metadataWriter = new JsonMetadata();
+        byte recordDel = '\n';
+        byte fieldDel = '\t';
+        metadataWriter.setFieldDel(fieldDel);
+        metadataWriter.setRecordDel(recordDel);
+        metadataWriter.storeSchema(s, location, job);
+    }
+
+}
\ No newline at end of file
Modified: pig/trunk/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/nightly.conf?rev=1199668&r1=1199667&r2=1199668&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/nightly.conf (original)
+++ pig/trunk/test/e2e/pig/tests/nightly.conf Wed Nov 9 09:08:39 2011
@@ -3993,6 +3993,37 @@ store E into ':OUTPATH:';\,
store e into ':OUTPATH:';?,
}
],
+ },{
+ # Round trip: store a relation with JsonStorage, load it back with
+ # JsonLoader, and compare against the same relation loaded directly.
+ 'name' => 'JsonLoaderStorage',
+ 'tests' => [
+ {
+ 'num' => 1,
+ 'pig' => q?A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int, gpa:double);
+ store A into ':OUTPATH:.intermediate' using JsonStorage();
+ exec
+ A = LOAD ':OUTPATH:.intermediate' using JsonLoader();
+ store A into ':OUTPATH:';?,
+ 'notmq' => 1,
+ 'verify_pig_script' => q?A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int,gpa:double);
+ store A into ':OUTPATH:';?,
+ }, {
+ # Two relations round-tripped through JSON, then joined on name.
+ 'num' => 2,
+ 'pig' => q?A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int, gpa:double);
+ store A into ':OUTPATH:.intermediate1' using JsonStorage();
+ B = LOAD ':INPATH:/singlefile/votertab10k' AS (name:chararray, age:int, registration:chararray, contributions:double);
+ store B into ':OUTPATH:.intermediate2' using JsonStorage();
+ exec
+ A = LOAD ':OUTPATH:.intermediate1' using JsonLoader();
+ B = LOAD ':OUTPATH:.intermediate2' using JsonLoader();
+ C = JOIN A by name, B by name;
+ store C into ':OUTPATH:';?,
+ 'notmq' => 1,
+ 'verify_pig_script' => q?A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int,gpa:double);
+ B = LOAD ':INPATH:/singlefile/votertab10k' AS (name:chararray, age:int, registration:chararray, contributions:double);
+ C = JOIN A by name, B by name;
+ store C into ':OUTPATH:';?,
+ }
+ ],
},
],
},
Added: pig/trunk/test/org/apache/pig/test/TestJsonLoaderStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestJsonLoaderStorage.java?rev=1199668&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestJsonLoaderStorage.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestJsonLoaderStorage.java Wed Nov 9 09:08:39 2011
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.PrintWriter;
+import java.util.Iterator;
+
+import junit.framework.Assert;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class TestJsonLoaderStorage {
+ private static PigServer pigServer;
+ File jsonFile;
+
+ @BeforeClass
+ public static void setUp() throws Exception{
+ removeOutput();
+ pigServer = new PigServer(ExecType.LOCAL);
+ }
+
+ private static void removeOutput() {
+ File outputDir = new File("jsonStorage1.json");
+ if (outputDir.exists()) {
+ for (File c : outputDir.listFiles())
+ c.delete();
+ outputDir.delete();
+ }
+ }
+
+ @Test
+ public void testJsonStorage1() throws Exception{
+ removeOutput();
+
+ pigServer.registerScript("test/org/apache/pig/test/data/jsonStorage1.pig");
+
+ File resultFile = new File("jsonStorage1.json/part-m-00000");
+
+ String result = Util.readFile(resultFile);
+ String expected = Util.readFile(new File("test/org/apache/pig/test/data/jsonStorage1.result"));
+ Assert.assertTrue(result.equals(expected));
+
+ File schemaFile = new File("jsonStorage1.json/.pig_schema");
+ result = Util.readFile(schemaFile);
+ expected = Util.readFile(new File("test/org/apache/pig/test/data/jsonStorage1.schema"));
+ Assert.assertTrue(result.equals(expected));
+ }
+
+ @Test
+ public void testJsonLoader1() throws Exception{
+
+ File tmpFile = File.createTempFile("tmp", null);
+ tmpFile.delete();
+
+ pigServer.registerQuery("a = load 'jsonStorage1.json' using JsonLoader();");
+ pigServer.store("a", tmpFile.getCanonicalPath());
+
+ String result = Util.readFile(new File(tmpFile.getCanonicalPath()+"/part-m-00000"));
+ String expected = Util.readFile(new File("test/org/apache/pig/test/data/jsonStorage1.txt"));
+ Assert.assertTrue(result.equals(expected));
+ }
+
+ @Test
+ public void testJsonLoader2() throws Exception{
+
+ File tmpFile = File.createTempFile("tmp", null);
+ tmpFile.delete();
+
+ File schemaFile = new File("test/org/apache/pig/test/data/jsonStorage1.schema");
+ schemaFile.delete();
+
+ pigServer.registerQuery("a = load 'jsonStorage1.json' using" +
+ " JsonLoader('a0:int,a1:{(a10:int,a11:chararray)},a2:(a20:double,a21:bytearray),a3:[chararray]');");
+ pigServer.store("a", tmpFile.getCanonicalPath());
+
+ String result = Util.readFile(new File(tmpFile.getCanonicalPath()+"/part-m-00000"));
+ String expected = Util.readFile(new File("test/org/apache/pig/test/data/jsonStorage1.txt"));
+ Assert.assertTrue(result.equals(expected));
+ }
+
+ @Test
+ public void testJsonStorage2() throws Exception{
+
+ File inputFile = File.createTempFile("tmp", null);
+ PrintWriter pw = new PrintWriter(new FileWriter(inputFile));
+ pw.println("\t\t\t");
+ pw.close();
+
+ File interFile = File.createTempFile("tmp", null);
+ interFile.delete();
+
+ pigServer.registerQuery("a = load '" + inputFile.getCanonicalPath() + "' as (a0:int, a1:chararray, a2, a3:(a30:int));");
+ pigServer.store("a", interFile.getCanonicalPath(), "JsonStorage");
+
+ pigServer.registerQuery("b = load '" + interFile.getCanonicalPath() + "' using JsonLoader();");
+ Iterator<Tuple> iter = pigServer.openIterator("b");
+
+ Tuple t = iter.next();
+
+ Assert.assertTrue(t.size()==4);
+ Assert.assertTrue(t.get(0)==null);
+ Assert.assertTrue(t.get(1)==null);
+ Assert.assertTrue(t.get(2)==null);
+ Assert.assertTrue(t.get(3)==null);
+
+ Assert.assertFalse(iter.hasNext());
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ removeOutput();
+ }
+}
\ No newline at end of file
Modified: pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/Util.java?rev=1199668&r1=1199667&r2=1199668&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ pig/trunk/test/org/apache/pig/test/Util.java Wed Nov 9 09:08:39 2011
@@ -24,6 +24,7 @@ import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.FileWriter;
@@ -1129,6 +1130,15 @@ public class Util {
Assert.assertEquals("Comparing actual and expected results. ",
expectedResList, actualResList);
}
-
+
+    /**
+     * Reads a text file and returns its contents with every line terminated
+     * by "\n", whatever line separators the file itself uses.
+     *
+     * @param file file to read (decoded with the platform default charset,
+     *        as {@link FileReader} does)
+     * @return the file's lines, each followed by "\n"; empty for an empty file
+     * @throws IOException if the file cannot be opened or read
+     */
+    public static String readFile(File file) throws IOException {
+        BufferedReader reader = new BufferedReader(new FileReader(file));
+        try {
+            // StringBuilder avoids quadratic copying of repeated String +=.
+            StringBuilder result = new StringBuilder();
+            String line;
+            while ((line = reader.readLine()) != null) {
+                result.append(line).append('\n');
+            }
+            return result.toString();
+        } finally {
+            // Release the file handle on every path.
+            reader.close();
+        }
+    }
}
Added: pig/trunk/test/org/apache/pig/test/data/jsonStorage1.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/jsonStorage1.pig?rev=1199668&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/jsonStorage1.pig (added)
+++ pig/trunk/test/org/apache/pig/test/data/jsonStorage1.pig Wed Nov 9 09:08:39 2011
@@ -0,0 +1,2 @@
+-- Load the fixture with nested bag, tuple and map fields and store it with
+-- JsonStorage; TestJsonLoaderStorage compares the output with jsonStorage1.result.
+a = load 'test/org/apache/pig/test/data/jsonStorage1.txt' as (a0:int, a1:{t:(a10:int, a11:chararray)},a2:(a20:double, a21), a3:map[chararray]);
+store a into 'jsonStorage1.json' using JsonStorage();
Added: pig/trunk/test/org/apache/pig/test/data/jsonStorage1.result
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/jsonStorage1.result?rev=1199668&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/jsonStorage1.result (added)
+++ pig/trunk/test/org/apache/pig/test/data/jsonStorage1.result Wed Nov 9 09:08:39 2011
@@ -0,0 +1,2 @@
+{"a0":1,"a1":[{"a10":1,"a11":"tom"},{"a10":2,"a11":"jerry"}],"a2":{"a20":1.01,"a21":"sun"},"a3":{"key3":"c","key2":"b","key1":"a"}}
+{"a0":2,"a1":[{"a10":6,"a11":"cat"},{"a10":7,"a11":"dog"},{"a10":8,"a11":"pig"}],"a2":{"a20":2.3,"a21":"moon"},"a3":{"key4":"value4","key1":"value1"}}
Added: pig/trunk/test/org/apache/pig/test/data/jsonStorage1.txt
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/jsonStorage1.txt?rev=1199668&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/jsonStorage1.txt (added)
+++ pig/trunk/test/org/apache/pig/test/data/jsonStorage1.txt Wed Nov 9 09:08:39 2011
@@ -0,0 +1,2 @@
+1 {(1,tom),(2,jerry)} (1.01,sun) [key3#c,key2#b,key1#a]
+2 {(6,cat),(7,dog),(8,pig)} (2.3,moon) [key4#value4,key1#value1]