You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2012/03/01 03:54:01 UTC

svn commit: r1295389 - in /pig/trunk: CHANGES.txt src/org/apache/pig/builtin/PigStorage.java src/org/apache/pig/impl/util/Utils.java test/org/apache/pig/test/TestPigStorage.java

Author: daijy
Date: Thu Mar  1 02:54:00 2012
New Revision: 1295389

URL: http://svn.apache.org/viewvc?rev=1295389&view=rev
Log:
PIG-2541: Automatic record provenance (source tagging) for PigStorage

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/builtin/PigStorage.java
    pig/trunk/src/org/apache/pig/impl/util/Utils.java
    pig/trunk/test/org/apache/pig/test/TestPigStorage.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1295389&r1=1295388&r2=1295389&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Mar  1 02:54:00 2012
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-2541: Automatic record provenance (source tagging) for PigStorage (prkommireddi via daijy)
+
 PIG-2538: Add helper wrapper classes for StoreFunc (billgraham via dvryaboy)
 
 PIG-2010: registered jars on distributed cache (traviscrawford and julienledem via dvryaboy)

Modified: pig/trunk/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=1295389&r1=1295388&r2=1295389&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/PigStorage.java Thu Mar  1 02:54:00 2012
@@ -42,6 +42,7 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.pig.Expression;
@@ -84,6 +85,7 @@ import org.apache.pig.parser.ParserExcep
  * <li><code>-schema</code> Reads/Stores the schema of the relation using a 
  *  hidden JSON file.
  * <li><code>-noschema</code> Ignores a stored schema during loading.
+ * <li><code>-tagsource</code> Prepends the input source file name as the first field of each tuple. Make sure to set "pig.splitCombination" to false
  * </ul>
  * <p>
  * <h3>Schemas</h3>
@@ -99,6 +101,12 @@ import org.apache.pig.parser.ParserExcep
  * This file simply lists the delimited aliases. This is intended to make export to tools that can read
  * files with header lines easier (just cat the header to your data).
  * <p>
+ * <h3>Source tagging</h3>
+ * If <code>-tagsource</code> is specified, PigStorage will prepend the file name of the input split to each Tuple/row.
+ * User needs to ensure <code>pig.splitCombination</code> is set to false. 
+ * Usage: A = LOAD 'input' using PigStorage(',','-tagsource'); B = foreach A generate INPUT_FILE_NAME;
+ * The first field in each Tuple will contain input path (INPUT_FILE_NAME)
+ * <p>
  * Note that regardless of whether or not you store the schema, you <b>always</b> need to specify
  * the correct delimiter to read your data. If you store reading delimiter "#" and then load using
  * the default delimiter, your data will not be parsed correctly.
@@ -137,10 +145,16 @@ LoadPushDown, LoadMetadata, StoreMetadat
 
     protected boolean[] mRequiredColumns = null;
     private boolean mRequiredColumnsInitialized = false;
+    
+    //Indicates whether the input file path should be read.
+    private boolean tagSource = false;
+    private static final String TAG_SOURCE_PATH = "tagsource";
+    private Path sourcePath = null;
 
     private void populateValidOptions() {
         validOptions.addOption("schema", false, "Loads / Stores the schema of the relation using a hidden JSON file.");
         validOptions.addOption("noschema", false, "Disable attempting to load data schema from the filesystem.");
+        validOptions.addOption(TAG_SOURCE_PATH, false, "Appends input source file path to end of each tuple. Make sure to set pig.splitCombination to false");
     }
 
     public PigStorage() {
@@ -166,6 +180,7 @@ LoadPushDown, LoadMetadata, StoreMetadat
      * <ul>
      * <li><code>-schema</code> Loads / Stores the schema of the relation using a hidden JSON file.
      * <li><code>-noschema</code> Ignores a stored schema during loading.
+     * <li><code>-tagsource</code> Prepends the input source file name as the first field of each tuple. Make sure to set "pig.splitCombination" to false
      * </ul>
      * @param delimiter the single byte character that is used to separate fields.
      * @param options a list of options that can be used to modify PigStorage behavior
@@ -179,6 +194,7 @@ LoadPushDown, LoadMetadata, StoreMetadat
             configuredOptions = parser.parse(validOptions, optsArr);
             isSchemaOn = configuredOptions.hasOption("schema");
             dontLoadSchema = configuredOptions.hasOption("noschema");
+            tagSource = configuredOptions.hasOption(TAG_SOURCE_PATH);
         } catch (ParseException e) {
             HelpFormatter formatter = new HelpFormatter();
             formatter.printHelp( "PigStorage(',', '[options]')", validOptions);
@@ -198,6 +214,11 @@ LoadPushDown, LoadMetadata, StoreMetadat
             }
             mRequiredColumnsInitialized = true;
         }
+        //Prepend input source path if source tagging is enabled
+        if(tagSource) {
+        	mProtoTuple.add(new DataByteArray(sourcePath.getName()));
+        }
+
         try {
             boolean notDone = in.nextKeyValue();
             if (!notDone) {
@@ -343,6 +364,9 @@ LoadPushDown, LoadMetadata, StoreMetadat
     @Override
     public void prepareToRead(RecordReader reader, PigSplit split) {
         in = reader;
+        if(tagSource) {
+        	sourcePath = ((FileSplit)split.getWrappedSplit()).getPath();
+        }
     }
 
     @Override
@@ -442,6 +466,9 @@ LoadPushDown, LoadMetadata, StoreMetadat
             schema = (new JsonMetadata()).getSchema(location, job, isSchemaOn);
 
             if (signature != null && schema != null) {
+                if(tagSource) {
+                    schema = Utils.getSchemaWithInputSourceTag(schema);
+                }
                 Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(),
                         new String[] {signature});
                 p.setProperty(signature + ".schema", schema.toString());

Modified: pig/trunk/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/Utils.java?rev=1295389&r1=1295388&r2=1295389&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/Utils.java Thu Mar  1 02:54:00 2012
@@ -40,6 +40,7 @@ import org.apache.pig.FuncSpec;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
@@ -48,6 +49,7 @@ import org.apache.pig.impl.io.InterStora
 import org.apache.pig.impl.io.ReadToEndLoader;
 import org.apache.pig.impl.io.TFileStorage;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
 import org.apache.pig.newplan.logical.Util;
 import org.apache.pig.newplan.logical.relational.LogicalSchema;
 import org.apache.pig.parser.ParserException;
@@ -183,7 +185,26 @@ public class Utils {
         LogicalSchema schema = queryParser.parseSchema(schemaString);
 		return schema;
 	}
-    
+	
+    /**
+     * This method adds FieldSchema of 'input source tag/path' as the first
+     * field. This will be called only when PigStorage is invoked with
+     * '-tagsource' option and the schema file is present to be loaded.
+     * 
+     * @param schema
+     * @return ResourceSchema
+     */
+    public static ResourceSchema getSchemaWithInputSourceTag(ResourceSchema schema) {
+        ResourceFieldSchema[] fieldSchemas = schema.getFields();
+        ResourceFieldSchema sourceTagSchema = new ResourceFieldSchema(new FieldSchema("INPUT_FILE_NAME", DataType.CHARARRAY));
+        ResourceFieldSchema[] fieldSchemasWithSourceTag = new ResourceFieldSchema[fieldSchemas.length + 1];
+        fieldSchemasWithSourceTag[0] = sourceTagSchema;
+        for(int j = 0; j < fieldSchemas.length; j++) {
+            fieldSchemasWithSourceTag[j + 1] = fieldSchemas[j];
+        }
+        return schema.setFields(fieldSchemasWithSourceTag);
+    }
+
     public static String getTmpFileCompressorName(PigContext pigContext) {
         if (pigContext == null)
             return InterStorage.class.getName();

Modified: pig/trunk/test/org/apache/pig/test/TestPigStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigStorage.java?rev=1295389&r1=1295388&r2=1295389&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigStorage.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigStorage.java Thu Mar  1 02:54:00 2012
@@ -463,4 +463,71 @@ public class TestPigStorage  {
         schema = pigStorage.getSchema(globtestdir+"{a,b}", new Job(ConfigurationUtil.toConfiguration(pigContext.getProperties())));
         Assert.assertTrue(ResourceSchema.equals(schema, testSchema));
     }
+    
+    /**
+     * This is for testing source tagging option on PigStorage. When a user
+     * specifies '-tagsource' as an option, PigStorage must prepend the input
+     * source path to the tuple and "INPUT_FILE_NAME" to schema.
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testPigStorageSourceTagSchema() throws Exception {
+        pigContext.connect();
+        String query = "a = LOAD '" + datadir + "originput' using PigStorage('\\t') " +
+        "as (f1:chararray, f2:int);";
+        pig.registerQuery(query);
+        pig.store("a", datadir + "aout", "PigStorage('\\t', '-schema')");
+        // aout now has a schema.
+
+        // Verify that loading a-out with '-tagsource' produces
+        // the original schema, and prepends 'INPUT_FILE_NAME' to
+        // original schema.
+        pig.registerQuery("b = LOAD '" + datadir + "aout' using PigStorage('\\t', '-tagsource');");
+        Schema genSchema = pig.dumpSchema("b");
+        // Verify that -tagsource schema works
+        String[] aliases = {"INPUT_FILE_NAME", "f1", "f2"};
+        byte[] types = {DataType.CHARARRAY, DataType.CHARARRAY, DataType.INTEGER};
+        Schema newSchema = TypeCheckingTestUtil.genFlatSchema(
+                aliases,types);
+        Assert.assertTrue("schema with -tagsource preprends INPUT_FILE_NAME",
+                Schema.equals(newSchema, genSchema, true, false));
+        
+        // Verify that explicitly requesting no schema works
+        pig.registerQuery("d = LOAD '" + datadir + "aout' using PigStorage('\\t', '-noschema');");
+        genSchema = pig.dumpSchema("d");
+        assertNull(genSchema);
+
+        // Verify specifying your own schema works
+        pig.registerQuery("b = LOAD '" + datadir + "aout' using PigStorage('\\t', '-tagsource') " +
+        "as (input_file:chararray, foo:chararray, bar:int);");
+        genSchema = pig.dumpSchema("b");
+        String[] newAliases = {"input_file", "foo", "bar"};
+        byte[] newTypes = {DataType.CHARARRAY, DataType.CHARARRAY, DataType.INTEGER};
+        newSchema = TypeCheckingTestUtil.genFlatSchema(newAliases,newTypes);
+        Assert.assertTrue("explicit schema overrides metadata",
+                Schema.equals(newSchema, genSchema, true, false));
+    }
+    
+    @Test
+    public void testPigStorageSourceTagValue() throws Exception {
+        final String storeFileName = "part-m-00000";
+        pigContext.connect();
+
+        String query = "a = LOAD '" + datadir + "' using PigStorage('\\t') " +
+        "as (f1:chararray, f2:int);";
+        pig.registerQuery(query);
+        // Storing in 'aout' directory will store contents in part-m-00000
+        pig.store("a", datadir + "aout", "PigStorage('\\t', '-schema')");
+        
+        // Verify input source tag is present when using -tagsource
+        pig.registerQuery("b = LOAD '" + datadir + "aout' using PigStorage('\\t', '-tagsource');");
+        pig.registerQuery("c = foreach b generate INPUT_FILE_NAME;");
+        Iterator<Tuple> iter = pig.openIterator("c");
+        while(iter.hasNext()) {
+            Tuple tuple = iter.next();
+            String inputFileName = (String)tuple.get(0);
+            assertEquals("tagsource value must be part-m-00000", inputFileName, storeFileName);
+        }
+    }    
 }