You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2012/12/13 20:57:47 UTC

svn commit: r1421461 - in /pig/trunk: CHANGES.txt src/docs/src/documentation/content/xdocs/func.xml src/org/apache/pig/builtin/PigStorage.java src/org/apache/pig/impl/util/Utils.java test/org/apache/pig/test/TestPigStorage.java

Author: cheolsoo
Date: Thu Dec 13 19:57:46 2012
New Revision: 1421461

URL: http://svn.apache.org/viewvc?rev=1421461&view=rev
Log:
PIG-2857: Add a -tagPath option to PigStorage (prkommireddi via cheolsoo)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/docs/src/documentation/content/xdocs/func.xml
    pig/trunk/src/org/apache/pig/builtin/PigStorage.java
    pig/trunk/src/org/apache/pig/impl/util/Utils.java
    pig/trunk/test/org/apache/pig/test/TestPigStorage.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1421461&r1=1421460&r2=1421461&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Dec 13 19:57:46 2012
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-2857: Add a -tagPath option to PigStorage (prkommireddi via cheolsoo)
+
 PIG-2341: Need better documentation on Pig/HBase integration (jthakrar and billgraham via billgraham)
 
 PIG-3075: Allow AvroStorage STORE Operations To Use Schema Specified By URI (nwhite via cheolsoo)

Modified: pig/trunk/src/docs/src/documentation/content/xdocs/func.xml
URL: http://svn.apache.org/viewvc/pig/trunk/src/docs/src/documentation/content/xdocs/func.xml?rev=1421461&r1=1421460&r2=1421461&view=diff
==============================================================================
--- pig/trunk/src/docs/src/documentation/content/xdocs/func.xml (original)
+++ pig/trunk/src/docs/src/documentation/content/xdocs/func.xml Thu Dec 13 19:57:46 2012
@@ -1436,10 +1436,12 @@ STORE X INTO 'output' USING PigDump();
                <p>A string that contains space-separated options (‘optionA  optionB  optionC’)</p>
                <p>Currently supported options are:</p>
                <ul>
-					<li>(‘schema’) - Stores the schema of the relation using a hidden JSON file.</li>
-					<li>(‘noschema’) - Ignores a stored schema during the load.</li>
-                                        <li>('tagsource') - Add a first column indicates the input file of the record.</li>
-				</ul>
+                    <li>(‘schema’) - Stores the schema of the relation using a hidden JSON file.</li>
+                    <li>(‘noschema’) - Ignores a stored schema during the load.</li>
+                    <li>('tagsource') - (deprecated, use tagFile instead) Adds a first column that indicates the input file name of the record.</li>
+                    <li>('tagPath') - Adds a first column that indicates the input path of the record.</li>
+                    <li>('tagFile') - Adds a first column that indicates the input file name of the record.</li>
+               </ul>
             </td>
          </tr> 
    </table></section>
@@ -1471,7 +1473,7 @@ STORE X INTO 'output' USING PigDump();
 <p>Note that regardless of whether or not you store the schema, you always need to specify the correct delimiter to read your data. If you store reading delimiter "#" and then load using the default delimiter, your data will not be parsed correctly.</p>   
 
 <p><strong>Record Provenance</strong></p>
-<p>If tagsource option is specified, PigStorage will add a psudo-column INPUT_FILE_NAME to the beginning of the record. As the name suggests, it is the input file name containing this particular record.</p>
+<p>If the tagPath or tagFile option is specified, PigStorage will add a pseudo-column INPUT_FILE_PATH or INPUT_FILE_NAME, respectively, to the beginning of the record. As the name suggests, it contains the path or name of the input file that holds this particular record. Please note that tagsource is deprecated; use tagFile instead.</p>
    
   <p><strong>Complex Data Types</strong></p>   
   <p>The formats for complex data types are shown here:</p>

Modified: pig/trunk/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=1421461&r1=1421460&r2=1421461&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/PigStorage.java Thu Dec 13 19:57:46 2012
@@ -41,8 +41,8 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.pig.Expression;
 import org.apache.pig.FileInputLoadFunc;
@@ -84,7 +84,8 @@ import org.apache.pig.parser.ParserExcep
  * <li><code>-schema</code> Reads/Stores the schema of the relation using a 
  *  hidden JSON file.
  * <li><code>-noschema</code> Ignores a stored schema during loading.
- * <li><code>-tagsource</code> Appends input source file path to beginning of each tuple.
+ * <li><code>-tagFile</code> Appends input source file name to beginning of each tuple.
+ * <li><code>-tagPath</code> Appends input source file path to beginning of each tuple.
  * </ul>
  * <p>
  * <h3>Schemas</h3>
@@ -101,9 +102,12 @@ import org.apache.pig.parser.ParserExcep
  * files with header lines easier (just cat the header to your data).
  * <p>
  * <h3>Source tagging</h3>
- * If<code>-tagsource</code> is specified, PigStorage will prepend input split path to each Tuple/row.
- * Usage: A = LOAD 'input' using PigStorage(',','-tagsource'); B = foreach A generate $0;
- * The first field (0th index) in each Tuple will contain input path 
+ * If <code>-tagFile</code> is specified, PigStorage will prepend the input file name to each Tuple/row.
+ * Usage: A = LOAD 'input' using PigStorage(',','-tagFile'); B = foreach A generate $0;
+ * The first field (0th index) in each Tuple will contain the input file name.
+ * If <code>-tagPath</code> is specified, PigStorage will prepend the input file path to each Tuple/row.
+ * Usage: A = LOAD 'input' using PigStorage(',','-tagPath'); B = foreach A generate $0;
+ * The first field (0th index) in each Tuple will contain the input file path.
  * <p>
  * Note that regardless of whether or not you store the schema, you <b>always</b> need to specify
  * the correct delimiter to read your data. If you store reading delimiter "#" and then load using
@@ -144,15 +148,19 @@ LoadPushDown, LoadMetadata, StoreMetadat
     protected boolean[] mRequiredColumns = null;
     private boolean mRequiredColumnsInitialized = false;
     
-    //Indicates whether the input file path should be read.
-    private boolean tagSource = false;
-    private static final String TAG_SOURCE_PATH = "tagsource";
+    // Indicates whether the input file name/path should be read.
+    private boolean tagFile = false;
+    private static final String TAG_SOURCE_FILE = "tagFile";
+    private boolean tagPath = false;
+    private static final String TAG_SOURCE_PATH = "tagPath";
     private Path sourcePath = null;
 
     private void populateValidOptions() {
         validOptions.addOption("schema", false, "Loads / Stores the schema of the relation using a hidden JSON file.");
         validOptions.addOption("noschema", false, "Disable attempting to load data schema from the filesystem.");
-        validOptions.addOption(TAG_SOURCE_PATH, false, "Appends input source file path to beginning of each tuple. ");
+        validOptions.addOption(TAG_SOURCE_FILE, false, "Appends input source file name to beginning of each tuple.");
+        validOptions.addOption(TAG_SOURCE_PATH, false, "Appends input source file path to beginning of each tuple.");
+        validOptions.addOption("tagsource", false, "Appends input source file name to beginning of each tuple.");
     }
 
     public PigStorage() {
@@ -178,7 +186,8 @@ LoadPushDown, LoadMetadata, StoreMetadat
      * <ul>
      * <li><code>-schema</code> Loads / Stores the schema of the relation using a hidden JSON file.
      * <li><code>-noschema</code> Ignores a stored schema during loading.
-     * <li><code>-tagsource</code> Appends input source file path to beginning of each tuple.
+     * <li><code>-tagFile</code> Appends input source file name to beginning of each tuple.
+     * <li><code>-tagPath</code> Appends input source file path to beginning of each tuple.
      * </ul>
      * @param delimiter the single byte character that is used to separate fields.
      * @param options a list of options that can be used to modify PigStorage behavior
@@ -192,7 +201,14 @@ LoadPushDown, LoadMetadata, StoreMetadat
             configuredOptions = parser.parse(validOptions, optsArr);
             isSchemaOn = configuredOptions.hasOption("schema");
             dontLoadSchema = configuredOptions.hasOption("noschema");
-            tagSource = configuredOptions.hasOption(TAG_SOURCE_PATH);
+            tagFile = configuredOptions.hasOption(TAG_SOURCE_FILE);
+            tagPath = configuredOptions.hasOption(TAG_SOURCE_PATH);
+            // TODO: Remove -tagsource in 0.13. For backward compatibility, we
+            // need tagsource to be supported until at least 0.12
+            if (configuredOptions.hasOption("tagsource")) {
+                mLog.warn("'-tagsource' is deprecated. Use '-tagFile' instead.");
+                tagFile = true;
+            }
         } catch (ParseException e) {
             HelpFormatter formatter = new HelpFormatter();
             formatter.printHelp( "PigStorage(',', '[options]')", validOptions);
@@ -213,8 +229,10 @@ LoadPushDown, LoadMetadata, StoreMetadat
             mRequiredColumnsInitialized = true;
         }
         //Prepend input source path if source tagging is enabled
-        if(tagSource) {
-        	mProtoTuple.add(new DataByteArray(sourcePath.getName()));
+        if(tagFile) {
+            mProtoTuple.add(new DataByteArray(sourcePath.getName()));
+        } else if (tagPath) {
+            mProtoTuple.add(new DataByteArray(sourcePath.toString()));
         }
 
         try {
@@ -362,8 +380,8 @@ LoadPushDown, LoadMetadata, StoreMetadat
     @Override
     public void prepareToRead(RecordReader reader, PigSplit split) {
         in = reader;
-        if(tagSource) {
-        	sourcePath = ((FileSplit)split.getWrappedSplit()).getPath();
+        if (tagFile || tagPath) {
+            sourcePath = ((FileSplit)split.getWrappedSplit()).getPath();
         }
     }
 
@@ -471,8 +489,10 @@ LoadPushDown, LoadMetadata, StoreMetadat
             schema = (new JsonMetadata()).getSchema(location, job, isSchemaOn);
 
             if (signature != null && schema != null) {
-                if(tagSource) {
-                    schema = Utils.getSchemaWithInputSourceTag(schema);
+                if(tagFile) {
+                    schema = Utils.getSchemaWithInputSourceTag(schema, "INPUT_FILE_NAME");
+                } else if(tagPath) {
+                    schema = Utils.getSchemaWithInputSourceTag(schema, "INPUT_FILE_PATH");
                 }
                 Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(),
                         new String[] {signature});

Modified: pig/trunk/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/Utils.java?rev=1421461&r1=1421460&r2=1421461&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/Utils.java Thu Dec 13 19:57:46 2012
@@ -17,7 +17,6 @@
  */
 package org.apache.pig.impl.util;
 
-import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
@@ -51,7 +50,6 @@ import org.apache.pig.impl.io.ReadToEndL
 import org.apache.pig.impl.io.TFileStorage;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
-import org.apache.pig.newplan.logical.Util;
 import org.apache.pig.newplan.logical.relational.LogicalSchema;
 import org.apache.pig.parser.ParserException;
 import org.apache.pig.parser.QueryParserDriver;
@@ -62,7 +60,7 @@ import com.google.common.collect.Lists;
  * Class with utility static methods
  */
 public class Utils {
-	private static final Log log = LogFactory.getLog(Utils.class);
+    private static final Log log = LogFactory.getLog(Utils.class);
     /**
      * This method is a helper for classes to implement {@link java.lang.Object#equals(java.lang.Object)}
      * checks if two objects are equals - two levels of checks are
@@ -204,24 +202,26 @@ public class Utils {
         return getSchemaFromString(unwrappedSchemaString);
     }
 
-	public static LogicalSchema parseSchema(String schemaString) throws ParserException {
-		QueryParserDriver queryParser = new QueryParserDriver( new PigContext(), 
-        		"util", new HashMap<String, String>() ) ;
+    public static LogicalSchema parseSchema(String schemaString) throws ParserException {
+        QueryParserDriver queryParser = new QueryParserDriver( new PigContext(), 
+                "util", new HashMap<String, String>() ) ;
         LogicalSchema schema = queryParser.parseSchema(schemaString);
-		return schema;
-	}
-	
+        return schema;
+    }
+
     /**
      * This method adds FieldSchema of 'input source tag/path' as the first
      * field. This will be called only when PigStorage is invoked with
-     * '-tagsource' option and the schema file is present to be loaded.
+     * '-tagFile' or '-tagPath' option and the schema file is present to be
+     * loaded.
      * 
      * @param schema
+     * @param fieldName
      * @return ResourceSchema
      */
-    public static ResourceSchema getSchemaWithInputSourceTag(ResourceSchema schema) {
+    public static ResourceSchema getSchemaWithInputSourceTag(ResourceSchema schema, String fieldName) {
         ResourceFieldSchema[] fieldSchemas = schema.getFields();
-        ResourceFieldSchema sourceTagSchema = new ResourceFieldSchema(new FieldSchema("INPUT_FILE_NAME", DataType.CHARARRAY));
+        ResourceFieldSchema sourceTagSchema = new ResourceFieldSchema(new FieldSchema(fieldName, DataType.CHARARRAY));
         ResourceFieldSchema[] fieldSchemasWithSourceTag = new ResourceFieldSchema[fieldSchemas.length + 1];
         fieldSchemasWithSourceTag[0] = sourceTagSchema;
         for(int j = 0; j < fieldSchemas.length; j++) {
@@ -324,15 +324,15 @@ public class Utils {
     }
     
    public static InputStream getCompositeStream(InputStream in, Properties properties) {
-	   //Load default ~/.pigbootup if not specified by user
-    	final String bootupFile = properties.getProperty("pig.load.default.statements", System.getProperty("user.home") + "/.pigbootup");
-    	try {
-    	final InputStream inputSteam = new FileInputStream(new File(bootupFile));
-    	return new SequenceInputStream(inputSteam, in);
-    	} catch(FileNotFoundException fe) {
-    		log.info("Default bootup file " +bootupFile+ " not found");
-    		return in;
-    	}
+       //Load default ~/.pigbootup if not specified by user
+        final String bootupFile = properties.getProperty("pig.load.default.statements", System.getProperty("user.home") + "/.pigbootup");
+        try {
+        final InputStream inputSteam = new FileInputStream(new File(bootupFile));
+        return new SequenceInputStream(inputSteam, in);
+        } catch(FileNotFoundException fe) {
+            log.info("Default bootup file " +bootupFile+ " not found");
+            return in;
+        }
     }
 
     /**

Modified: pig/trunk/test/org/apache/pig/test/TestPigStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigStorage.java?rev=1421461&r1=1421460&r2=1421461&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigStorage.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigStorage.java Thu Dec 13 19:57:46 2012
@@ -28,7 +28,6 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -59,9 +58,9 @@ import org.codehaus.jackson.map.JsonMapp
 import org.codehaus.jackson.map.ObjectMapper;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.Assert;
 
 public class TestPigStorage  {
 
@@ -468,7 +467,7 @@ public class TestPigStorage  {
     
     /**
      * This is for testing source tagging option on PigStorage. When a user
-     * specifies '-tagsource' as an option, PigStorage must prepend the input
+     * specifies '-tagFile' as an option, PigStorage must prepend the input
      * source path to the tuple and "INPUT_FILE_NAME" to schema.
      * 
      * @throws Exception
@@ -482,18 +481,29 @@ public class TestPigStorage  {
         pig.store("a", datadir + "aout", "PigStorage('\\t', '-schema')");
         // aout now has a schema.
 
-        // Verify that loading a-out with '-tagsource' produces
+        // Verify that loading a-out with '-tagFile' produces
         // the original schema, and prepends 'INPUT_FILE_NAME' to
         // original schema.
-        pig.registerQuery("b = LOAD '" + datadir + "aout' using PigStorage('\\t', '-tagsource');");
+        pig.registerQuery("b = LOAD '" + datadir + "aout' using PigStorage('\\t', '-tagFile');");
         Schema genSchema = pig.dumpSchema("b");
-        // Verify that -tagsource schema works
-        String[] aliases = {"INPUT_FILE_NAME", "f1", "f2"};
-        byte[] types = {DataType.CHARARRAY, DataType.CHARARRAY, DataType.INTEGER};
+        String[] fileAliases = {"INPUT_FILE_NAME", "f1", "f2"};
+        byte[] fileTypes = {DataType.CHARARRAY, DataType.CHARARRAY, DataType.INTEGER};
         Schema newSchema = TypeCheckingTestUtil.genFlatSchema(
-                aliases,types);
-        Assert.assertTrue("schema with -tagsource preprends INPUT_FILE_NAME",
+                fileAliases,fileTypes);
+        Assert.assertTrue("schema with -tagFile preprends INPUT_FILE_NAME",
+                Schema.equals(newSchema, genSchema, true, false));
+        
+        // Verify that loading a-out with '-tagPath' produces
+        // the original schema, and prepends 'INPUT_FILE_PATH' to
+        // original schema.
+        pig.registerQuery("b = LOAD '" + datadir + "aout' using PigStorage('\\t', '-tagPath');");
+        genSchema = pig.dumpSchema("b");
+        String[] pathAliases = {"INPUT_FILE_PATH", "f1", "f2"};
+        byte[] pathTypes = {DataType.CHARARRAY, DataType.CHARARRAY, DataType.INTEGER};
+        newSchema = TypeCheckingTestUtil.genFlatSchema(pathAliases,pathTypes);
+        Assert.assertTrue("schema with -tagPath preprends INPUT_FILE_PATH",
                 Schema.equals(newSchema, genSchema, true, false));
+
         
         // Verify that explicitly requesting no schema works
         pig.registerQuery("d = LOAD '" + datadir + "aout' using PigStorage('\\t', '-noschema');");
@@ -501,7 +511,7 @@ public class TestPigStorage  {
         assertNull(genSchema);
 
         // Verify specifying your own schema works
-        pig.registerQuery("b = LOAD '" + datadir + "aout' using PigStorage('\\t', '-tagsource') " +
+        pig.registerQuery("b = LOAD '" + datadir + "aout' using PigStorage('\\t', '-tagFile') " +
         "as (input_file:chararray, foo:chararray, bar:int);");
         genSchema = pig.dumpSchema("b");
         String[] newAliases = {"input_file", "foo", "bar"};
@@ -522,14 +532,14 @@ public class TestPigStorage  {
         // Storing in 'aout' directory will store contents in part-m-00000
         pig.store("a", datadir + "aout", "PigStorage('\\t', '-schema')");
         
-        // Verify input source tag is present when using -tagsource
-        pig.registerQuery("b = LOAD '" + datadir + "aout' using PigStorage('\\t', '-tagsource');");
+        // Verify input source tag is present when using -tagFile or -tagPath
+        pig.registerQuery("b = LOAD '" + datadir + "aout' using PigStorage('\\t', '-tagFile');");
         pig.registerQuery("c = foreach b generate INPUT_FILE_NAME;");
         Iterator<Tuple> iter = pig.openIterator("c");
         while(iter.hasNext()) {
             Tuple tuple = iter.next();
             String inputFileName = (String)tuple.get(0);
-            assertEquals("tagsource value must be part-m-00000", inputFileName, storeFileName);
+            assertEquals("tagFile value must be part-m-00000", inputFileName, storeFileName);
         }
     }    
 }