You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2011/11/14 00:46:05 UTC

svn commit: r1201553 - in /pig/trunk: CHANGES.txt src/org/apache/pig/builtin/JsonMetadata.java test/org/apache/pig/test/TestPigStorage.java

Author: daijy
Date: Sun Nov 13 23:46:04 2011
New Revision: 1201553

URL: http://svn.apache.org/viewvc?rev=1201553&view=rev
Log:
PIG-2209: JsonMetadata fails to find schema for glob paths

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/builtin/JsonMetadata.java
    pig/trunk/test/org/apache/pig/test/TestPigStorage.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1201553&r1=1201552&r2=1201553&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sun Nov 13 23:46:04 2011
@@ -170,6 +170,8 @@ PIG-2228: support partial aggregation in
 
 BUG FIXES
 
+PIG-2209: JsonMetadata fails to find schema for glob paths (daijy)
+
 PIG-2165: Need a way to deal with params and param_file in embedded pig in python (daijy)
 
 PIG-2313: NPE in ILLUSTRATE trying to get StatusReporter in STORE (daijy)

Modified: pig/trunk/src/org/apache/pig/builtin/JsonMetadata.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/JsonMetadata.java?rev=1201553&r1=1201552&r2=1201553&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/JsonMetadata.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/JsonMetadata.java Sun Nov 13 23:46:04 2011
@@ -44,6 +44,7 @@ import org.apache.pig.backend.hadoop.dat
 import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
 import org.apache.pig.backend.hadoop.datastorage.HDirectory;
 import org.apache.pig.backend.hadoop.datastorage.HFile;
+import org.apache.pig.backend.hadoop.datastorage.HPath;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.codehaus.jackson.JsonGenerationException;
@@ -115,25 +116,29 @@ public class JsonMetadata implements Loa
             } else {
                 ElementDescriptor[] descriptors = storage.asCollection(loc);
                 for(ElementDescriptor descriptor : descriptors) {
-                    String fileName = null, parentName = null;
-                    ContainerDescriptor parentContainer = null;
+                    ContainerDescriptor container = null;
+                    
                     if (descriptor instanceof HFile) {
-                        Path descriptorPath = ((HFile) descriptor).getPath();
-                        fileName = descriptorPath.getName();
+                        Path descriptorPath = ((HPath) descriptor).getPath();
+                        String fileName = descriptorPath.getName();
                         Path parent = descriptorPath.getParent();
-                        parentName = parent.toString();
-                        parentContainer = new HDirectory((HDataStorage)storage,parent);
-                    }
-                    ElementDescriptor metaFilePath = storage.asElement(parentName, prefix+"."+fileName);
-
-                    // if the file has a custom schema, use it
-                    if (metaFilePath.exists()) {
-                        metaFileSet.add(metaFilePath);
-                        continue;
+                        String parentName = parent.toString();
+                        container = new HDirectory((HDataStorage)storage,parent);
+                        
+                        // try prefix.filename for the file
+                        ElementDescriptor metaFilePath = storage.asElement(parentName, prefix+"."+fileName);
+
+                        // if the file has a custom schema, use it
+                        if (metaFilePath.exists()) {
+                            metaFileSet.add(metaFilePath);
+                            continue;
+                        }
+                    } else { // descriptor instanceof HDirectory
+                        container = (HDirectory)descriptor;
                     }
 
                     // if no custom schema, try the parent directory
-                    metaFilePath = storage.asElement(parentContainer, prefix);
+                    ElementDescriptor metaFilePath = storage.asElement(container, prefix);
                     if (metaFilePath.exists()) {
                         metaFileSet.add(metaFilePath);
                     }

Modified: pig/trunk/test/org/apache/pig/test/TestPigStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigStorage.java?rev=1201553&r1=1201552&r2=1201553&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigStorage.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigStorage.java Sun Nov 13 23:46:04 2011
@@ -37,18 +37,25 @@ import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.Utils;
 import org.apache.pig.test.utils.TypeCheckingTestUtil;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -395,4 +402,66 @@ public class TestPigStorage  {
         header = Util.readOutput(pig.getPigContext(), outPath);
         Assert.assertArrayEquals("Headers are not the same.", new String[] {"foo\tbar"}, header);
     }
+    
+    private void putInputFile(String filename) throws IOException {
+        Util.createLocalInputFile(filename, new String[] {});
+    }
+    
+    private void putSchemaFile(String schemaFilename, ResourceSchema testSchema) throws JsonGenerationException, JsonMappingException, IOException {
+        new ObjectMapper().writeValue(new File(schemaFilename), testSchema);
+    }
+    
+    @Test
+    public void testPigStorageSchemaSearch() throws Exception {
+        String globtestdir = "build/test/tmpglobbingdata/";
+        ResourceSchema testSchema = new ResourceSchema(Utils.parseSchema("a0:chararray"));
+        PigStorage pigStorage = new PigStorage();
+        pigContext.connect();
+        try{
+            Util.deleteDirectory(new File(datadir));
+            pig.mkdirs(globtestdir+"a");
+            pig.mkdirs(globtestdir+"a/a0");
+            putInputFile(globtestdir+"a/a0/input");
+            pig.mkdirs(globtestdir+"a/b0");
+            putInputFile(globtestdir+"a/b0/input");
+            pig.mkdirs(globtestdir+"b");
+        } catch (IOException e) {};
+        
+        // if schema file is not found, schema is null
+        ResourceSchema schema = pigStorage.getSchema(globtestdir, new Job(ConfigurationUtil.toConfiguration(pigContext.getProperties())));
+        Assert.assertTrue(schema==null);
+        
+        // .pig_schema.input sits alongside the input file
+        putSchemaFile(globtestdir+"a/a0/.pig_schema.input", testSchema);
+        schema = pigStorage.getSchema(globtestdir+"a/a0/*", new Job(ConfigurationUtil.toConfiguration(pigContext.getProperties())));
+        Assert.assertTrue(ResourceSchema.equals(schema, testSchema));
+        new File(globtestdir+"a/a0/.pig_schema.input").delete();
+        
+        // if .pig_schema is in the input directory
+        putSchemaFile(globtestdir+"a/a0/.pig_schema", testSchema);
+        schema = pigStorage.getSchema(globtestdir+"a/a0", new Job(ConfigurationUtil.toConfiguration(pigContext.getProperties())));
+        Assert.assertTrue(ResourceSchema.equals(schema, testSchema));
+        new File(globtestdir+"a/a0/.pig_schema").delete();
+        
+        // .pig_schema is in one of the directories returned by globStatus
+        putSchemaFile(globtestdir+"a/.pig_schema", testSchema);
+        schema = pigStorage.getSchema(globtestdir+"*", new Job(ConfigurationUtil.toConfiguration(pigContext.getProperties())));
+        Assert.assertTrue(ResourceSchema.equals(schema, testSchema));
+        new File(globtestdir+"a/.pig_schema").delete();
+        
+        putSchemaFile(globtestdir+"b/.pig_schema", testSchema);
+        schema = pigStorage.getSchema(globtestdir+"*", new Job(ConfigurationUtil.toConfiguration(pigContext.getProperties())));
+        Assert.assertTrue(ResourceSchema.equals(schema, testSchema));
+        new File(globtestdir+"b/.pig_schema").delete();
+        
+        // if .pig_schema is deep in the globbing, it will not get used
+        putSchemaFile(globtestdir+"a/a0/.pig_schema", testSchema);
+        schema = pigStorage.getSchema(globtestdir+"*", new Job(ConfigurationUtil.toConfiguration(pigContext.getProperties())));
+        Assert.assertTrue(schema==null);
+        putSchemaFile(globtestdir+"a/.pig_schema", testSchema);
+        schema = pigStorage.getSchema(globtestdir+"*", new Job(ConfigurationUtil.toConfiguration(pigContext.getProperties())));
+        Assert.assertTrue(ResourceSchema.equals(schema, testSchema));
+        new File(globtestdir+"a/a0/.pig_schema").delete();
+        new File(globtestdir+"a/.pig_schema").delete();
+    }
 }