You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2012/12/13 20:57:47 UTC
svn commit: r1421461 - in /pig/trunk: CHANGES.txt
src/docs/src/documentation/content/xdocs/func.xml
src/org/apache/pig/builtin/PigStorage.java
src/org/apache/pig/impl/util/Utils.java
test/org/apache/pig/test/TestPigStorage.java
Author: cheolsoo
Date: Thu Dec 13 19:57:46 2012
New Revision: 1421461
URL: http://svn.apache.org/viewvc?rev=1421461&view=rev
Log:
PIG-2857: Add a -tagPath option to PigStorage (prkommireddi via cheolsoo)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/docs/src/documentation/content/xdocs/func.xml
pig/trunk/src/org/apache/pig/builtin/PigStorage.java
pig/trunk/src/org/apache/pig/impl/util/Utils.java
pig/trunk/test/org/apache/pig/test/TestPigStorage.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1421461&r1=1421460&r2=1421461&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Dec 13 19:57:46 2012
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-2857: Add a -tagPath option to PigStorage (prkommireddi via cheolsoo)
+
PIG-2341: Need better documentation on Pig/HBase integration (jthakrar and billgraham via billgraham)
PIG-3075: Allow AvroStorage STORE Operations To Use Schema Specified By URI (nwhite via cheolsoo)
Modified: pig/trunk/src/docs/src/documentation/content/xdocs/func.xml
URL: http://svn.apache.org/viewvc/pig/trunk/src/docs/src/documentation/content/xdocs/func.xml?rev=1421461&r1=1421460&r2=1421461&view=diff
==============================================================================
--- pig/trunk/src/docs/src/documentation/content/xdocs/func.xml (original)
+++ pig/trunk/src/docs/src/documentation/content/xdocs/func.xml Thu Dec 13 19:57:46 2012
@@ -1436,10 +1436,12 @@ STORE X INTO 'output' USING PigDump();
<p>A string that contains space-separated options ('optionA optionB optionC')</p>
<p>Currently supported options are:</p>
<ul>
- <li>('schema') - Stores the schema of the relation using a hidden JSON file.</li>
- <li>('noschema') - Ignores a stored schema during the load.</li>
- <li>('tagsource') - Add a first column indicates the input file of the record.</li>
- </ul>
+ <li>('schema') - Stores the schema of the relation using a hidden JSON file.</li>
+ <li>('noschema') - Ignores a stored schema during the load.</li>
+ <li>('tagsource') - (deprecated, use tagFile instead) Adds a first column indicating the input file name of the record.</li>
+ <li>('tagPath') - Adds a first column indicating the input path of the record.</li>
+ <li>('tagFile') - Adds a first column indicating the input file name of the record.</li>
+ </ul>
</td>
</tr>
</table></section>
@@ -1471,7 +1473,7 @@ STORE X INTO 'output' USING PigDump();
<p>Note that regardless of whether or not you store the schema, you always need to specify the correct delimiter to read your data. If you store reading delimiter "#" and then load using the default delimiter, your data will not be parsed correctly.</p>
<p><strong>Record Provenance</strong></p>
-<p>If tagsource option is specified, PigStorage will add a psudo-column INPUT_FILE_NAME to the beginning of the record. As the name suggests, it is the input file name containing this particular record.</p>
+<p>If tagPath or tagFile option is specified, PigStorage will add a pseudo-column INPUT_FILE_PATH or INPUT_FILE_NAME respectively to the beginning of the record. As the name suggests, it is the input file path/name containing this particular record. Please note tagsource is deprecated.</p>
<p><strong>Complex Data Types</strong></p>
<p>The formats for complex data types are shown here:</p>
Modified: pig/trunk/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=1421461&r1=1421460&r2=1421461&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/PigStorage.java Thu Dec 13 19:57:46 2012
@@ -41,8 +41,8 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.pig.Expression;
import org.apache.pig.FileInputLoadFunc;
@@ -84,7 +84,8 @@ import org.apache.pig.parser.ParserExcep
* <li><code>-schema</code> Reads/Stores the schema of the relation using a
* hidden JSON file.
* <li><code>-noschema</code> Ignores a stored schema during loading.
- * <li><code>-tagsource</code> Appends input source file path to beginning of each tuple.
+ * <li><code>-tagFile</code> Appends input source file name to beginning of each tuple.
+ * <li><code>-tagPath</code> Appends input source file path to beginning of each tuple.
* </ul>
* <p>
* <h3>Schemas</h3>
@@ -101,9 +102,12 @@ import org.apache.pig.parser.ParserExcep
* files with header lines easier (just cat the header to your data).
* <p>
* <h3>Source tagging</h3>
- * If<code>-tagsource</code> is specified, PigStorage will prepend input split path to each Tuple/row.
- * Usage: A = LOAD 'input' using PigStorage(',','-tagsource'); B = foreach A generate $0;
- * The first field (0th index) in each Tuple will contain input path
+ * If <code>-tagFile</code> is specified, PigStorage will prepend the input file name to each Tuple/row.
+ * Usage: A = LOAD 'input' using PigStorage(',','-tagFile'); B = foreach A generate $0;
+ * The first field (0th index) in each Tuple will contain the input file name.
+ * If <code>-tagPath</code> is specified, PigStorage will prepend the input file path to each Tuple/row.
+ * Usage: A = LOAD 'input' using PigStorage(',','-tagPath'); B = foreach A generate $0;
+ * The first field (0th index) in each Tuple will contain the input file path.
* <p>
* Note that regardless of whether or not you store the schema, you <b>always</b> need to specify
* the correct delimiter to read your data. If you store reading delimiter "#" and then load using
@@ -144,15 +148,19 @@ LoadPushDown, LoadMetadata, StoreMetadat
protected boolean[] mRequiredColumns = null;
private boolean mRequiredColumnsInitialized = false;
- //Indicates whether the input file path should be read.
- private boolean tagSource = false;
- private static final String TAG_SOURCE_PATH = "tagsource";
+ // Indicates whether the input file name/path should be read.
+ private boolean tagFile = false;
+ private static final String TAG_SOURCE_FILE = "tagFile";
+ private boolean tagPath = false;
+ private static final String TAG_SOURCE_PATH = "tagPath";
private Path sourcePath = null;
private void populateValidOptions() {
validOptions.addOption("schema", false, "Loads / Stores the schema of the relation using a hidden JSON file.");
validOptions.addOption("noschema", false, "Disable attempting to load data schema from the filesystem.");
- validOptions.addOption(TAG_SOURCE_PATH, false, "Appends input source file path to beginning of each tuple. ");
+ validOptions.addOption(TAG_SOURCE_FILE, false, "Appends input source file name to beginning of each tuple.");
+ validOptions.addOption(TAG_SOURCE_PATH, false, "Appends input source file path to beginning of each tuple.");
+ validOptions.addOption("tagsource", false, "Appends input source file name to beginning of each tuple.");
}
public PigStorage() {
@@ -178,7 +186,8 @@ LoadPushDown, LoadMetadata, StoreMetadat
* <ul>
* <li><code>-schema</code> Loads / Stores the schema of the relation using a hidden JSON file.
* <li><code>-noschema</code> Ignores a stored schema during loading.
- * <li><code>-tagsource</code> Appends input source file path to beginning of each tuple.
+ * <li><code>-tagFile</code> Appends input source file name to beginning of each tuple.
+ * <li><code>-tagPath</code> Appends input source file path to beginning of each tuple.
* </ul>
* @param delimiter the single byte character that is used to separate fields.
* @param options a list of options that can be used to modify PigStorage behavior
@@ -192,7 +201,14 @@ LoadPushDown, LoadMetadata, StoreMetadat
configuredOptions = parser.parse(validOptions, optsArr);
isSchemaOn = configuredOptions.hasOption("schema");
dontLoadSchema = configuredOptions.hasOption("noschema");
- tagSource = configuredOptions.hasOption(TAG_SOURCE_PATH);
+ tagFile = configuredOptions.hasOption(TAG_SOURCE_FILE);
+ tagPath = configuredOptions.hasOption(TAG_SOURCE_PATH);
+ // TODO: Remove -tagsource in 0.13. For backward compatibility, we
+ // need tagsource to be supported until at least 0.12
+ if (configuredOptions.hasOption("tagsource")) {
+ mLog.warn("'-tagsource' is deprecated. Use '-tagFile' instead.");
+ tagFile = true;
+ }
} catch (ParseException e) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp( "PigStorage(',', '[options]')", validOptions);
@@ -213,8 +229,10 @@ LoadPushDown, LoadMetadata, StoreMetadat
mRequiredColumnsInitialized = true;
}
//Prepend input source path if source tagging is enabled
- if(tagSource) {
- mProtoTuple.add(new DataByteArray(sourcePath.getName()));
+ if(tagFile) {
+ mProtoTuple.add(new DataByteArray(sourcePath.getName()));
+ } else if (tagPath) {
+ mProtoTuple.add(new DataByteArray(sourcePath.toString()));
}
try {
@@ -362,8 +380,8 @@ LoadPushDown, LoadMetadata, StoreMetadat
@Override
public void prepareToRead(RecordReader reader, PigSplit split) {
in = reader;
- if(tagSource) {
- sourcePath = ((FileSplit)split.getWrappedSplit()).getPath();
+ if (tagFile || tagPath) {
+ sourcePath = ((FileSplit)split.getWrappedSplit()).getPath();
}
}
@@ -471,8 +489,10 @@ LoadPushDown, LoadMetadata, StoreMetadat
schema = (new JsonMetadata()).getSchema(location, job, isSchemaOn);
if (signature != null && schema != null) {
- if(tagSource) {
- schema = Utils.getSchemaWithInputSourceTag(schema);
+ if(tagFile) {
+ schema = Utils.getSchemaWithInputSourceTag(schema, "INPUT_FILE_NAME");
+ } else if(tagPath) {
+ schema = Utils.getSchemaWithInputSourceTag(schema, "INPUT_FILE_PATH");
}
Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(),
new String[] {signature});
Modified: pig/trunk/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/Utils.java?rev=1421461&r1=1421460&r2=1421461&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/Utils.java Thu Dec 13 19:57:46 2012
@@ -17,7 +17,6 @@
*/
package org.apache.pig.impl.util;
-import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
@@ -51,7 +50,6 @@ import org.apache.pig.impl.io.ReadToEndL
import org.apache.pig.impl.io.TFileStorage;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
-import org.apache.pig.newplan.logical.Util;
import org.apache.pig.newplan.logical.relational.LogicalSchema;
import org.apache.pig.parser.ParserException;
import org.apache.pig.parser.QueryParserDriver;
@@ -62,7 +60,7 @@ import com.google.common.collect.Lists;
* Class with utility static methods
*/
public class Utils {
- private static final Log log = LogFactory.getLog(Utils.class);
+ private static final Log log = LogFactory.getLog(Utils.class);
/**
* This method is a helper for classes to implement {@link java.lang.Object#equals(java.lang.Object)}
* checks if two objects are equals - two levels of checks are
@@ -204,24 +202,26 @@ public class Utils {
return getSchemaFromString(unwrappedSchemaString);
}
- public static LogicalSchema parseSchema(String schemaString) throws ParserException {
- QueryParserDriver queryParser = new QueryParserDriver( new PigContext(),
- "util", new HashMap<String, String>() ) ;
+ public static LogicalSchema parseSchema(String schemaString) throws ParserException {
+ QueryParserDriver queryParser = new QueryParserDriver( new PigContext(),
+ "util", new HashMap<String, String>() ) ;
LogicalSchema schema = queryParser.parseSchema(schemaString);
- return schema;
- }
-
+ return schema;
+ }
+
/**
* This method adds FieldSchema of 'input source tag/path' as the first
* field. This will be called only when PigStorage is invoked with
- * '-tagsource' option and the schema file is present to be loaded.
+ * '-tagFile' or '-tagPath' option and the schema file is present to be
+ * loaded.
*
* @param schema
+ * @param fieldName
* @return ResourceSchema
*/
- public static ResourceSchema getSchemaWithInputSourceTag(ResourceSchema schema) {
+ public static ResourceSchema getSchemaWithInputSourceTag(ResourceSchema schema, String fieldName) {
ResourceFieldSchema[] fieldSchemas = schema.getFields();
- ResourceFieldSchema sourceTagSchema = new ResourceFieldSchema(new FieldSchema("INPUT_FILE_NAME", DataType.CHARARRAY));
+ ResourceFieldSchema sourceTagSchema = new ResourceFieldSchema(new FieldSchema(fieldName, DataType.CHARARRAY));
ResourceFieldSchema[] fieldSchemasWithSourceTag = new ResourceFieldSchema[fieldSchemas.length + 1];
fieldSchemasWithSourceTag[0] = sourceTagSchema;
for(int j = 0; j < fieldSchemas.length; j++) {
@@ -324,15 +324,15 @@ public class Utils {
}
public static InputStream getCompositeStream(InputStream in, Properties properties) {
- //Load default ~/.pigbootup if not specified by user
- final String bootupFile = properties.getProperty("pig.load.default.statements", System.getProperty("user.home") + "/.pigbootup");
- try {
- final InputStream inputSteam = new FileInputStream(new File(bootupFile));
- return new SequenceInputStream(inputSteam, in);
- } catch(FileNotFoundException fe) {
- log.info("Default bootup file " +bootupFile+ " not found");
- return in;
- }
+ //Load default ~/.pigbootup if not specified by user
+ final String bootupFile = properties.getProperty("pig.load.default.statements", System.getProperty("user.home") + "/.pigbootup");
+ try {
+ final InputStream inputSteam = new FileInputStream(new File(bootupFile));
+ return new SequenceInputStream(inputSteam, in);
+ } catch(FileNotFoundException fe) {
+ log.info("Default bootup file " +bootupFile+ " not found");
+ return in;
+ }
}
/**
Modified: pig/trunk/test/org/apache/pig/test/TestPigStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigStorage.java?rev=1421461&r1=1421460&r2=1421461&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigStorage.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigStorage.java Thu Dec 13 19:57:46 2012
@@ -28,7 +28,6 @@ import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -59,9 +58,9 @@ import org.codehaus.jackson.map.JsonMapp
import org.codehaus.jackson.map.ObjectMapper;
import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import org.junit.Assert;
public class TestPigStorage {
@@ -468,7 +467,7 @@ public class TestPigStorage {
/**
* This is for testing source tagging option on PigStorage. When a user
- * specifies '-tagsource' as an option, PigStorage must prepend the input
+ * specifies '-tagFile' as an option, PigStorage must prepend the input
* source path to the tuple and "INPUT_FILE_NAME" to schema.
*
* @throws Exception
@@ -482,18 +481,29 @@ public class TestPigStorage {
pig.store("a", datadir + "aout", "PigStorage('\\t', '-schema')");
// aout now has a schema.
- // Verify that loading a-out with '-tagsource' produces
+ // Verify that loading a-out with '-tagFile' produces
// the original schema, and prepends 'INPUT_FILE_NAME' to
// original schema.
- pig.registerQuery("b = LOAD '" + datadir + "aout' using PigStorage('\\t', '-tagsource');");
+ pig.registerQuery("b = LOAD '" + datadir + "aout' using PigStorage('\\t', '-tagFile');");
Schema genSchema = pig.dumpSchema("b");
- // Verify that -tagsource schema works
- String[] aliases = {"INPUT_FILE_NAME", "f1", "f2"};
- byte[] types = {DataType.CHARARRAY, DataType.CHARARRAY, DataType.INTEGER};
+ String[] fileAliases = {"INPUT_FILE_NAME", "f1", "f2"};
+ byte[] fileTypes = {DataType.CHARARRAY, DataType.CHARARRAY, DataType.INTEGER};
Schema newSchema = TypeCheckingTestUtil.genFlatSchema(
- aliases,types);
- Assert.assertTrue("schema with -tagsource preprends INPUT_FILE_NAME",
+ fileAliases,fileTypes);
+ Assert.assertTrue("schema with -tagFile preprends INPUT_FILE_NAME",
+ Schema.equals(newSchema, genSchema, true, false));
+
+ // Verify that loading a-out with '-tagPath' produces
+ // the original schema, and prepends 'INPUT_FILE_PATH' to
+ // original schema.
+ pig.registerQuery("b = LOAD '" + datadir + "aout' using PigStorage('\\t', '-tagPath');");
+ genSchema = pig.dumpSchema("b");
+ String[] pathAliases = {"INPUT_FILE_PATH", "f1", "f2"};
+ byte[] pathTypes = {DataType.CHARARRAY, DataType.CHARARRAY, DataType.INTEGER};
+ newSchema = TypeCheckingTestUtil.genFlatSchema(pathAliases,pathTypes);
+ Assert.assertTrue("schema with -tagPath preprends INPUT_FILE_PATH",
Schema.equals(newSchema, genSchema, true, false));
+
// Verify that explicitly requesting no schema works
pig.registerQuery("d = LOAD '" + datadir + "aout' using PigStorage('\\t', '-noschema');");
@@ -501,7 +511,7 @@ public class TestPigStorage {
assertNull(genSchema);
// Verify specifying your own schema works
- pig.registerQuery("b = LOAD '" + datadir + "aout' using PigStorage('\\t', '-tagsource') " +
+ pig.registerQuery("b = LOAD '" + datadir + "aout' using PigStorage('\\t', '-tagFile') " +
"as (input_file:chararray, foo:chararray, bar:int);");
genSchema = pig.dumpSchema("b");
String[] newAliases = {"input_file", "foo", "bar"};
@@ -522,14 +532,14 @@ public class TestPigStorage {
// Storing in 'aout' directory will store contents in part-m-00000
pig.store("a", datadir + "aout", "PigStorage('\\t', '-schema')");
- // Verify input source tag is present when using -tagsource
- pig.registerQuery("b = LOAD '" + datadir + "aout' using PigStorage('\\t', '-tagsource');");
+ // Verify input source tag is present when using -tagFile or -tagPath
+ pig.registerQuery("b = LOAD '" + datadir + "aout' using PigStorage('\\t', '-tagFile');");
pig.registerQuery("c = foreach b generate INPUT_FILE_NAME;");
Iterator<Tuple> iter = pig.openIterator("c");
while(iter.hasNext()) {
Tuple tuple = iter.next();
String inputFileName = (String)tuple.get(0);
- assertEquals("tagsource value must be part-m-00000", inputFileName, storeFileName);
+ assertEquals("tagFile value must be part-m-00000", inputFileName, storeFileName);
}
}
}