You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2013/08/16 00:50:52 UTC

svn commit: r1514523 - in /pig/trunk: CHANGES.txt src/org/apache/pig/impl/util/avro/AvroMapWrapper.java src/org/apache/pig/impl/util/avro/AvroStorageSchemaConversionUtilities.java test/org/apache/pig/builtin/TestAvroStorage.java

Author: rohini
Date: Thu Aug 15 22:50:51 2013
New Revision: 1514523

URL: http://svn.apache.org/r1514523
Log:
PIG-3420: Failed to retrieve map values from data loaded by AvroStorage (yuanlid via rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java
    pig/trunk/src/org/apache/pig/impl/util/avro/AvroStorageSchemaConversionUtilities.java
    pig/trunk/test/org/apache/pig/builtin/TestAvroStorage.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1514523&r1=1514522&r2=1514523&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Aug 15 22:50:51 2013
@@ -212,6 +212,8 @@ PIG-3013: BinInterSedes improve chararra
 
 BUG FIXES
 
+PIG-3420: Failed to retrieve map values from data loaded by AvroStorage (yuanlid via rohini)
+
 PIG-3414: QueryParserDriver.parseSchema(String) silently returns a wrong result when a comma is missing in the schema definition (cheolsoo)
 
 PIG-3412: jsonstorage breaks when tuple does not have as many columns as schema (aesilberstein via cheolsoo)

Modified: pig/trunk/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java?rev=1514523&r1=1514522&r2=1514523&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/avro/AvroMapWrapper.java Thu Aug 15 22:50:51 2013
@@ -39,6 +39,7 @@ public final class AvroMapWrapper implem
    * The map contained in the wrapper object.
    */
   private Map<CharSequence, Object> innerMap;
+  private boolean isUtf8key;
 
   /**
    * Creates a new AvroMapWrapper object from the map object {@m}.
@@ -46,6 +47,10 @@ public final class AvroMapWrapper implem
    */
   public AvroMapWrapper(final Map<CharSequence, Object> m) {
     innerMap = m;
+    if (m.keySet().size() > 0 && m.keySet().iterator().next() instanceof Utf8)
+      isUtf8key = true;
+    else
+      isUtf8key = false;
   }
 
   @Override
@@ -70,7 +75,13 @@ public final class AvroMapWrapper implem
 
   @Override
   public Object get(final Object key) {
-    Object v = innerMap.get(key);
+    Object v = null;
+    if (isUtf8key) {
+      v = innerMap.get(new Utf8((String) key));
+    } else {
+      v = innerMap.get(key);
+    }
+    
     if (v instanceof Utf8) {
       return v.toString();
     } else {

Modified: pig/trunk/src/org/apache/pig/impl/util/avro/AvroStorageSchemaConversionUtilities.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/avro/AvroStorageSchemaConversionUtilities.java?rev=1514523&r1=1514522&r2=1514523&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/avro/AvroStorageSchemaConversionUtilities.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/avro/AvroStorageSchemaConversionUtilities.java Thu Aug 15 22:50:51 2013
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
@@ -573,32 +574,37 @@ public class AvroStorageSchemaConversion
       final Schema oldSchema, final List<RequiredField> rfl) {
     List<Schema.Field> fields = Lists.newArrayList();
     for (RequiredField rf : rfl) {
-      Schema.Field f = oldSchema.getField(rf.getAlias());
-      if (f == null) {
-        return null;
-      }
       try {
-        if (getPigType(f.schema()) != rf.getType()) {
+        Schema.Field f = oldSchema.getField(rf.getAlias());
+        if (f == null) {
           return null;
         }
-      } catch (ExecException e) {
-        Log.warn("ExecException caught in newSchemaFromRequiredFieldList", e);
-        return null;
-      }
-      if (rf.getSubFields() == null) {
-        fields.add(
-            new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultValue()));
-      } else {
-        Schema innerSchema =
-            newSchemaFromRequiredFieldList(f.schema(), rf.getSubFields());
-        if (innerSchema == null) {
+        try {
+          if (getPigType(f.schema()) != rf.getType()) {
+            return null;
+          }
+        } catch (ExecException e) {
+          Log.warn("ExecException caught in newSchemaFromRequiredFieldList", e);
           return null;
-        } else {
+        }
+        if (rf.getSubFields() == null) {
           fields.add(
-              new Schema.Field(
-                  f.name(), innerSchema, f.doc(), f.defaultValue()));
+              new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultValue()));
+        } else {
+          Schema innerSchema =
+              newSchemaFromRequiredFieldList(f.schema(), rf.getSubFields());
+          if (innerSchema == null) {
+            return null;
+          } else {
+            fields.add(
+                new Schema.Field(
+                    f.name(), innerSchema, f.doc(), f.defaultValue()));
+          }
         }
       }
+      catch (AvroRuntimeException e){
+        return oldSchema;
+      }
     }
 
     Schema newSchema = Schema.createRecord(

Modified: pig/trunk/test/org/apache/pig/builtin/TestAvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/TestAvroStorage.java?rev=1514523&r1=1514522&r2=1514523&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/TestAvroStorage.java (original)
+++ pig/trunk/test/org/apache/pig/builtin/TestAvroStorage.java Thu Aug 15 22:50:51 2013
@@ -37,6 +37,9 @@ import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
+import org.apache.pig.builtin.mock.Storage.Data;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.Util;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -50,6 +53,7 @@ import java.io.PrintStream;
 import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -57,6 +61,8 @@ import java.util.TreeSet;
 
 import static junit.framework.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.apache.pig.builtin.mock.Storage.resetData;
+import static org.apache.pig.builtin.mock.Storage.tuple;
 
 public class TestAvroStorage {
 
@@ -117,7 +123,7 @@ public class TestAvroStorage {
     @BeforeClass
     public static void setup() throws ExecException, IOException {
         pigServerLocal = new PigServer(ExecType.LOCAL);
-        deleteDirectory(new File(outbasedir));
+        Util.deleteDirectory(new File(outbasedir));
         generateInputFiles();
     }
 
@@ -709,16 +715,36 @@ public class TestAvroStorage {
         );
       verifyResults(createOutputName(), check);
     }
-
-    private static void deleteDirectory (File path) {
-        if ( path.exists()) {
-            File [] files = path.listFiles();
-            for (File file: files) {
-                if (file.isDirectory())
-                    deleteDirectory(file);
-                file.delete();
-            }
-        }
+    
+    @Test
+    public void testRetrieveDataFromMap() throws Exception {
+        pigServerLocal = new PigServer(ExecType.LOCAL);
+        Data data = resetData(pigServerLocal);
+        Map<String, String> mapv1 = new HashMap<String, String>();
+        mapv1.put("key1", "v11");
+        mapv1.put("key2", "v12");
+        Map<String, String> mapv2 = new HashMap<String, String>();
+        mapv2.put("key1", "v21");
+        mapv2.put("key2", "v22");
+        data.set("testMap", "maps:map[chararray]", tuple(mapv1), tuple(mapv2));
+        String schemaDescription = new String(
+                "{" +
+                      "\"type\": \"record\"," + 
+                      "\"name\": \"record\"," +
+                      "\"fields\" : [" +
+                      "{\"name\" : \"maps\", \"type\" :{\"type\" : \"map\", \"values\" : \"string\"}}" +
+                      "]" +
+                      "}");
+        pigServerLocal.registerQuery("A = LOAD 'testMap' USING mock.Storage();");
+        pigServerLocal.registerQuery("STORE A INTO '" + createOutputName() + "' USING AvroStorage('"+ schemaDescription +"');");
+        pigServerLocal.registerQuery("B = LOAD '" + createOutputName() + "' USING AvroStorage();");
+        pigServerLocal.registerQuery("C = FOREACH B generate maps#'key1';");
+        pigServerLocal.registerQuery("STORE C INTO 'out' USING mock.Storage();");
+        
+
+        List<Tuple> out = data.get("out");
+        assertEquals(tuple("v11"), out.get(0));
+        assertEquals(tuple("v21"), out.get(1));
     }
 
     private void testAvroStorage(boolean expectedToSucceed, String scriptFile, Map<String,String> parameterMap) throws IOException {