You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2015/03/18 18:04:00 UTC
svn commit: r1667589 - in /pig/trunk: ./ src/org/apache/pig/impl/util/avro/
test/org/apache/pig/builtin/ test/org/apache/pig/impl/util/avro/
Author: daijy
Date: Wed Mar 18 17:03:59 2015
New Revision: 1667589
URL: http://svn.apache.org/r1667589
Log:
PIG-4463: AvroMapWrapper still leaks Avro data types and AvroStorageDataConversionUtilities do not handle Pig maps
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java
pig/trunk/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java
pig/trunk/test/org/apache/pig/builtin/TestAvroStorage.java
pig/trunk/test/org/apache/pig/impl/util/avro/TestAvroStorageSchemaConversionUtilities.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1667589&r1=1667588&r2=1667589&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Mar 18 17:03:59 2015
@@ -54,6 +54,9 @@ PIG-4333: Split BigData tests into multi
BUG FIXES
+PIG-4463: AvroMapWrapper still leaks Avro data types and AvroStorageDataConversionUtilities do not handle
+ Pig maps (rdsr via daijy)
+
PIG-4460: TestBuiltIn testValueListOutputSchemaComplexType and testValueSetOutputSchemaComplexType tests
create bags whose inner schema is not a tuple (erwaman via daijy)
Modified: pig/trunk/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java?rev=1667589&r1=1667588&r2=1667589&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java Wed Mar 18 17:03:59 2015
@@ -117,11 +117,7 @@ public final class AvroMapWrapper implem
new Function() {
@Override
public Object apply(final Object v) {
- if (v instanceof Utf8) {
- return v.toString();
- } else {
- return v;
- }
+ return AvroTupleWrapper.getPigObject(v);
}
}
);
@@ -133,18 +129,13 @@ public final class AvroMapWrapper implem
Sets.newHashSetWithExpectedSize(innerMap.size());
for (java.util.Map.Entry<CharSequence, Object> e : innerMap.entrySet()) {
CharSequence k = e.getKey();
- Object v = e.getValue();
+ final Object v = AvroTupleWrapper.getPigObject(e.getValue());
if (k instanceof Utf8) {
k = k.toString();
}
- if (v instanceof Utf8) {
- v = v.toString();
- }
theSet.add(new AbstractMap.SimpleEntry<CharSequence, Object>(k, v));
}
-
return theSet;
-
}
}
Modified: pig/trunk/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java?rev=1667589&r1=1667588&r2=1667589&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java Wed Mar 18 17:03:59 2015
@@ -21,11 +21,14 @@ package org.apache.pig.impl.util.avro;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericData;
+import org.apache.avro.util.Utf8;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
@@ -54,35 +57,7 @@ public class AvroStorageDataConversionUt
for (Field f : s.getFields()) {
Object o = t.get(f.pos());
Schema innerSchema = f.schema();
- if (AvroStorageSchemaConversionUtilities.isNullableUnion(innerSchema)) {
- if (o == null) {
- record.put(f.pos(), null);
- continue;
- }
- innerSchema = AvroStorageSchemaConversionUtilities
- .removeSimpleUnion(innerSchema);
- }
- switch(innerSchema.getType()) {
- case RECORD:
- record.put(f.pos(), packIntoAvro((Tuple) o, innerSchema));
- break;
- case ARRAY:
- record.put(f.pos(), packIntoAvro((DataBag) o, innerSchema));
- break;
- case BYTES:
- record.put(f.pos(), ByteBuffer.wrap(((DataByteArray) o).get()));
- break;
- case FIXED:
- record.put(f.pos(), new GenericData.Fixed(
- innerSchema, ((DataByteArray) o).get()));
- break;
- default:
- if (t.getType(f.pos()) == DataType.DATETIME) {
- record.put(f.pos(), ((DateTime) o).getMillis() );
- } else {
- record.put(f.pos(), o);
- }
- }
+ record.put(f.pos(), packIntoAvro(o, innerSchema));
}
return record;
} catch (Exception e) {
@@ -123,5 +98,52 @@ public class AvroStorageDataConversionUt
}
}
+ private static Object packIntoAvro(final Object o, Schema s)
+ throws IOException {
+ if (AvroStorageSchemaConversionUtilities.isNullableUnion(s)) {
+ if (o == null) {
+ return null;
+ }
+ s = AvroStorageSchemaConversionUtilities.removeSimpleUnion(s);
+ }
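+ // NOTE: null is only accepted for nullable unions; for any other schema the MAP, BYTES and FIXED cases below dereference o and throw NullPointerException.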
+ switch (s.getType()) {
+ case RECORD:
+ return packIntoAvro((Tuple) o, s);
+ case ARRAY:
+ return packIntoAvro((DataBag) o, s);
+ case MAP:
+ return packIntoAvro((Map<CharSequence, Object>) o, s);
+ case BYTES:
+ return ByteBuffer.wrap(((DataByteArray) o).get());
+ case FIXED:
+ return new GenericData.Fixed(s, ((DataByteArray) o).get());
+ default:
+ if (DataType.findType(o) == DataType.DATETIME) {
+ return ((DateTime) o).getMillis();
+ } else {
+ return o;
+ }
+ }
+ }
+ private static Map<Utf8, Object> packIntoAvro(Map<CharSequence, Object> input, Schema schema)
+ throws IOException {
+ final Map<Utf8, Object> output = new HashMap<Utf8, Object>();
+ for (Map.Entry<CharSequence, Object> e : input.entrySet()) {
+ final Utf8 k = utf8(e.getKey());
+ output.put(k, packIntoAvro(e.getValue(), schema.getValueType()));
+ }
+ return output;
+ }
+
+ private static Utf8 utf8(CharSequence v) {
+ if (v instanceof Utf8) {
+ return (Utf8) v;
+ } else {
+ final StringBuilder sb = new StringBuilder(v.length());
+ sb.append(v);
+ return new Utf8(sb.toString());
+ }
+ }
}
Modified: pig/trunk/test/org/apache/pig/builtin/TestAvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/TestAvroStorage.java?rev=1667589&r1=1667588&r2=1667589&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/TestAvroStorage.java (original)
+++ pig/trunk/test/org/apache/pig/builtin/TestAvroStorage.java Wed Mar 18 17:03:59 2015
@@ -34,8 +34,11 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.Iterator;
+import com.google.common.io.Closeables;
import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericContainer;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericDatumReader;
@@ -56,6 +59,8 @@ import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
import org.apache.pig.builtin.mock.Storage.Data;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.impl.util.avro.AvroBagWrapper;
@@ -904,6 +909,94 @@ public class TestAvroStorage {
assertEquals("bar", v);
}
+ @Test
+ public void testAvroMapWrapper() throws Exception {
+ final Map<CharSequence, Object> m = new HashMap<CharSequence, Object>();
+ for (String fn : avroSchemas) {
+ final String avro = basedir + "data/avro/uncompressed/" + fn + ".avro";
+ int i = 0;
+ for (GenericContainer r : readAvroData(avro)) {
+ m.put(new Utf8(fn + i), r);
+ i += 1;
+ }
+ }
+ final AvroMapWrapper amw = new AvroMapWrapper(m);
+ // Test out all the interfaces the AvroMapWrapper supports
+ for (Object o : amw.values()) {
+ assertTrue(isValidPigObject(o));
+ }
+ for (CharSequence k : amw.keySet()) {
+ assertTrue(isValidPigObject(amw.get(k)));
+ }
+ for (Map.Entry<CharSequence, Object> e : amw.entrySet()) {
+ assertTrue(isValidPigObject(e.getValue()));
+ }
+ }
+
+ private boolean isValidPigObject(Object o) {
+ if (o == null) {
+ return true;
+ }
+ switch (DataType.findType(o)) {
+ case DataType.TUPLE:
+ for (Object inner : ((Tuple) o).getAll()) {
+ if (!isValidPigObject(inner)) {
+ return false;
+ }
+ }
+ return true;
+ case DataType.BAG:
+ final Iterator<Tuple> bi = ((DataBag) o).iterator();
+ while (bi.hasNext()) {
+ if (!isValidPigObject(bi.next())) {
+ return false;
+ }
+ }
+ return true;
+ case DataType.MAP:
+ for (Object inner : ((Map) o).values()) {
+ if (!isValidPigObject(inner)) {
+ return false;
+ }
+ }
+ return true;
+ case DataType.BIGDECIMAL:
+ case DataType.BIGINTEGER:
+ case DataType.BOOLEAN:
+ case DataType.BYTE:
+ case DataType.BYTEARRAY:
+ case DataType.CHARARRAY:
+ case DataType.DATETIME:
+ case DataType.DOUBLE:
+ case DataType.FLOAT:
+ case DataType.GENERIC_WRITABLECOMPARABLE:
+ case DataType.INTEGER:
+ case DataType.LONG:
+ return true;
+ case DataType.ERROR:
+ default:
+ return false;
+ }
+ }
+
+ private List<GenericContainer> readAvroData(String path) throws IOException {
+ final FileSystem fs = FileSystem.getLocal(new Configuration());
+ final Path filePath = new Path(path);
+ assertTrue("File path " + filePath + " does not exists!", fs.exists(filePath));
+ final GenericDatumReader<GenericContainer> reader = new GenericDatumReader<GenericContainer>();
+ final DataFileStream<GenericContainer> in = new DataFileStream<GenericContainer>(fs.open(filePath), reader);
+ final List<GenericContainer> avroData = new ArrayList<GenericContainer>();
+ try {
+ while (in.hasNext()) {
+ GenericContainer obj = in.next();
+ avroData.add(obj);
+ }
+ } finally {
+ Closeables.closeQuietly(in);
+ }
+ return avroData;
+ }
+
private void testAvroStorage(boolean expectedToSucceed, String scriptFile, Map<String,String> parameterMap) throws IOException {
pigServerLocal.setBatchOn();
Modified: pig/trunk/test/org/apache/pig/impl/util/avro/TestAvroStorageSchemaConversionUtilities.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/impl/util/avro/TestAvroStorageSchemaConversionUtilities.java?rev=1667589&r1=1667588&r2=1667589&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/impl/util/avro/TestAvroStorageSchemaConversionUtilities.java (original)
+++ pig/trunk/test/org/apache/pig/impl/util/avro/TestAvroStorageSchemaConversionUtilities.java Wed Mar 18 17:03:59 2015
@@ -17,18 +17,13 @@
*/
package org.apache.pig.impl.util.avro;
-import com.google.common.collect.ImmutableMap;
import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
import org.apache.pig.ResourceSchema;
-import org.apache.pig.data.Tuple;
import org.junit.Assert;
import org.junit.Test;
-
import java.io.File;
import java.io.IOException;
-import java.util.Map;
public class TestAvroStorageSchemaConversionUtilities {
final private static String BASE_DIR = "test/org/apache/pig/builtin/avro/schema/";
@@ -47,32 +42,6 @@ public class TestAvroStorageSchemaConver
parse(BASE_DIR + "nullableArrayInMap.avsc", true));
}
- /**
- * Test verifies that Avro records as map values are correctly converted to tuples
- */
- @Test
- public void testRecordAsMapValue() throws IOException {
- final Schema schema = new Schema.Parser().parse(new File(BASE_DIR, "recordInMap.avsc"));
- final GenericData.Record record = new GenericData.Record(schema);
- record.put("key", "k");
- record.put("value", 42);
- final Schema valueSchema = schema.getField("parameters").schema().getValueType();
- final GenericData.Record valueRecord = new GenericData.Record(valueSchema);
- valueRecord.put("id", 1);
- record.put("parameters", ImmutableMap.of("record_in_map", valueRecord));
- final Tuple tuple = new AvroTupleWrapper<GenericData.Record>(record);
- // Third parameter is the map
- final Object o = tuple.get(2);
- Assert.assertTrue(o instanceof Map);
- final Map<CharSequence, Object> map = (Map<CharSequence, Object>) o;
- final Object recordInMap = map.get("record_in_map");
- // Assert that the record got converted to a tuple
- Assert.assertTrue(recordInMap instanceof Tuple);
- final Tuple tuple2 = (Tuple) recordInMap;
- final Object id = tuple2.get(0);
- Assert.assertEquals(1, id);
- }
-
private static String parse(String schema, boolean recursive) throws IOException {
final Schema s = new Schema.Parser().parse(new File(schema));
final ResourceSchema resourceSchema = AvroStorageSchemaConversionUtilities.avroSchemaToResourceSchema(s, recursive);