You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@pig.apache.org by da...@apache.org on 2011/11/14 00:45:28 UTC
svn commit: r1201550 - in /pig/branches/branch-0.10: CHANGES.txt
src/org/apache/pig/builtin/JsonMetadata.java
test/org/apache/pig/test/TestPigStorage.java
Author: daijy
Date: Sun Nov 13 23:45:27 2011
New Revision: 1201550
URL: http://svn.apache.org/viewvc?rev=1201550&view=rev
Log:
PIG-2209: JsonMetadata fails to find schema for glob paths
Modified:
pig/branches/branch-0.10/CHANGES.txt
pig/branches/branch-0.10/src/org/apache/pig/builtin/JsonMetadata.java
pig/branches/branch-0.10/test/org/apache/pig/test/TestPigStorage.java
Modified: pig/branches/branch-0.10/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/CHANGES.txt?rev=1201550&r1=1201549&r2=1201550&view=diff
==============================================================================
--- pig/branches/branch-0.10/CHANGES.txt (original)
+++ pig/branches/branch-0.10/CHANGES.txt Sun Nov 13 23:45:27 2011
@@ -150,6 +150,8 @@ PIG-2228: support partial aggregation in
BUG FIXES
+PIG-2209: JsonMetadata fails to find schema for glob paths (daijy)
+
PIG-2352: e2e test harness' use of environment variables causes unintended effects between tests (gates)
PIG-2165: Need a way to deal with params and param_file in embedded pig in python (daijy)
Modified: pig/branches/branch-0.10/src/org/apache/pig/builtin/JsonMetadata.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/src/org/apache/pig/builtin/JsonMetadata.java?rev=1201550&r1=1201549&r2=1201550&view=diff
==============================================================================
--- pig/branches/branch-0.10/src/org/apache/pig/builtin/JsonMetadata.java (original)
+++ pig/branches/branch-0.10/src/org/apache/pig/builtin/JsonMetadata.java Sun Nov 13 23:45:27 2011
@@ -44,6 +44,7 @@ import org.apache.pig.backend.hadoop.dat
import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
import org.apache.pig.backend.hadoop.datastorage.HDirectory;
import org.apache.pig.backend.hadoop.datastorage.HFile;
+import org.apache.pig.backend.hadoop.datastorage.HPath;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.codehaus.jackson.JsonGenerationException;
@@ -115,25 +116,29 @@ public class JsonMetadata implements Loa
} else {
ElementDescriptor[] descriptors = storage.asCollection(loc);
for(ElementDescriptor descriptor : descriptors) {
- String fileName = null, parentName = null;
- ContainerDescriptor parentContainer = null;
+ ContainerDescriptor container = null;
+
if (descriptor instanceof HFile) {
- Path descriptorPath = ((HFile) descriptor).getPath();
- fileName = descriptorPath.getName();
+ Path descriptorPath = ((HPath) descriptor).getPath();
+ String fileName = descriptorPath.getName();
Path parent = descriptorPath.getParent();
- parentName = parent.toString();
- parentContainer = new HDirectory((HDataStorage)storage,parent);
- }
- ElementDescriptor metaFilePath = storage.asElement(parentName, prefix+"."+fileName);
-
- // if the file has a custom schema, use it
- if (metaFilePath.exists()) {
- metaFileSet.add(metaFilePath);
- continue;
+ String parentName = parent.toString();
+ container = new HDirectory((HDataStorage)storage,parent);
+
+ // try prefix.filename for the file
+ ElementDescriptor metaFilePath = storage.asElement(parentName, prefix+"."+fileName);
+
+ // if the file has a custom schema, use it
+ if (metaFilePath.exists()) {
+ metaFileSet.add(metaFilePath);
+ continue;
+ }
+ } else { // descriptor instanceof HDirectory
+ container = (HDirectory)descriptor;
}
// if no custom schema, try the parent directory
- metaFilePath = storage.asElement(parentContainer, prefix);
+ ElementDescriptor metaFilePath = storage.asElement(container, prefix);
if (metaFilePath.exists()) {
metaFileSet.add(metaFilePath);
}
Modified: pig/branches/branch-0.10/test/org/apache/pig/test/TestPigStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.10/test/org/apache/pig/test/TestPigStorage.java?rev=1201550&r1=1201549&r2=1201550&view=diff
==============================================================================
--- pig/branches/branch-0.10/test/org/apache/pig/test/TestPigStorage.java (original)
+++ pig/branches/branch-0.10/test/org/apache/pig/test/TestPigStorage.java Sun Nov 13 23:45:27 2011
@@ -37,18 +37,25 @@ import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.ResourceSchema;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.Utils;
import org.apache.pig.test.utils.TypeCheckingTestUtil;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -395,4 +402,66 @@ public class TestPigStorage {
header = Util.readOutput(pig.getPigContext(), outPath);
Assert.assertArrayEquals("Headers are not the same.", new String[] {"foo\tbar"}, header);
}
+
+ private void putInputFile(String filename) throws IOException { // Test helper: creates an input file at `filename` with no lines, via Util.createLocalInputFile.
+ Util.createLocalInputFile(filename, new String[] {}); // empty String[] => zero records; presumably only the file's existence matters to the schema-search test below
+ }
+
+ private void putSchemaFile(String schemaFilename, ResourceSchema testSchema) throws JsonGenerationException, JsonMappingException, IOException { // Test helper: serializes testSchema as JSON into the file schemaFilename, overwriting any existing content.
+ new ObjectMapper().writeValue(new File(schemaFilename), testSchema); // writes through java.io.File (local filesystem), not Pig's storage layer; NOTE(review): assumes the PigServer used by the test reads the same local filesystem (local mode) - confirm
+ }
+
+ @Test
+ public void testPigStorageSchemaSearch() throws Exception { // Verifies where PigStorage.getSchema finds JSON schema files (.pig_schema and per-file .pig_schema.<name>) for plain and glob load locations.
+ String globtestdir = "build/test/tmpglobbingdata/"; // relative test root; the trailing '/' lets child paths be appended by plain concatenation
+ ResourceSchema testSchema = new ResourceSchema(Utils.parseSchema("a0:chararray")); // single-field schema that every successful lookup must return
+ PigStorage pigStorage = new PigStorage();
+ pigContext.connect();
+ try{ // build the tree: a/a0/input, a/b0/input, and an empty directory b
+ Util.deleteDirectory(new File(datadir)); // NOTE(review): deletes `datadir` (a field defined outside this view), not globtestdir. Schema files left under globtestdir by an earlier failed run are never removed, because cleanup below only runs when every preceding assert passes. That can break the first null assertion; probably meant new File(globtestdir).
+ pig.mkdirs(globtestdir+"a");
+ pig.mkdirs(globtestdir+"a/a0");
+ putInputFile(globtestdir+"a/a0/input");
+ pig.mkdirs(globtestdir+"a/b0");
+ putInputFile(globtestdir+"a/b0/input");
+ pig.mkdirs(globtestdir+"b");
+ } catch (IOException e) {}; // NOTE(review): setup IOExceptions are silently swallowed, so a broken setup surfaces later as a confusing assertion failure; the method already declares `throws Exception`, so letting them propagate would be clearer. The trailing ';' is a redundant empty statement.
+
+ // with no schema file anywhere under the location, getSchema returns null
+ ResourceSchema schema = pigStorage.getSchema(globtestdir, new Job(ConfigurationUtil.toConfiguration(pigContext.getProperties()))); // every lookup gets a fresh Job built from the Pig context's properties
+ Assert.assertTrue(schema==null);
+
+ // a per-file schema (.pig_schema.input) stored next to input file a/a0/input is used for the glob a/a0/*
+ putSchemaFile(globtestdir+"a/a0/.pig_schema.input", testSchema);
+ schema = pigStorage.getSchema(globtestdir+"a/a0/*", new Job(ConfigurationUtil.toConfiguration(pigContext.getProperties())));
+ Assert.assertTrue(ResourceSchema.equals(schema, testSchema));
+ new File(globtestdir+"a/a0/.pig_schema.input").delete(); // cleanup; the boolean result of delete() is ignored
+
+ // a directory-level .pig_schema inside the (non-glob) input directory a/a0 is used
+ putSchemaFile(globtestdir+"a/a0/.pig_schema", testSchema);
+ schema = pigStorage.getSchema(globtestdir+"a/a0", new Job(ConfigurationUtil.toConfiguration(pigContext.getProperties())));
+ Assert.assertTrue(ResourceSchema.equals(schema, testSchema));
+ new File(globtestdir+"a/a0/.pig_schema").delete();
+
+ // a .pig_schema directly inside one of the directories matched by the glob * (here a/) is used
+ putSchemaFile(globtestdir+"a/.pig_schema", testSchema);
+ schema = pigStorage.getSchema(globtestdir+"*", new Job(ConfigurationUtil.toConfiguration(pigContext.getProperties())));
+ Assert.assertTrue(ResourceSchema.equals(schema, testSchema));
+ new File(globtestdir+"a/.pig_schema").delete();
+
+ putSchemaFile(globtestdir+"b/.pig_schema", testSchema); // same check, but the schema sits in b/, a glob-matched directory that holds no input files
+ schema = pigStorage.getSchema(globtestdir+"*", new Job(ConfigurationUtil.toConfiguration(pigContext.getProperties())));
+ Assert.assertTrue(ResourceSchema.equals(schema, testSchema));
+ new File(globtestdir+"b/.pig_schema").delete();
+
+ // a .pig_schema nested below a glob-matched directory (a/a0 under a/) is not searched, so the lookup yields null
+ putSchemaFile(globtestdir+"a/a0/.pig_schema", testSchema);
+ schema = pigStorage.getSchema(globtestdir+"*", new Job(ConfigurationUtil.toConfiguration(pigContext.getProperties())));
+ Assert.assertTrue(schema==null);
+ putSchemaFile(globtestdir+"a/.pig_schema", testSchema); // once a/.pig_schema also exists, the glob lookup finds it
+ schema = pigStorage.getSchema(globtestdir+"*", new Job(ConfigurationUtil.toConfiguration(pigContext.getProperties())));
+ Assert.assertTrue(ResourceSchema.equals(schema, testSchema));
+ new File(globtestdir+"a/a0/.pig_schema").delete(); // final cleanup; skipped entirely if any assertion above fails
+ new File(globtestdir+"a/.pig_schema").delete();
+ }
}