You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2012/03/01 03:54:01 UTC
svn commit: r1295389 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/builtin/PigStorage.java
src/org/apache/pig/impl/util/Utils.java
test/org/apache/pig/test/TestPigStorage.java
Author: daijy
Date: Thu Mar 1 02:54:00 2012
New Revision: 1295389
URL: http://svn.apache.org/viewvc?rev=1295389&view=rev
Log:
PIG-2541: Automatic record provenance (source tagging) for PigStorage
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/builtin/PigStorage.java
pig/trunk/src/org/apache/pig/impl/util/Utils.java
pig/trunk/test/org/apache/pig/test/TestPigStorage.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1295389&r1=1295388&r2=1295389&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Mar 1 02:54:00 2012
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-2541: Automatic record provenance (source tagging) for PigStorage (prkommireddi via daijy)
+
PIG-2538: Add helper wrapper classes for StoreFunc (billgraham via dvryaboy)
PIG-2010: registered jars on distributed cache (traviscrawford and julienledem via dvryaboy)
Modified: pig/trunk/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=1295389&r1=1295388&r2=1295389&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/PigStorage.java Thu Mar 1 02:54:00 2012
@@ -42,6 +42,7 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.pig.Expression;
@@ -84,6 +85,7 @@ import org.apache.pig.parser.ParserExcep
* <li><code>-schema</code> Reads/Stores the schema of the relation using a
* hidden JSON file.
* <li><code>-noschema</code> Ignores a stored schema during loading.
+ * <li><code>-tagsource</code> Prepends the input source file name to each tuple as its first field. Make sure to set "pig.splitCombination" to false.
* </ul>
* <p>
* <h3>Schemas</h3>
@@ -99,6 +101,12 @@ import org.apache.pig.parser.ParserExcep
* This file simply lists the delimited aliases. This is intended to make export to tools that can read
* files with header lines easier (just cat the header to your data).
* <p>
+ * <h3>Source tagging</h3>
+ * If <code>-tagsource</code> is specified, PigStorage will prepend the input file name (the last component of the input split's path) to each Tuple/row.
+ * User needs to ensure <code>pig.splitCombination</code> is set to false.
+ * Usage: A = LOAD 'input' using PigStorage(',','-tagsource'); B = foreach A generate INPUT_FILE_NAME;
+ * The first field in each Tuple will contain the input file name (INPUT_FILE_NAME). The INPUT_FILE_NAME alias is only available when a stored schema is loaded; otherwise refer to the field as $0.
+ * <p>
* Note that regardless of whether or not you store the schema, you <b>always</b> need to specify
* the correct delimiter to read your data. If you store reading delimiter "#" and then load using
* the default delimiter, your data will not be parsed correctly.
@@ -137,10 +145,16 @@ LoadPushDown, LoadMetadata, StoreMetadat
protected boolean[] mRequiredColumns = null;
private boolean mRequiredColumnsInitialized = false;
+
+ //Indicates whether the input file path should be read.
+ private boolean tagSource = false;
+ private static final String TAG_SOURCE_PATH = "tagsource";
+ private Path sourcePath = null;
private void populateValidOptions() {
validOptions.addOption("schema", false, "Loads / Stores the schema of the relation using a hidden JSON file.");
validOptions.addOption("noschema", false, "Disable attempting to load data schema from the filesystem.");
+ // The source tag is the input file name and is placed before the data fields, so the help text says "prepends".
+ validOptions.addOption(TAG_SOURCE_PATH, false, "Prepends input source file name to each tuple as its first field. Make sure to set pig.splitCombination to false");
}
public PigStorage() {
@@ -166,6 +180,7 @@ LoadPushDown, LoadMetadata, StoreMetadat
* <ul>
* <li><code>-schema</code> Loads / Stores the schema of the relation using a hidden JSON file.
* <li><code>-noschema</code> Ignores a stored schema during loading.
+ * <li><code>-tagsource</code> Prepends the input source file name to each tuple as its first field. Make sure to set "pig.splitCombination" to false.
* </ul>
* @param delimiter the single byte character that is used to separate fields.
* @param options a list of options that can be used to modify PigStorage behavior
@@ -179,6 +194,7 @@ LoadPushDown, LoadMetadata, StoreMetadat
configuredOptions = parser.parse(validOptions, optsArr);
isSchemaOn = configuredOptions.hasOption("schema");
dontLoadSchema = configuredOptions.hasOption("noschema");
+ tagSource = configuredOptions.hasOption(TAG_SOURCE_PATH);
} catch (ParseException e) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp( "PigStorage(',', '[options]')", validOptions);
@@ -198,6 +214,11 @@ LoadPushDown, LoadMetadata, StoreMetadat
}
mRequiredColumnsInitialized = true;
}
+ //Prepend the input file name (last component of the source path) if source tagging is enabled
+ if(tagSource) {
+ mProtoTuple.add(new DataByteArray(sourcePath.getName()));
+ }
+
try {
boolean notDone = in.nextKeyValue();
if (!notDone) {
@@ -343,6 +364,9 @@ LoadPushDown, LoadMetadata, StoreMetadat
@Override
public void prepareToRead(RecordReader reader, PigSplit split) {
in = reader;
+ // With -tagsource, remember the path of the file this split reads so that
+ // records read later can be tagged with its file name.
+ // The cast assumes the wrapped split is a FileSplit.
+ // NOTE(review): with pig.splitCombination enabled a PigSplit may wrap several
+ // underlying splits; only getWrappedSplit() is consulted here, so tags could be
+ // wrong for combined splits — confirm.
+ if(tagSource) {
+ sourcePath = ((FileSplit)split.getWrappedSplit()).getPath();
+ }
}
@Override
@@ -442,6 +466,9 @@ LoadPushDown, LoadMetadata, StoreMetadat
schema = (new JsonMetadata()).getSchema(location, job, isSchemaOn);
if (signature != null && schema != null) {
+ if(tagSource) {
+ schema = Utils.getSchemaWithInputSourceTag(schema);
+ }
Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(),
new String[] {signature});
p.setProperty(signature + ".schema", schema.toString());
Modified: pig/trunk/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/Utils.java?rev=1295389&r1=1295388&r2=1295389&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/Utils.java Thu Mar 1 02:54:00 2012
@@ -40,6 +40,7 @@ import org.apache.pig.FuncSpec;
import org.apache.pig.LoadFunc;
import org.apache.pig.PigException;
import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
@@ -48,6 +49,7 @@ import org.apache.pig.impl.io.InterStora
import org.apache.pig.impl.io.ReadToEndLoader;
import org.apache.pig.impl.io.TFileStorage;
import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
import org.apache.pig.newplan.logical.Util;
import org.apache.pig.newplan.logical.relational.LogicalSchema;
import org.apache.pig.parser.ParserException;
@@ -183,7 +185,26 @@ public class Utils {
LogicalSchema schema = queryParser.parseSchema(schemaString);
return schema;
}
-
+
+ /**
+ * Returns the given schema with a chararray field named
+ * {@code INPUT_FILE_NAME} inserted as the first field. This is called only
+ * when PigStorage is invoked with the '-tagsource' option and a stored
+ * schema file is present to be loaded.
+ * <p>
+ * NOTE(review): the result comes from {@link ResourceSchema#setFields}, which
+ * appears to be a fluent setter; the argument is then modified in place and
+ * returned — confirm before reusing the original schema elsewhere.
+ *
+ * @param schema the schema read from the stored schema file
+ * @return the schema whose fields are INPUT_FILE_NAME followed by the original fields
+ */
+ public static ResourceSchema getSchemaWithInputSourceTag(ResourceSchema schema) {
+ ResourceFieldSchema[] fieldSchemas = schema.getFields();
+ ResourceFieldSchema sourceTagSchema = new ResourceFieldSchema(new FieldSchema("INPUT_FILE_NAME", DataType.CHARARRAY));
+ ResourceFieldSchema[] fieldSchemasWithSourceTag = new ResourceFieldSchema[fieldSchemas.length + 1];
+ fieldSchemasWithSourceTag[0] = sourceTagSchema;
+ // Shift the original fields one slot to the right, after the source tag.
+ System.arraycopy(fieldSchemas, 0, fieldSchemasWithSourceTag, 1, fieldSchemas.length);
+ return schema.setFields(fieldSchemasWithSourceTag);
+ }
+
public static String getTmpFileCompressorName(PigContext pigContext) {
if (pigContext == null)
return InterStorage.class.getName();
Modified: pig/trunk/test/org/apache/pig/test/TestPigStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigStorage.java?rev=1295389&r1=1295388&r2=1295389&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigStorage.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigStorage.java Thu Mar 1 02:54:00 2012
@@ -463,4 +463,71 @@ public class TestPigStorage {
schema = pigStorage.getSchema(globtestdir+"{a,b}", new Job(ConfigurationUtil.toConfiguration(pigContext.getProperties())));
Assert.assertTrue(ResourceSchema.equals(schema, testSchema));
}
+
+ /**
+ * This is for testing source tagging option on PigStorage. When a user
+ * specifies '-tagsource' as an option, PigStorage must prepend the input
+ * source file name to the tuple and "INPUT_FILE_NAME" to the schema.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testPigStorageSourceTagSchema() throws Exception {
+ pigContext.connect();
+ String query = "a = LOAD '" + datadir + "originput' using PigStorage('\\t') " +
+ "as (f1:chararray, f2:int);";
+ pig.registerQuery(query);
+ pig.store("a", datadir + "aout", "PigStorage('\\t', '-schema')");
+ // aout now has a schema.
+
+ // Verify that loading aout with '-tagsource' produces
+ // the original schema, and prepends 'INPUT_FILE_NAME' to
+ // the original schema.
+ pig.registerQuery("b = LOAD '" + datadir + "aout' using PigStorage('\\t', '-tagsource');");
+ Schema genSchema = pig.dumpSchema("b");
+ // Verify that -tagsource schema works
+ String[] aliases = {"INPUT_FILE_NAME", "f1", "f2"};
+ byte[] types = {DataType.CHARARRAY, DataType.CHARARRAY, DataType.INTEGER};
+ Schema newSchema = TypeCheckingTestUtil.genFlatSchema(
+ aliases,types);
+ Assert.assertTrue("schema with -tagsource prepends INPUT_FILE_NAME",
+ Schema.equals(newSchema, genSchema, true, false));
+
+ // Verify that explicitly requesting no schema works
+ pig.registerQuery("d = LOAD '" + datadir + "aout' using PigStorage('\\t', '-noschema');");
+ genSchema = pig.dumpSchema("d");
+ assertNull(genSchema);
+
+ // Verify specifying your own schema works
+ pig.registerQuery("b = LOAD '" + datadir + "aout' using PigStorage('\\t', '-tagsource') " +
+ "as (input_file:chararray, foo:chararray, bar:int);");
+ genSchema = pig.dumpSchema("b");
+ String[] newAliases = {"input_file", "foo", "bar"};
+ byte[] newTypes = {DataType.CHARARRAY, DataType.CHARARRAY, DataType.INTEGER};
+ newSchema = TypeCheckingTestUtil.genFlatSchema(newAliases,newTypes);
+ Assert.assertTrue("explicit schema overrides metadata",
+ Schema.equals(newSchema, genSchema, true, false));
+ }
+
+ @Test
+ public void testPigStorageSourceTagValue() throws Exception {
+ final String storeFileName = "part-m-00000";
+ pigContext.connect();
+
+ String query = "a = LOAD '" + datadir + "' using PigStorage('\\t') " +
+ "as (f1:chararray, f2:int);";
+ pig.registerQuery(query);
+ // Storing in 'aout' directory will store contents in part-m-00000
+ pig.store("a", datadir + "aout", "PigStorage('\\t', '-schema')");
+
+ // Verify input source tag is present when using -tagsource
+ pig.registerQuery("b = LOAD '" + datadir + "aout' using PigStorage('\\t', '-tagsource');");
+ pig.registerQuery("c = foreach b generate INPUT_FILE_NAME;");
+ Iterator<Tuple> iter = pig.openIterator("c");
+ while(iter.hasNext()) {
+ Tuple tuple = iter.next();
+ String inputFileName = (String)tuple.get(0);
+ assertEquals("tagsource value must be part-m-00000", inputFileName, storeFileName);
+ }
+ }
}