You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this plain-text copy.
Posted to commits@pig.apache.org by da...@apache.org on 2015/03/18 18:04:00 UTC

svn commit: r1667589 - in /pig/trunk: ./ src/org/apache/pig/impl/util/avro/ test/org/apache/pig/builtin/ test/org/apache/pig/impl/util/avro/

Author: daijy
Date: Wed Mar 18 17:03:59 2015
New Revision: 1667589

URL: http://svn.apache.org/r1667589
Log:
PIG-4463: AvroMapWrapper still leaks Avro data types and AvroStorageDataConversionUtilities do not handle Pig maps

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java
    pig/trunk/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java
    pig/trunk/test/org/apache/pig/builtin/TestAvroStorage.java
    pig/trunk/test/org/apache/pig/impl/util/avro/TestAvroStorageSchemaConversionUtilities.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1667589&r1=1667588&r2=1667589&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Mar 18 17:03:59 2015
@@ -54,6 +54,9 @@ PIG-4333: Split BigData tests into multi
  
 BUG FIXES
 
+PIG-4463: AvroMapWrapper still leaks Avro data types and AvroStorageDataConversionUtilities do not handle
+ Pig maps (rdsr via daijy)
+
 PIG-4460: TestBuiltIn testValueListOutputSchemaComplexType and testValueSetOutputSchemaComplexType tests
  create bags whose inner schema is not a tuple (erwaman via daijy)
 

Modified: pig/trunk/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java?rev=1667589&r1=1667588&r2=1667589&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java Wed Mar 18 17:03:59 2015
@@ -117,11 +117,7 @@ public final class AvroMapWrapper implem
         new Function() {
             @Override
             public Object apply(final Object v) {
-              if (v instanceof Utf8) {
-                return v.toString();
-              } else {
-                return v;
-              }
+              return AvroTupleWrapper.getPigObject(v);
             }
           }
         );
@@ -133,18 +129,13 @@ public final class AvroMapWrapper implem
         Sets.newHashSetWithExpectedSize(innerMap.size());
     for (java.util.Map.Entry<CharSequence, Object> e : innerMap.entrySet()) {
       CharSequence k = e.getKey();
-      Object v = e.getValue();
+      final Object v = AvroTupleWrapper.getPigObject(e.getValue());
       if (k instanceof Utf8) {
         k = k.toString();
       }
-      if (v instanceof Utf8) {
-        v = v.toString();
-      }
       theSet.add(new AbstractMap.SimpleEntry<CharSequence, Object>(k, v));
     }
-
     return theSet;
-
   }
 
 }

Modified: pig/trunk/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java?rev=1667589&r1=1667588&r2=1667589&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java Wed Mar 18 17:03:59 2015
@@ -21,11 +21,14 @@ package org.apache.pig.impl.util.avro;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.util.Utf8;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
@@ -54,35 +57,7 @@ public class AvroStorageDataConversionUt
       for (Field f : s.getFields()) {
         Object o = t.get(f.pos());
         Schema innerSchema = f.schema();
-        if (AvroStorageSchemaConversionUtilities.isNullableUnion(innerSchema)) {
-          if (o == null) {
-            record.put(f.pos(), null);
-            continue;
-          }
-          innerSchema = AvroStorageSchemaConversionUtilities
-              .removeSimpleUnion(innerSchema);
-        }
-        switch(innerSchema.getType()) {
-        case RECORD:
-          record.put(f.pos(), packIntoAvro((Tuple) o, innerSchema));
-          break;
-        case ARRAY:
-          record.put(f.pos(), packIntoAvro((DataBag) o, innerSchema));
-          break;
-        case BYTES:
-          record.put(f.pos(), ByteBuffer.wrap(((DataByteArray) o).get()));
-          break;
-        case FIXED:
-          record.put(f.pos(), new GenericData.Fixed(
-              innerSchema, ((DataByteArray) o).get()));
-          break;
-        default:
-          if (t.getType(f.pos()) == DataType.DATETIME) {
-            record.put(f.pos(), ((DateTime) o).getMillis() );
-          } else {
-            record.put(f.pos(), o);
-          }
-        }
+        record.put(f.pos(), packIntoAvro(o, innerSchema));
       }
       return record;
     } catch (Exception e) {
@@ -123,5 +98,52 @@ public class AvroStorageDataConversionUt
     }
   }
 
+  private static Object packIntoAvro(final Object o, Schema s)
+      throws IOException {
+    if (AvroStorageSchemaConversionUtilities.isNullableUnion(s)) {
+      if (o == null) {
+        return null;
+      }
+      s = AvroStorageSchemaConversionUtilities.removeSimpleUnion(s);
+    }
+    // what if o == null and schema doesn't allow it ?
+    switch (s.getType()) {
+      case RECORD:
+        return packIntoAvro((Tuple) o, s);
+      case ARRAY:
+        return packIntoAvro((DataBag) o, s);
+      case MAP:
+        return packIntoAvro((Map<CharSequence, Object>) o, s);
+      case BYTES:
+        return ByteBuffer.wrap(((DataByteArray) o).get());
+      case FIXED:
+        return new GenericData.Fixed(s, ((DataByteArray) o).get());
+      default:
+        if (DataType.findType(o) == DataType.DATETIME) {
+          return ((DateTime) o).getMillis();
+        } else {
+          return o;
+        }
+    }
+  }
 
+  private static Map<Utf8, Object> packIntoAvro(Map<CharSequence, Object> input, Schema schema)
+      throws IOException {
+    final Map<Utf8, Object> output = new HashMap<Utf8, Object>();
+    for (Map.Entry<CharSequence, Object> e : input.entrySet()) {
+      final Utf8 k = utf8(e.getKey());
+      output.put(k, packIntoAvro(e.getValue(), schema.getValueType()));
+    }
+    return output;
+  }
+
+  private static Utf8 utf8(CharSequence v) {
+    if (v instanceof Utf8) {
+      return (Utf8) v;
+    } else {
+      final StringBuilder sb = new StringBuilder(v.length());
+      sb.append(v);
+      return new Utf8(sb.toString());
+    }
+  }
 }

Modified: pig/trunk/test/org/apache/pig/builtin/TestAvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/TestAvroStorage.java?rev=1667589&r1=1667588&r2=1667589&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/TestAvroStorage.java (original)
+++ pig/trunk/test/org/apache/pig/builtin/TestAvroStorage.java Wed Mar 18 17:03:59 2015
@@ -34,8 +34,11 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.Iterator;
 
+import com.google.common.io.Closeables;
 import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericContainer;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.avro.generic.GenericDatumReader;
@@ -56,6 +59,8 @@ import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
 import org.apache.pig.builtin.mock.Storage.Data;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.impl.util.avro.AvroBagWrapper;
@@ -904,6 +909,94 @@ public class TestAvroStorage {
         assertEquals("bar", v);
     }
 
+    @Test
+    public void testAvroMapWrapper() throws Exception {
+        final Map<CharSequence, Object> m = new HashMap<CharSequence, Object>();
+        for (String fn : avroSchemas) {
+            final String avro = basedir + "data/avro/uncompressed/" + fn + ".avro";
+            int i = 0;
+            for (GenericContainer r : readAvroData(avro)) {
+                m.put(new Utf8(fn + i), r);
+                i += 1;
+            }
+        }
+        final AvroMapWrapper amw = new AvroMapWrapper(m);
+        // Test out all the interfaces the AvroMapWrapper supports
+        for (Object o : amw.values()) {
+            assertTrue(isValidPigObject(o));
+        }
+        for (CharSequence k : amw.keySet()) {
+            assertTrue(isValidPigObject(amw.get(k)));
+        }
+        for (Map.Entry<CharSequence, Object> e : amw.entrySet()) {
+            assertTrue(isValidPigObject(e.getValue()));
+        }
+    }
+
+    private boolean isValidPigObject(Object o) {
+        if (o == null) {
+            return true;
+        }
+        switch (DataType.findType(o)) {
+            case DataType.TUPLE:
+                for (Object inner : ((Tuple) o).getAll()) {
+                    if (!isValidPigObject(inner)) {
+                        return false;
+                    }
+                }
+                return true;
+            case DataType.BAG:
+                final Iterator<Tuple> bi = ((DataBag) o).iterator();
+                while (bi.hasNext()) {
+                    if (!isValidPigObject(bi.next())) {
+                        return false;
+                    }
+                }
+                return true;
+            case DataType.MAP:
+                for (Object inner : ((Map) o).values()) {
+                    if (!isValidPigObject(inner)) {
+                        return false;
+                    }
+                }
+                return true;
+            case DataType.BIGDECIMAL:
+            case DataType.BIGINTEGER:
+            case DataType.BOOLEAN:
+            case DataType.BYTE:
+            case DataType.BYTEARRAY:
+            case DataType.CHARARRAY:
+            case DataType.DATETIME:
+            case DataType.DOUBLE:
+            case DataType.FLOAT:
+            case DataType.GENERIC_WRITABLECOMPARABLE:
+            case DataType.INTEGER:
+            case DataType.LONG:
+                return true;
+            case DataType.ERROR:
+            default:
+                return false;
+        }
+    }
+
+    private List<GenericContainer> readAvroData(String path) throws IOException {
+        final FileSystem fs = FileSystem.getLocal(new Configuration());
+        final Path filePath = new Path(path);
+        assertTrue("File path " + filePath + " does not exists!", fs.exists(filePath));
+        final GenericDatumReader<GenericContainer> reader = new GenericDatumReader<GenericContainer>();
+        final DataFileStream<GenericContainer> in = new DataFileStream<GenericContainer>(fs.open(filePath), reader);
+        final List<GenericContainer> avroData = new ArrayList<GenericContainer>();
+        try {
+            while (in.hasNext()) {
+                GenericContainer obj = in.next();
+                avroData.add(obj);
+            }
+        } finally {
+            Closeables.closeQuietly(in);
+        }
+        return avroData;
+    }
+
     private void testAvroStorage(boolean expectedToSucceed, String scriptFile, Map<String,String> parameterMap) throws IOException {
         pigServerLocal.setBatchOn();
 

Modified: pig/trunk/test/org/apache/pig/impl/util/avro/TestAvroStorageSchemaConversionUtilities.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/impl/util/avro/TestAvroStorageSchemaConversionUtilities.java?rev=1667589&r1=1667588&r2=1667589&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/impl/util/avro/TestAvroStorageSchemaConversionUtilities.java (original)
+++ pig/trunk/test/org/apache/pig/impl/util/avro/TestAvroStorageSchemaConversionUtilities.java Wed Mar 18 17:03:59 2015
@@ -17,18 +17,13 @@
  */
 package org.apache.pig.impl.util.avro;
 
-import com.google.common.collect.ImmutableMap;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
 import org.apache.pig.ResourceSchema;
-import org.apache.pig.data.Tuple;
 import org.junit.Assert;
 import org.junit.Test;
 
-
 import java.io.File;
 import java.io.IOException;
-import java.util.Map;
 
 public class TestAvroStorageSchemaConversionUtilities {
     final private static String BASE_DIR = "test/org/apache/pig/builtin/avro/schema/";
@@ -47,32 +42,6 @@ public class TestAvroStorageSchemaConver
                 parse(BASE_DIR + "nullableArrayInMap.avsc", true));
     }
 
-    /**
-     * Test verifies that Avro records as map values are correctly converted to tuples
-     */
-    @Test
-    public void testRecordAsMapValue() throws IOException {
-        final Schema schema = new Schema.Parser().parse(new File(BASE_DIR, "recordInMap.avsc"));
-        final GenericData.Record record = new GenericData.Record(schema);
-        record.put("key", "k");
-        record.put("value", 42);
-        final Schema valueSchema = schema.getField("parameters").schema().getValueType();
-        final GenericData.Record valueRecord = new GenericData.Record(valueSchema);
-        valueRecord.put("id", 1);
-        record.put("parameters", ImmutableMap.of("record_in_map", valueRecord));
-        final Tuple tuple = new AvroTupleWrapper<GenericData.Record>(record);
-        // Third parameter is the map
-        final Object o = tuple.get(2);
-        Assert.assertTrue(o instanceof Map);
-        final Map<CharSequence, Object> map = (Map<CharSequence, Object>) o;
-        final Object recordInMap = map.get("record_in_map");
-        // Assert that the record got converted to a tuple
-        Assert.assertTrue(recordInMap instanceof Tuple);
-        final Tuple tuple2 = (Tuple) recordInMap;
-        final Object id = tuple2.get(0);
-        Assert.assertEquals(1, id);
-    }
-
     private static String parse(String schema, boolean recursive) throws IOException {
         final Schema s = new Schema.Parser().parse(new File(schema));
         final ResourceSchema resourceSchema = AvroStorageSchemaConversionUtilities.avroSchemaToResourceSchema(s, recursive);