You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2013/09/16 21:42:28 UTC
svn commit: r1523773 - in /pig/trunk: ./ src/org/apache/pig/builtin/
test/org/apache/pig/test/ test/org/apache/pig/test/data/
Author: daijy
Date: Mon Sep 16 19:42:28 2013
New Revision: 1523773
URL: http://svn.apache.org/r1523773
Log:
PIG-3454: Update JsonLoader/JsonStorage
Removed:
pig/trunk/test/org/apache/pig/test/data/jsonStorage1.pig
pig/trunk/test/org/apache/pig/test/data/jsonStorage1.result
pig/trunk/test/org/apache/pig/test/data/jsonStorage1.txt
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/builtin/JsonLoader.java
pig/trunk/src/org/apache/pig/builtin/JsonStorage.java
pig/trunk/test/org/apache/pig/test/TestJsonLoaderStorage.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1523773&r1=1523772&r2=1523773&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Sep 16 19:42:28 2013
@@ -226,6 +226,8 @@ PIG-3013: BinInterSedes improve chararra
BUG FIXES
+PIG-3454: Update JsonLoader/JsonStorage (tyro89 via daijy)
+
PIG-3333: Fix remaining Windows core unit test failures (daijy)
PIG-3426: Add support for removing s3 files (jeremykarn via daijy)
Modified: pig/trunk/src/org/apache/pig/builtin/JsonLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/JsonLoader.java?rev=1523773&r1=1523772&r2=1523773&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/JsonLoader.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/JsonLoader.java Mon Sep 16 19:42:28 2013
@@ -22,6 +22,13 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.ISODateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonParser;
@@ -182,6 +189,11 @@ public class JsonLoader extends LoadFunc
// Read based on our expected type
switch (field.getType()) {
+ case DataType.BOOLEAN:
+ tok = p.nextToken();
+ if (tok == JsonToken.VALUE_NULL) return null;
+ return p.getBooleanValue();
+
case DataType.INTEGER:
// Read the field name
tok = p.nextToken();
@@ -195,6 +207,7 @@ public class JsonLoader extends LoadFunc
case DataType.FLOAT:
tok = p.nextToken();
+ if (tok == JsonToken.VALUE_NULL) return null;
return p.getFloatValue();
case DataType.DOUBLE:
@@ -202,6 +215,12 @@ public class JsonLoader extends LoadFunc
if (tok == JsonToken.VALUE_NULL) return null;
return p.getDoubleValue();
+ case DataType.DATETIME:
+ tok = p.nextToken();
+ if (tok == JsonToken.VALUE_NULL) return null;
+ DateTimeFormatter formatter = ISODateTimeFormat.dateTimeParser();
+ return formatter.withOffsetParsed().parseDateTime(p.getText());
+
case DataType.BYTEARRAY:
tok = p.nextToken();
if (tok == JsonToken.VALUE_NULL) return null;
@@ -215,6 +234,16 @@ public class JsonLoader extends LoadFunc
if (tok == JsonToken.VALUE_NULL) return null;
return p.getText();
+ case DataType.BIGINTEGER:
+ tok = p.nextToken();
+ if (tok == JsonToken.VALUE_NULL) return null;
+ return p.getBigIntegerValue();
+
+ case DataType.BIGDECIMAL:
+ tok = p.nextToken();
+ if (tok == JsonToken.VALUE_NULL) return null;
+ return p.getDecimalValue();
+
case DataType.MAP:
// Should be a start of the map object
if (p.nextToken() != JsonToken.START_OBJECT) {
Modified: pig/trunk/src/org/apache/pig/builtin/JsonStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/JsonStorage.java?rev=1523773&r1=1523772&r2=1523773&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/JsonStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/JsonStorage.java Mon Sep 16 19:42:28 2013
@@ -21,6 +21,10 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+import org.joda.time.DateTime;
import org.codehaus.jackson.JsonEncoding;
import org.codehaus.jackson.JsonFactory;
@@ -46,6 +50,13 @@ import org.apache.pig.data.DataBag;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+
/**
* A JSON Pig store function. Each Pig tuple is stored on one line (as one
* value for TextOutputFormat) so that it can be read easily using
@@ -177,6 +188,10 @@ public class JsonStorage extends StoreFu
// Based on the field's type, write it out
switch (field.getType()) {
+ case DataType.BOOLEAN:
+ json.writeBooleanField(field.getName(), (Boolean)d);
+ return;
+
case DataType.INTEGER:
json.writeNumberField(field.getName(), (Integer)d);
return;
@@ -193,6 +208,10 @@ public class JsonStorage extends StoreFu
json.writeNumberField(field.getName(), (Double)d);
return;
+ case DataType.DATETIME:
+ json.writeStringField(field.getName(), d.toString());
+ return;
+
case DataType.BYTEARRAY:
json.writeStringField(field.getName(), d.toString());
return;
@@ -201,6 +220,17 @@ public class JsonStorage extends StoreFu
json.writeStringField(field.getName(), (String)d);
return;
+ case DataType.BIGINTEGER:
+ //Since Jackson doesnt have a writeNumberField for BigInteger we
+ //have to do it manually here.
+ json.writeFieldName(field.getName());
+ json.writeNumber((BigInteger)d);
+ return;
+
+ case DataType.BIGDECIMAL:
+ json.writeNumberField(field.getName(), (BigDecimal)d);
+ return;
+
case DataType.MAP:
json.writeFieldName(field.getName());
json.writeStartObject();
Modified: pig/trunk/test/org/apache/pig/test/TestJsonLoaderStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestJsonLoaderStorage.java?rev=1523773&r1=1523772&r2=1523773&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestJsonLoaderStorage.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestJsonLoaderStorage.java Mon Sep 16 19:42:28 2013
@@ -1,126 +1,291 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
package org.apache.pig.test;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.test.Util;
+
import java.io.File;
import java.io.FileWriter;
-import java.io.PrintWriter;
+import java.io.FileReader;
+import java.io.BufferedReader;
+import java.io.IOException;
+
import java.util.Iterator;
+import java.util.Map;
-import junit.framework.Assert;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+import org.joda.time.DateTime;
-import org.apache.pig.ExecType;
-import org.apache.pig.PigServer;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.test.utils.TestHelper;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
import org.junit.Test;
+import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
public class TestJsonLoaderStorage {
- private static PigServer pigServer;
- File jsonFile;
- @BeforeClass
- public static void setUp() throws Exception{
- removeOutput();
- pigServer = new PigServer(ExecType.LOCAL);
+ private static final String schema =
+ "a:boolean," +
+ "b:int," +
+ "c:long," +
+ "d:float," +
+ "e:double," +
+ "f:datetime," +
+ "g:bytearray," +
+ "h:chararray," +
+ "i:biginteger," +
+ "j:bigdecimal," +
+ "k:map[chararray]," +
+ "l:tuple(a:int)," +
+ "m:bag{a:tuple(a:int)}";
+
+ private static final String rawInput =
+ "true\t" +
+ "123\t" +
+ "456\t" +
+ "7.89\t" +
+ "0.12\t" +
+ "2013-01-02T03:04:05.123+03:00\t" +
+ "abc\t" +
+ "def\t" +
+ "123456789\t" +
+ "1234.6789\t" +
+ "[a#ghi]\t" +
+ "(123)\t" +
+ "{(123),(456),(789)}";
+
+ private static final String nullInput =
+ "\t\t\t\t\t\t\t\t\t\t\t\t";
+
+ private static final String json =
+ "{" +
+ "\"a\":true," +
+ "\"b\":123," +
+ "\"c\":456," +
+ "\"d\":7.89," +
+ "\"e\":0.12," +
+ "\"f\":\"2013-01-02T03:04:05.123+03:00\"," +
+ "\"g\":\"abc\"," +
+ "\"h\":\"def\"," +
+ "\"i\":123456789," +
+ "\"j\":1234.6789," +
+ "\"k\":{\"a\":\"ghi\"}," +
+ "\"l\":{\"a\":123}," +
+ "\"m\":[{\"a\":123},{\"a\":456},{\"a\":789}]" +
+ "}";
+
+ private static final String nullJson =
+ "{" +
+ "\"a\":null," +
+ "\"b\":null," +
+ "\"c\":null," +
+ "\"d\":null," +
+ "\"e\":null," +
+ "\"f\":null," +
+ "\"g\":null," +
+ "\"h\":null," +
+ "\"i\":null," +
+ "\"j\":null," +
+ "\"k\":null," +
+ "\"l\":null," +
+ "\"m\":null" +
+ "}";
+
+ private Iterator<Tuple> loadJson(String input) throws IOException {
+ File tempFile = File.createTempFile("json", null);
+ tempFile.deleteOnExit();
+
+ FileWriter writer = new FileWriter(tempFile);
+ writer.write(input);
+ writer.close();
+
+ PigServer pigServer = new PigServer(ExecType.LOCAL);
+ pigServer.registerQuery("data = load '" + tempFile.getAbsolutePath()
+ + "' using JsonLoader('" + schema + "');");
+
+ return pigServer.openIterator("data");
+ }
+
+ private BufferedReader storeJson(String input) throws IOException {
+ File tempJsonFile = File.createTempFile("json", "");
+ tempJsonFile.delete();
+
+ File tempInputFile = File.createTempFile("input", null);
+ tempInputFile.deleteOnExit();
+
+ FileWriter w = new FileWriter(tempInputFile);
+ w.write(input);
+ w.close();
+
+ PigServer pigServer = new PigServer(ExecType.LOCAL);
+ pigServer.registerQuery("data = load '" + tempInputFile.getAbsolutePath()
+ + "' as (" + schema + ");");
+ pigServer.registerQuery("store data into '" + tempJsonFile.getAbsolutePath()
+ + "' using JsonStorage();");
+
+ tempJsonFile.deleteOnExit();
+
+ FileReader r = new FileReader(tempJsonFile.getAbsolutePath() + "/part-m-00000");
+ BufferedReader br = new BufferedReader(r);
+
+ return br;
+ }
+
+ @Test
+ public void testJsonLoader() throws IOException {
+ Iterator<Tuple> tuples = loadJson(json);
+
+ int count = 0;
+ while(tuples.hasNext()) {
+ Tuple tuple = tuples.next();
+ assertEquals(Boolean.class, tuple.get(0).getClass());
+ assertEquals(Integer.class, tuple.get(1).getClass());
+ assertEquals(Long.class, tuple.get(2).getClass());
+ assertEquals(Float.class, tuple.get(3).getClass());
+ assertEquals(Double.class, tuple.get(4).getClass());
+
+ assertEquals(DateTime.class, tuple.get(5).getClass());
+ assertEquals("2013-01-02T03:04:05.123+03:00", tuple.get(5).toString());
+
+ assertEquals(DataByteArray.class, tuple.get(6).getClass());
+ assertEquals(String.class, tuple.get(7).getClass());
+ assertEquals(BigInteger.class, tuple.get(8).getClass());
+ assertEquals(BigDecimal.class, tuple.get(9).getClass());
+
+ assertTrue(tuple.get(10) instanceof Map);
+ assertEquals(String.class, ((Map)tuple.get(10)).get("a").getClass());
+
+ assertTrue(tuple.get(11) instanceof Tuple);
+ assertEquals(Integer.class, ((Tuple)tuple.get(11)).get(0).getClass());
+
+ assertTrue(tuple.get(12) instanceof DataBag);
+
+ DataBag bag = (DataBag)tuple.get(12);
+ assertEquals(3, bag.size());
+
+ Iterator<Tuple> bagTuples = bag.iterator();
+ while(bagTuples.hasNext()) {
+ assertEquals(Integer.class, bagTuples.next().get(0).getClass());
+ }
+
+ count++;
}
-
- private static void removeOutput() {
- File outputDir = new File("jsonStorage1.json");
- if (outputDir.exists()) {
- for (File c : outputDir.listFiles())
- c.delete();
- outputDir.delete();
- }
+
+ assertEquals(1, count);
+ }
+
+ @Test
+ public void testJsonLoaderNull() throws IOException {
+ Iterator<Tuple> tuples = loadJson(nullJson);
+
+ int count = 0;
+ while(tuples.hasNext()) {
+ Tuple tuple = tuples.next();
+
+ assertEquals(null, tuple.get(0));
+ assertEquals(null, tuple.get(1));
+ assertEquals(null, tuple.get(2));
+ assertEquals(null, tuple.get(3));
+ assertEquals(null, tuple.get(4));
+ assertEquals(null, tuple.get(5));
+ assertEquals(null, tuple.get(6));
+ assertEquals(null, tuple.get(7));
+ assertEquals(null, tuple.get(8));
+ assertEquals(null, tuple.get(9));
+ assertEquals(null, tuple.get(10));
+ assertEquals(null, tuple.get(11));
+ assertEquals(null, tuple.get(12));
+
+ count++;
}
-
- @Test
- public void testJsonLoaderStorage1() throws Exception{
- removeOutput();
-
- pigServer.registerScript("test/org/apache/pig/test/data/jsonStorage1.pig");
-
- File resultFile = new File("jsonStorage1.json/part-m-00000");
-
- String result = Util.readFile(resultFile);
- String expected = Util.readFile(new File("test/org/apache/pig/test/data/jsonStorage1.result"));
- Assert.assertEquals(TestHelper.sortStringList(expected,"{","}",","), TestHelper.sortStringList(result,"{","}",","));
-
- File schemaFile = new File("jsonStorage1.json/.pig_schema");
- result = Util.readFile(schemaFile);
- expected = Util.readFile(new File("test/org/apache/pig/test/data/jsonStorage1.schema"));
- Assert.assertEquals(expected, result);
-
- File tmpFile = File.createTempFile("tmp", null);
- tmpFile.delete();
-
- pigServer.registerQuery("a = load 'jsonStorage1.json' using JsonLoader();");
- pigServer.store("a", tmpFile.getCanonicalPath());
-
- result = Util.readFile(new File(tmpFile.getCanonicalPath()+"/part-m-00000"));
- expected = Util.readFile(new File("test/org/apache/pig/test/data/jsonStorage1.txt"));
- Assert.assertEquals(TestHelper.sortStringList(expected,"[","]",","), TestHelper.sortStringList(result,"[","]",","));
-
-
- tmpFile = File.createTempFile("tmp", null);
- tmpFile.delete();
-
- pigServer.registerQuery("a = load 'jsonStorage1.json' using" +
- " JsonLoader('a0:int,a1:{(a10:int,a11:chararray)},a2:(a20:double,a21:bytearray),a3:[chararray]');");
- pigServer.store("a", tmpFile.getCanonicalPath());
-
- result = Util.readFile(new File(tmpFile.getCanonicalPath()+"/part-m-00000"));
- expected = Util.readFile(new File("test/org/apache/pig/test/data/jsonStorage1.txt"));
- Assert.assertEquals(TestHelper.sortStringList(expected,"[","]",","), TestHelper.sortStringList(result,"[","]",","));
+
+ assertEquals(1, count);
+ }
+
+ @Test
+ public void testJsonStorage() throws IOException {
+ BufferedReader br = storeJson(rawInput);
+ String data = br.readLine();
+
+ assertEquals(json, data);
+
+ String line = data;
+ int count = 0;
+ while (line != null) {
+ line = br.readLine();
+ count++;
}
-
- @Test
- public void testJsonLoaderStorage2() throws Exception{
-
- File inputFile = File.createTempFile("tmp", null);
- PrintWriter pw = new PrintWriter(new FileWriter(inputFile));
- pw.println("\t\t\t");
- pw.close();
-
- File interFile = File.createTempFile("tmp", null);
- interFile.delete();
-
- pigServer.registerQuery("a = load '" + Util.encodeEscape(inputFile.getCanonicalPath()) + "' as (a0:int, a1:chararray, a2, a3:(a30:int));");
- pigServer.store("a", interFile.getCanonicalPath(), "JsonStorage");
-
- pigServer.registerQuery("b = load '" + Util.encodeEscape(interFile.getCanonicalPath()) + "' using JsonLoader();");
- Iterator<Tuple> iter = pigServer.openIterator("b");
-
- Tuple t = iter.next();
-
- Assert.assertTrue(t.size()==4);
- Assert.assertTrue(t.get(0)==null);
- Assert.assertTrue(t.get(1)==null);
- Assert.assertTrue(t.get(2)==null);
- Assert.assertTrue(t.get(3)==null);
-
- Assert.assertFalse(iter.hasNext());
+ assertEquals(1, count);
+
+ br.close();
+ }
+
+ @Test
+ public void testJsonStorageNull() throws IOException {
+ BufferedReader br = storeJson(nullInput);
+ String data = br.readLine();
+
+ assertEquals(nullJson, data);
+
+ String line = data;
+ int count = 0;
+ while (line != null) {
+ line = br.readLine();
+ count++;
}
+ assertEquals(1, count);
+
+ br.close();
+ }
+
+ @Test
+ public void testJsonLoaderStorage() throws IOException {
+ File tempJsonFile = File.createTempFile("json", "");
+ tempJsonFile.delete();
+
+ File tempJson2File = File.createTempFile("json2", "");
+ tempJson2File.delete();
+
+ File tempInputFile = File.createTempFile("input", null);
+ tempInputFile.deleteOnExit();
- @AfterClass
- public static void tearDown() {
- removeOutput();
+ FileWriter w = new FileWriter(tempInputFile);
+ w.write(rawInput);
+ w.close();
+
+ PigServer pigServer = new PigServer(ExecType.LOCAL);
+ pigServer.registerQuery("data = load '" + tempInputFile.getAbsolutePath()
+ + "' as (" + schema + ");");
+ pigServer.registerQuery("store data into '" + tempJsonFile.getAbsolutePath()
+ + "' using JsonStorage();");
+ pigServer.registerQuery("json = load '" + tempJsonFile.getAbsolutePath()
+ + "' using JsonLoader('" + schema + "');");
+ pigServer.registerQuery("store json into '" + tempJson2File.getAbsolutePath()
+ + "' using JsonStorage();");
+
+ tempJsonFile.deleteOnExit();
+ tempJson2File.deleteOnExit();
+
+ FileReader r = new FileReader(tempJson2File.getAbsolutePath() + "/part-m-00000");
+
+ BufferedReader br = new BufferedReader(r);
+ String data = br.readLine();
+
+ assertEquals(json, data);
+
+ String line = data;
+ int count = 0;
+ while (line != null) {
+ line = br.readLine();
+ count++;
}
+ assertEquals(1, count);
+
+ br.close();
+ }
}