Posted to commits@pig.apache.org by ol...@apache.org on 2010/08/05 02:38:59 UTC

svn commit: r982445 [1/2] - in /hadoop/pig/trunk/contrib: ./ piggybank/java/src/main/java/org/apache/pig/piggybank/storage/ piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/ piggybank/java/src/test/java/org/apache/pig/piggybank/test...

Author: olga
Date: Thu Aug  5 00:38:58 2010
New Revision: 982445

URL: http://svn.apache.org/viewvc?rev=982445&view=rev
Log:
PIG-1526 improvements to HiveColumnarLoader - Partitioning Support (gerritjvv via olgan)

Modified:
    hadoop/pig/trunk/contrib/CHANGES.txt
    hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/HiveColumnarLoader.java
    hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCDateSplitter.java
    hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCInputFormat.java
    hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCRecordReader.java
    hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCSchemaUtil.java
    hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHiveColumnarLoader.java

Modified: hadoop/pig/trunk/contrib/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/CHANGES.txt?rev=982445&r1=982444&r2=982445&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/CHANGES.txt (original)
+++ hadoop/pig/trunk/contrib/CHANGES.txt Thu Aug  5 00:38:58 2010
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-1526 improvements to HiveColumnarLoader - Partitioning Support (gerritjvv via olgan)
+
 PIG-1229 allow pig to write output into a JDBC db (ankur via hashutosh)
 
 PIG-1385 UDF to create tuples and bags (hcbusy via gates)

Modified: hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/HiveColumnarLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/HiveColumnarLoader.java?rev=982445&r1=982444&r2=982445&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/HiveColumnarLoader.java (original)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/HiveColumnarLoader.java Thu Aug  5 00:38:58 2010
@@ -19,7 +19,9 @@ package org.apache.pig.piggybank.storage
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.regex.Pattern;
@@ -27,6 +29,7 @@ import java.util.regex.Pattern;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde.Constants;
@@ -45,8 +48,8 @@ import org.apache.pig.FileInputLoadFunc;
 import org.apache.pig.LoadMetadata;
 import org.apache.pig.LoadPushDown;
 import org.apache.pig.ResourceSchema;
-import org.apache.pig.ResourceStatistics;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.ResourceStatistics;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
@@ -58,71 +61,153 @@ import org.apache.pig.impl.util.UDFConte
 import org.apache.pig.piggybank.storage.hiverc.HiveRCInputFormat;
 import org.apache.pig.piggybank.storage.hiverc.HiveRCRecordReader;
 import org.apache.pig.piggybank.storage.hiverc.HiveRCSchemaUtil;
+import org.apache.pig.piggybank.storage.partition.PathPartitionHelper;
 
 /**
  * Loader for Hive RC Columnar files.<br/>
  * Supports the following types:<br/>
- *  * <table>
- *  <tr><th>Hive Type</th><th>Pig Type from DataType</th></tr>
- *  <tr><td>string</td><td>CHARARRAY</td></tr>
- *  <tr><td>int</td><td>INTEGER</td></tr>
- *  <tr><td>bigint or long</td><td>LONG</td></tr>
- *  <tr><td>float</td><td>float</td></tr>
- *  <tr><td>double</td><td>DOUBLE</td></tr>
- *  <tr><td>boolean</td><td>BOOLEAN</td></tr>
- *  <tr><td>byte</td><td>BYTE</td></tr>
- *  <tr><td>array</td><td>TUPLE</td></tr>
- *  <tr><td>map</td><td>MAP</td></tr>
+ *
+ * <table>
+ * <tr>
+ * <th>Hive Type</th>
+ * <th>Pig Type from DataType</th>
+ * </tr>
+ * <tr>
+ * <td>string</td>
+ * <td>CHARARRAY</td>
+ * </tr>
+ * <tr>
+ * <td>int</td>
+ * <td>INTEGER</td>
+ * </tr>
+ * <tr>
+ * <td>bigint or long</td>
+ * <td>LONG</td>
+ * </tr>
+ * <tr>
+ * <td>float</td>
+ * <td>float</td>
+ * </tr>
+ * <tr>
+ * <td>double</td>
+ * <td>DOUBLE</td>
+ * </tr>
+ * <tr>
+ * <td>boolean</td>
+ * <td>BOOLEAN</td>
+ * </tr>
+ * <tr>
+ * <td>byte</td>
+ * <td>BYTE</td>
+ * </tr>
+ * <tr>
+ * <td>array</td>
+ * <td>TUPLE</td>
+ * </tr>
+ * <tr>
+ * <td>map</td>
+ * <td>MAP</td>
+ * </tr>
  * </table>
  * 
- *<br/>
- *
- *Usage 1:<br/>
- *To load a hive table: uid bigint, ts long, arr ARRAY<string,string>, m MAP<String, String>
- *<code>
+ * <p/>
+ * <b>Partitions</b><br/>
+ * The input paths are scanned by the loader for [partition name]=[value]
+ * patterns in the subdirectories.<br/>
+ * If detected, these partitions are appended to the table schema.<br/>
+ * For example, if you have the directory structure:<br/>
+ * 
+ * <pre>
+ * /user/hive/warehouse/mytable
+ * 				/year=2010/month=02/day=01
+ * </pre>
+ * 
+ * The mytable schema is (id int,name string).<br/>
+ * The final schema returned in pig will be (id:int, name:chararray,
+ * year:chararray, month:chararray, day:chararray).<br/>
+ * <p/>
+ * Usage 1:
+ * <p/>
+ * To load a hive table: uid bigint, ts long, arr ARRAY<string,string>, m
+ * MAP<String, String> <br/>
+ * <code>
+ * <pre>
  * a = LOAD 'file' USING HiveColumnarLoader("uid bigint, ts long, arr array<string,string>, m map<string,string>");
- *-- to reference the fields
- * b = FOREACH GENERATE a.uid, a.ts, a.arr, a.m; 
- *</code>
- *<p/>
- *Usage 2:<br/>
- *To load a hive table: uid bigint, ts long, arr ARRAY<string,string>, m MAP<String, String> only processing dates 2009-10-01 to 2009-10-02 in a <br/>
- *date partitioned hive table.<br/>
- *<code>
+ * -- to reference the fields
+ * b = FOREACH GENERATE a.uid, a.ts, a.arr, a.m;
+ * </pre> 
+ * </code>
+ * <p/>
+ * Usage 2:
+ * <p/>
+ * To load a hive table: uid bigint, ts long, arr ARRAY<string,string>, m
+ * MAP<String, String> only processing dates 2009-10-01 to 2009-10-02 in a
+ * date partitioned hive table.<br/>
+ * <b>Old Usage</b><br/>
+ * <b>Note:</b> Partitions can now be filtered with Pig's FILTER operator, as shown under New Usage below.<br/>
+ * <code>
+ * <pre>
  * a = LOAD 'file' USING HiveColumnarLoader("uid bigint, ts long, arr array<string,string>, m map<string,string>", "2009-10-01:2009-10-02");
- *-- to reference the fields
- * b = FOREACH GENERATE a.uid, a.ts, a.arr, a.m; 
- *</code>
- *<p/>
- *Usage 3:<br/>
- *To load a hive table: uid bigint, ts long, arr ARRAY<string,string>, m MAP<String, String> only reading column uid and ts.<br/
- *<code>
- * a = LOAD 'file' USING HiveColumnarLoader("uid bigint, ts long, arr array<string,string>, m map<string,string>", "", "uid,ts");
- *-- to reference the fields
- * b = FOREACH a GENERATE uid, ts, arr, m; 
- *</code>
- *<p/>
- *Usage 4:<br/>
- *To load a hive table: uid bigint, ts long, arr ARRAY<string,string>, m MAP<String, String> only reading column uid and ts for dates 2009-10-01 to 2009-10-02.<br/
- *<code>
- * a = LOAD 'file' USING HiveColumnarLoader("uid bigint, ts long, arr array<string,string>, m map<string,string>", "2009-10-01:2009-10-02", "uid,ts");
- *-- to reference the fields
- * b = FOREACH a GENERATE uid, ts, arr, m; 
- *</code> 
- *<p/>
- *<b>Issues</b><p/>
- *<u>Table schema definition</u><br/>
- *The schema definition must be column name followed by a space then a comma then no space and the next column name and so on.<br/>
- *This so column1 string, column2 string will not word, it must be column1 string,column2 string
- *<p/>
- *<u>Date partitioning</u><br/>
- *Hive date partition folders must have format daydate=[date].
+ * -- to reference the fields
+ * b = FOREACH GENERATE a.uid, a.ts, a.arr, a.m;
+ * </pre> 
+ * </code> <br/>
+ * <b>New Usage</b><br/>
+ * <code>
+ * <pre>
+ * a = LOAD 'file' USING HiveColumnarLoader("uid bigint, ts long, arr array<string,string>, m map<string,string>");
+ * f = FILTER a BY daydate >= '2009-10-01' AND daydate <= '2009-10-02';
+ * </pre>
+ * </code>
+ * <p/>
+ * Usage 3:
+ * <p/>
+ * To load a hive table: uid bigint, ts long, arr ARRAY<string,string>, m
+ * MAP<String, String> only reading column uid and ts for dates 2009-10-01 to
+ * 2009-10-02.<br/>
+ * <br/>
+ * <b>Note:</b> Column selection is now handled through Pig's LoadPushDown
+ * support; listing the columns to load in the constructor is ignored and Pig
+ * automatically sends the columns used by the script to the loader.<br/>
+ * <code>
+ * <pre>
+ * a = LOAD 'file' USING HiveColumnarLoader("uid bigint, ts long, arr array<string,string>, m map<string,string>");
+ * f = FILTER a BY daydate >= '2009-10-01' AND daydate <= '2009-10-02';
+ * -- to reference the fields
+ * b = FOREACH a GENERATE uid, ts, arr, m;
+ * </pre> 
+ * </code>
+ * <p/>
+ * <b>Issues</b>
+ * <p/>
+ * <u>Table schema definition</u><br/>
+ * Each column must be given as the column name, a space, and the Hive type;
+ * columns are separated by a comma with no space after it.<br/>
+ * So column1 string, column2 string will not work; it must be column1
+ * string,column2 string
+ * <p/>
+ * <u>Partitioning</u><br/>
+ * Partitions must be in the format [partition name]=[partition value]<br/>
+ * Only string partition values are supported.<br/>
+ * Partitions must follow the same naming convention for all subdirectories in
+ * a table.<br/>
+ * For example:<br/>
+ * The following is not valid:<br/>
+ * 
+ * <pre>
+ *     mytable/hour=00
+ *     mytable/day=01/hour=00
+ * </pre>
+ * 
  **/
-public class HiveColumnarLoader extends FileInputLoadFunc implements LoadMetadata, LoadPushDown{
+public class HiveColumnarLoader extends FileInputLoadFunc implements
+	LoadMetadata, LoadPushDown {
 
-    private static final String PROJECTION_ID = HiveColumnarLoader.class.getName() + ".projection";
+    public static final String PROJECTION_ID = HiveColumnarLoader.class
+	    .getName() + ".projection";
 
-    public static final String DAY_DATE_COLUMN = "daydate";
+    public static final String DATE_RANGE = HiveColumnarLoader.class.getName()
+	    + ".date-range";
 
     private static final Text text = new Text();
 
@@ -130,357 +215,525 @@ public class HiveColumnarLoader extends 
      * Regex to filter out column names
      */
     protected static final Pattern pcols = Pattern.compile("[a-zA-Z_0-9]*[ ]");
-    protected static final Log LOG = LogFactory.getLog(HiveColumnarLoader.class);
+    protected static final Log LOG = LogFactory
+	    .getLog(HiveColumnarLoader.class);
+
     protected TupleFactory tupleFactory = TupleFactory.getInstance();
 
+    String signature = "";
+
+    // we need to save the dateRange from the constructor if provided to add to
+    // the UDFContext only when the signature is available.
+    String dateRange = null;
+
     HiveRCRecordReader reader;
 
     ColumnarSerDe serde = null;
     Configuration conf = null;
 
     ResourceSchema pigSchema;
-
-    String table_schema;
+    boolean partitionKeysSet = false;
 
     BytesRefArrayWritable buff = null;
 
-    String currentDate;
-
-    private String dateRange = null;
-    boolean applyDateRanges = false;
-
-    private boolean applyColumnRead = false;
     private Properties props;
     private HiveConf hiveConf;
-    private int[] columnToReadPositions;
+
+    transient int[] requiredColumns;
+
+    transient Set<String> partitionColumns;
 
     /**
-     * Table schema should be a space and comma separated string describing the Hive schema.<br/>
-     * For example uid BIGINT, pid long, means 1 column of uid type BIGINT and one column of pid type LONG.<br/>
-     * The types are not case sensitive.
-     * @param table_schema This property cannot be null
+     * Implements the logic for finding partition keys and applying partition
+     * filtering
      */
+    transient PathPartitionHelper pathPartitionerHelper = new PathPartitionHelper();
+
+    transient Path currentPath = null;
+    transient Map<String, String> currentPathPartitionKeyMap;
 
+    /**
+     * Table schema should be a space and comma separated string describing the
+     * Hive schema.<br/>
+     * For example uid BIGINT, pid long, means 1 column of uid type BIGINT and
+     * one column of pid type LONG.<br/>
+     * The types are not case sensitive.
+     * 
+     * @param table_schema
+     *            This property cannot be null
+     */
     public HiveColumnarLoader(String table_schema) {
-        //tells all read methods to not apply date range checking
-        applyDateRanges = false;
-        setup(table_schema, false, null);
+	setup(table_schema);
     }
 
     /**
-     * Table schema should be a space and comma separated string describing the Hive schema.<br/>
-     * For example uid BIGINT, pid long, means 1 column of uid type BIGINT and one column of pid type LONG.<br/>
+     * This constructor is for backward compatibility.
+     * 
+     * Table schema should be a space and comma separated string describing the
+     * Hive schema.<br/>
+     * For example uid BIGINT, pid long, means 1 column of uid type BIGINT and
+     * one column of pid type LONG.<br/>
      * The types are not case sensitive.
-     * @param table_schema This property cannot be null
-     * @param dateRange must have format yyyy-MM-dd:yyy-MM-dd only dates between these two dates inclusively will be considered.
+     * 
+     * @param table_schema
+     *            This property cannot be null
+     * @param dateRange
+     *            Must have the format yyyy-MM-dd:yyyy-MM-dd; only dates in
+     *            this range (inclusive) will be read
+     * @param columns
+     *            No longer used; column projection is handled via LoadPushDown
      */
-    public HiveColumnarLoader(String table_schema, String dateRange) {  
-        applyDateRanges = (dateRange != null && dateRange.trim().length() > 0);
-        this.dateRange = dateRange;
-        setup(table_schema, applyDateRanges, null);
-    }
+    public HiveColumnarLoader(String table_schema, String dateRange,
+	    String columns) {
+	setup(table_schema);
 
+	this.dateRange = dateRange;
+    }
 
-    public HiveColumnarLoader(String table_schema, String dateRange, String columns) {
-        applyDateRanges = (dateRange != null && dateRange.trim().length() > 0);
-        this.dateRange = dateRange;
+    /**
+     * This constructor is for backward compatibility.
+     * 
+     * Table schema should be a space and comma separated string describing the
+     * Hive schema.<br/>
+     * For example uid BIGINT, pid long, means 1 column of uid type BIGINT and
+     * one column of pid type LONG.<br/>
+     * The types are not case sensitive.
+     * 
+     * @param table_schema
+     *            This property cannot be null
+     * @param dateRange
+     *            Must have the format yyyy-MM-dd:yyyy-MM-dd; only dates in
+     *            this range (inclusive) will be read
+     */
+    public HiveColumnarLoader(String table_schema, String dateRange) {
+	setup(table_schema);
 
-        setup(table_schema, applyDateRanges, columns);
+	this.dateRange = dateRange;
     }
 
+    private Properties getUDFContext() {
+	return UDFContext.getUDFContext().getUDFProperties(this.getClass(),
+		new String[] { signature });
+    }
 
     @Override
-    public InputFormat<LongWritable, BytesRefArrayWritable> getInputFormat() throws IOException {
-        return new HiveRCInputFormat(dateRange);
+    public InputFormat<LongWritable, BytesRefArrayWritable> getInputFormat()
+	    throws IOException {
+	LOG.info("Signature: " + signature);
+	return new HiveRCInputFormat(signature);
     }
 
     @Override
     public Tuple getNext() throws IOException {
-        Tuple tuple = null;
-
-        try {
-            if(reader.nextKeyValue()){
+	Tuple tuple = null;
 
-                BytesRefArrayWritable buff = reader.getCurrentValue();
-                ColumnarStruct struct = readColumnarStruct(buff);
+	try {
+	    if (reader.nextKeyValue()) {
 
-                if(applyColumnRead) tuple = readColumnarTuple(struct);
-                else tuple = readWholeRow(struct);
-            }
+		BytesRefArrayWritable buff = reader.getCurrentValue();
+		ColumnarStruct struct = readColumnarStruct(buff);
 
-        } catch (InterruptedException e) {
-            throw new IOException(e.toString(), e);
-        }
+		tuple = readColumnarTuple(struct, reader.getSplitPath());
+	    }
 
+	} catch (InterruptedException e) {
+	    throw new IOException(e.toString(), e);
+	}
 
-        return tuple;
+	return tuple;
     }
 
-
-    @SuppressWarnings("unchecked")
     @Override
-    public void prepareToRead(RecordReader reader, PigSplit split)
-    throws IOException {
+    public void prepareToRead(
+	    @SuppressWarnings("rawtypes") RecordReader reader, PigSplit split)
+	    throws IOException {
+
+	this.reader = (HiveRCRecordReader) reader;
+
+	// check that the required indexes actually exist i.e. the columns that
+	// should be read.
+	// assuming this is always defined simplifies the readColumnarTuple
+	// logic.
+
+	int requiredIndexes[] = getRequiredColumns();
+	if (requiredIndexes == null) {
+
+	    int fieldLen = pigSchema.getFields().length;
+
+	    // the partition keys, if any, should already exist at this point
+	    String[] partitionKeys = getPartitionKeys(null, null);
+	    if (partitionKeys != null) {
+		fieldLen = partitionKeys.length;
+	    }
+
+	    requiredIndexes = new int[fieldLen];
+
+	    for (int i = 0; i < fieldLen; i++) {
+		requiredIndexes[i] = i;
+	    }
+
+	    this.requiredColumns = requiredIndexes;
+	}
+
+	try {
+	    serde = new ColumnarSerDe();
+	    serde.initialize(hiveConf, props);
+	} catch (SerDeException e) {
+	    LOG.error(e.toString(), e);
+	    throw new IOException(e);
+	}
+
+    }
 
-        this.reader = (HiveRCRecordReader)reader;
+    @Override
+    public void setLocation(String location, Job job) throws IOException {
+	FileInputFormat.setInputPaths(job, location);
+    }
 
-        // If the date range applies, set the location for each date range
-        if(applyDateRanges)
-            currentDate = HiveRCSchemaUtil.extractDayDate(this.reader.getSplitPath().toString());
+    /**
+     * Does the configuration setup and schema parsing.
+     * 
+     * @param table_schema
+     *            String
+     */
+    private void setup(String table_schema) {
 
-        // All fields in a hive rc file are thrift serialized, the ColumnarSerDe is used for serialization and deserialization
-        try {
-            serde = new ColumnarSerDe();
-            serde.initialize(hiveConf, props);
-        } catch (SerDeException e) {
-            LOG.error(e.toString(), e);
-            throw new IOException(e);
-        }
+	if (table_schema == null)
+	    throw new RuntimeException(
+		    "The table schema must be defined as colname type, colname type.  All types are hive types");
+
+	// create basic configuration for hdfs and hive
+	conf = new Configuration();
+	hiveConf = new HiveConf(conf, SessionState.class);
+
+	// parse the table_schema string
+	List<String> types = HiveRCSchemaUtil.parseSchemaTypes(table_schema);
+	List<String> cols = HiveRCSchemaUtil.parseSchema(pcols, table_schema);
+
+	List<FieldSchema> fieldSchemaList = new ArrayList<FieldSchema>(
+		cols.size());
+
+	for (int i = 0; i < cols.size(); i++) {
+	    fieldSchemaList.add(new FieldSchema(cols.get(i), HiveRCSchemaUtil
+		    .findPigDataType(types.get(i))));
+	}
+
+	pigSchema = new ResourceSchema(new Schema(fieldSchemaList));
+
+	props = new Properties();
+
+	// setting table schema properties for ColumnarSerDe
+	// these properties are never changed by the columns to read filter,
+	// because the columnar serde needs to know the
+	// complete format of each record.
+	props.setProperty(Constants.LIST_COLUMNS,
+		HiveRCSchemaUtil.listToString(cols));
+	props.setProperty(Constants.LIST_COLUMN_TYPES,
+		HiveRCSchemaUtil.listToString(types));
 
     }
 
-    @Override
-    public void setLocation(String locationStr, Job job) throws IOException {
-        FileInputFormat.setInputPaths(job, locationStr);
+    /**
+     * Uses the ColumnarSerde to deserialize the buff:BytesRefArrayWritable into
+     * a ColumnarStruct instance.
+     * 
+     * @param buff
+     *            BytesRefArrayWritable
+     * @return ColumnarStruct
+     */
+    private ColumnarStruct readColumnarStruct(BytesRefArrayWritable buff) {
+	// use ColumnarSerDe to deserialize row
+	ColumnarStruct struct = null;
+	try {
+	    struct = (ColumnarStruct) serde.deserialize(buff);
+	} catch (SerDeException e) {
+	    LOG.error(e.toString(), e);
+	    throw new RuntimeException(e.toString(), e);
+	}
+
+	return struct;
     }
 
     /**
-     * Does the configuration setup and schema parsing and setup.
-     * @param table_schema String
-     * @param includeDayDateColumn boolean
-     * @param columnsToRead String
+     * Only read the columns requested via projection push-down, appending any partition values.<br/>
+     * 
+     * @param struct
+     *            ColumnarStruct
+     * @param path
+     *            Path
+     * @return Tuple
+     * @throws IOException
      */
-    private void setup(String table_schema, boolean includeDayDateColumn, String columnsToRead){
+    private Tuple readColumnarTuple(ColumnarStruct struct, Path path)
+	    throws IOException {
 
+	int[] columnIndexes = getRequiredColumns();
+	// the partition keys if any will already be in the UDFContext here.
+	String[] partitionKeys = getPartitionKeys(null, null);
+	// only re-read the partition key values if the path has changed
+	if (currentPath == null || !currentPath.equals(path)) {
+	    currentPathPartitionKeyMap = (partitionKeys == null) ? null
+		    : pathPartitionerHelper.getPathPartitionKeyValues(path
+			    .toString());
+	    currentPath = path;
+	}
+
+	// if the partitionColumns is null this value will stop the for loop
+	// below from trying to add any partition columns
+	// that do not exist
+	int partitionColumnStartIndex = Integer.MAX_VALUE;
+
+	if (!(partitionColumns == null || partitionColumns.size() == 0)) {
+	    // partition columns are always appended to the schema fields.
+	    partitionColumnStartIndex = pigSchema.getFields().length;
+
+	}
+
+	// create tuple with determined previous size
+	Tuple t = tupleFactory.newTuple(columnIndexes.length);
+
+	// read in all columns
+	for (int i = 0; i < columnIndexes.length; i++) {
+	    int columnIndex = columnIndexes[i];
+
+	    if (columnIndex < partitionColumnStartIndex) {
+		Object obj = struct.getField(columnIndex, text);
+		Object pigType = HiveRCSchemaUtil
+			.extractPigTypeFromHiveType(obj);
+
+		t.set(i, pigType);
+
+	    } else {
+		// read the partition columns
+		// will only be executed if partitionColumns is not null
+		String key = partitionKeys[columnIndex
+			- partitionColumnStartIndex];
+		Object value = currentPathPartitionKeyMap.get(key);
+		t.set(i, value);
 
-        if(table_schema == null)
-            throw new RuntimeException("The table schema must be defined as colname type, colname type.  All types are hive types");
+	    }
 
-        this.table_schema = table_schema;
+	}
 
-        //create basic configuration for hdfs and hive
-        conf = new Configuration();
-        hiveConf = new HiveConf(conf, SessionState.class);
+	return t;
+    }
 
-        //parse the table_schema string
-        List<String> types = HiveRCSchemaUtil.parseSchemaTypes(table_schema);
-        List<String> cols = HiveRCSchemaUtil.parseSchema(pcols, table_schema);
-        List<FieldSchema> fieldschema = null;
+    /**
+     * Will parse the required columns from the UDFContext properties if the
+     * requiredColumns[] variable is null, or else just return the
+     * requiredColumns.
+     * 
+     * @return int[]
+     */
+    private int[] getRequiredColumns() {
 
-        //all columns must have types defined
-        if(types.size() != cols.size())
-            throw new RuntimeException("Each column in the schema must have a type defined");
+	if (requiredColumns == null) {
+	    Properties properties = getUDFContext();
 
+	    String projectionStr = properties.getProperty(PROJECTION_ID);
 
-        //check if previous projection exists
-        if(columnsToRead == null){
-            Properties properties = UDFContext.getUDFContext().getUDFProperties(this.getClass());
-            String projection = properties.getProperty(PROJECTION_ID);
-            if(projection != null && !projection.isEmpty())
-                columnsToRead = projection;
-        }
+	    if (projectionStr != null) {
+		String[] split = projectionStr.split(",");
+		int columnIndexes[] = new int[split.length];
 
+		int index = 0;
+		for (String splitItem : split) {
+		    columnIndexes[index++] = Integer.parseInt(splitItem);
+		}
 
-        //re-check columnsToRead
-        if (columnsToRead == null) {
+		requiredColumns = columnIndexes;
+	    }
 
-            fieldschema = new ArrayList<FieldSchema>(cols.size());
+	}
 
-            for(int i = 0; i < cols.size(); i++){
-                fieldschema.add(new FieldSchema(cols.get(i), HiveRCSchemaUtil.findPigDataType(types.get(i))));
-            }
+	return requiredColumns;
+    }
 
+    /**
+     * Reads the partition columns
+     * 
+     * @param location
+     * @param job
+     * @return Set of String containing the partition column names
+     */
+    private Set<String> getPartitionColumns(String location, Job job) {
 
-        } else {
-            //compile list for column filtering
-            Set<String> columnToReadList = HiveRCSchemaUtil.compileSet(columnsToRead);
+	if (partitionColumns == null) {
+	    // read the partition columns from the UDF Context first.
+	    // if not in the UDF context then read it using the PathPartitioner.
 
-            if(columnToReadList.size() < 1)
-                throw new RuntimeException("Error parsing columns: " + columnsToRead);
+	    Properties properties = getUDFContext();
 
-            applyColumnRead = true;
-            int columnToReadLen = columnToReadList.size();
+	    if (properties == null)
+		properties = new Properties();
 
-            fieldschema = new ArrayList<FieldSchema>(columnToReadLen);
+	    String partitionColumnStr = properties
+		    .getProperty(PathPartitionHelper.PARTITION_COLUMNS);
 
+	    if (partitionColumnStr == null
+		    && !(location == null || job == null)) {
+		// if it hasn't been written yet.
+		Set<String> partitionColumnSet;
 
-            //--- create Pig Schema and add columnToReadPositions.
-            columnToReadPositions = new int[columnToReadLen];
+		try {
+		    partitionColumnSet = pathPartitionerHelper
+			    .getPartitionKeys(location, job.getConfiguration());
+		} catch (IOException e) {
 
-            int len = cols.size();
-            String columnName = null;
-            int colArrayPosindex = 0;
+		    RuntimeException rte = new RuntimeException(e);
+		    rte.setStackTrace(e.getStackTrace());
+		    throw rte;
 
-            for(int i = 0; i < len; i++){
-                //i is the column position
-                columnName = cols.get(i);
-                if(columnToReadList.contains(columnName)){
-                    //if the column is contained in the columnList then add its position to the columnPositions array and to the pig schema
-                    columnToReadPositions[colArrayPosindex++] = i;
+		}
 
-                    fieldschema.add(new FieldSchema(columnName, HiveRCSchemaUtil.findPigDataType(types.get(i))));
-                }
+		if (partitionColumnSet != null) {
 
-            }
+		    StringBuilder buff = new StringBuilder();
 
+		    int i = 0;
+		    for (String column : partitionColumnSet) {
+			if (i++ != 0) {
+			    buff.append(',');
+			}
 
-            //sort column positions
-            Arrays.sort(columnToReadPositions);
+			buff.append(column);
+		    }
 
-        }
+		    String buffStr = buff.toString().trim();
 
-        if(includeDayDateColumn){
-            fieldschema.add(new FieldSchema(DAY_DATE_COLUMN, DataType.CHARARRAY));
-        }
+		    if (buffStr.length() > 0) {
 
-        pigSchema = new ResourceSchema(new Schema(fieldschema));
+			properties.setProperty(
+				PathPartitionHelper.PARTITION_COLUMNS,
+				buff.toString());
+		    }
 
-        props =  new Properties();
+		    partitionColumns = partitionColumnSet;
 
-        //   setting table schema properties for ColumnarSerDe
-        //   these properties are never changed by the columns to read filter, because the columnar serde needs to now the 
-        //   complete format of each record.
-        props.setProperty(Constants.LIST_COLUMNS, HiveRCSchemaUtil.listToString(cols));
-        props.setProperty(Constants.LIST_COLUMN_TYPES, HiveRCSchemaUtil.listToString(types));
+		}
 
-    }
+	    } else {
+		// the partition columns have already been set in the UDF Context
+		if (partitionColumnStr != null) {
+		    String split[] = partitionColumnStr.split(",");
+		    partitionColumns = new LinkedHashSet<String>();
+		    if (split.length > 0) {
+			for (String splitItem : split) {
+			    partitionColumns.add(splitItem);
+			}
+		    }
+		}
 
-    /**
-     * Uses the ColumnarSerde to deserialize the buff:BytesRefArrayWritable into a ColumnarStruct instance.
-     * @param buff BytesRefArrayWritable
-     * @return ColumnarStruct
-     */
-    private ColumnarStruct readColumnarStruct(BytesRefArrayWritable buff){
-        //use ColumnarSerDe to deserialize row
-        ColumnarStruct struct = null;
-        try {
-            struct = (ColumnarStruct)serde.deserialize(buff);
-        } catch (SerDeException e) {
-            LOG.error(e.toString(), e);
-            throw new RuntimeException(e.toString(), e);
-        }
+	    }
 
-        return struct;
-    }
-    /**
-     * Only read the columns that were requested in the constructor.<br/> 
-     * @param struct ColumnarStruct
-     * @return Tuple
-     * @throws IOException
-     */
-    private Tuple readColumnarTuple(ColumnarStruct struct) throws IOException{
+	}
 
+	return partitionColumns;
 
-        int columnToReadLen = columnToReadPositions.length;
+    }
 
-        //create tuple with determined previous size
-        Tuple t = tupleFactory.newTuple( (applyDateRanges)?columnToReadLen + 1 : columnToReadLen );
+    @Override
+    public String[] getPartitionKeys(String location, Job job)
+	    throws IOException {
+	Set<String> partitionKeys = getPartitionColumns(location, job);
 
-        int index = 0;
+	return partitionKeys == null ? null : partitionKeys
+		.toArray(new String[] {});
+    }
 
-        //read in all columns
-        for(int i = 0; i < columnToReadLen; i++){
-            index = columnToReadPositions[i];
+    @Override
+    public ResourceSchema getSchema(String location, Job job)
+	    throws IOException {
 
-            Object obj = struct.getField(index, text);
+	if (!partitionKeysSet) {
+	    Set<String> keys = getPartitionColumns(location, job);
 
-            t.set(i, HiveRCSchemaUtil.extractPigTypeFromHiveType(obj));
+	    if (!(keys == null || keys.size() == 0)) {
 
-        }
+		// re-edit the pigSchema to contain the new partition keys.
+		ResourceFieldSchema[] fields = pigSchema.getFields();
 
-        if(applyDateRanges){
-            //see creation of tuple if applyDateRanges == true the length of the tuple is columnToReadLen + 1
-            t.set(columnToReadLen, currentDate);
-        }
+		LOG.debug("Schema: " + Arrays.toString(fields));
 
+		ResourceFieldSchema[] newFields = Arrays.copyOf(fields,
+			fields.length + keys.size());
 
-        return t;
-    }
+		int index = fields.length;
 
-    /**
-     * Read all columns in the row
-     * @param struct
-     * @return Tuple
-     * @throws IOException
-     */
-    private Tuple readWholeRow(ColumnarStruct struct) throws IOException {
+		for (String key : keys) {
+		    newFields[index++] = new ResourceFieldSchema(
+			    new FieldSchema(key, DataType.CHARARRAY));
+		}
 
-        //create tuple
-        Tuple t = tupleFactory.newTuple();
-        //read row fields
-        List<Object> values = struct.getFieldsAsList(text);
-        //for each value in the row convert to the correct pig type
-        if(values != null && values.size() > 0){
-
-            for(Object value : values){
-                t.append(HiveRCSchemaUtil.extractPigTypeFromHiveType(value));
-            }
-
-        }
-
-        if(applyDateRanges){
-            t.append(currentDate);
-        }
+		pigSchema.setFields(newFields);
 
-        return t;
+		LOG.debug("Added partition fields: " + keys
+			+ " to loader schema");
+		LOG.debug("Schema is: " + Arrays.toString(newFields));
+	    }
 
-    }
+	    partitionKeysSet = true;
 
-    @Override
-    public String[] getPartitionKeys(String location, Job job)
-    throws IOException {
-        return null;
-    }
+	}
 
-    @Override
-    public ResourceSchema getSchema(String location, Job job)
-    throws IOException {
-        return pigSchema;
+	return pigSchema;
     }
 
     @Override
     public ResourceStatistics getStatistics(String location, Job job)
-    throws IOException {
-        return null;
+	    throws IOException {
+	return null;
     }
 
     @Override
     public void setPartitionFilter(Expression partitionFilter)
-    throws IOException {
+	    throws IOException {
+	getUDFContext().setProperty(
+		PathPartitionHelper.PARITITION_FILTER_EXPRESSION,
+		partitionFilter.toString());
     }
 
     @Override
     public List<OperatorSet> getFeatures() {
-        return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION);
+	return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION);
     }
 
     @Override
     public RequiredFieldResponse pushProjection(
-            RequiredFieldList requiredFieldList) throws FrontendException {
+	    RequiredFieldList requiredFieldList) throws FrontendException {
 
-        StringBuilder buff = new StringBuilder();
-        ResourceFieldSchema[] fields = pigSchema.getFields();
+	// save the required field list to the UDFContext properties.
+	StringBuilder buff = new StringBuilder();
 
-        String fieldName = null;
+	int i = 0;
+	for (RequiredField f : requiredFieldList.getFields()) {
+	    if (i++ != 0)
+		buff.append(',');
 
-        for(RequiredField f : requiredFieldList.getFields()){
-            fieldName = fields[f.getIndex()].getName();
-            if(!fieldName.equals(DAY_DATE_COLUMN))
-                buff.append(fieldName).append(",");
-        }
+	    buff.append(f.getIndex());
+	}
 
-        String projectionStr = buff.substring(0, buff.length()-1);
+	Properties properties = getUDFContext();
 
-        setup(table_schema, applyDateRanges, projectionStr);
+	properties.setProperty(PROJECTION_ID, buff.toString());
 
-        Properties properties = UDFContext.getUDFContext().getUDFProperties( 
-                this.getClass());
+	return new RequiredFieldResponse(true);
+    }
 
-        if(!projectionStr.isEmpty())
-            properties.setProperty( PROJECTION_ID, projectionStr );
+    @Override
+    public void setUDFContextSignature(String signature) {
+	super.setUDFContextSignature(signature);
 
-        return new RequiredFieldResponse(true);
+	LOG.debug("Signature: " + signature);
+	this.signature = signature;
+	
+	// this provides backwards compatibility
+	// the HiveRCInputFormat will read this and if set will perform the
+	// needed partitionFiltering
+	if (dateRange != null) {
+	    getUDFContext().setProperty(DATE_RANGE, dateRange);
+	}
+	
     }
 
-
 }

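The partition handling above leans on PathPartitionHelper.getPathPartitionKeyValues,
which is not included in this half of the commit. The sketch below is only an
illustration of the [partition name]=[value] scanning described in the Javadoc, not
the actual implementation; the class name PathPartitionSketch is an assumption made
for the example.

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Illustrative sketch only: mirrors what HiveColumnarLoader expects from
    // PathPartitionHelper.getPathPartitionKeyValues(String).
    public class PathPartitionSketch {

        // For "/user/hive/warehouse/mytable/year=2010/month=02/day=01/part-00000"
        // this returns {year=2010, month=02, day=01}, preserving directory order.
        public static Map<String, String> getPathPartitionKeyValues(String path) {
            Map<String, String> keyValues = new LinkedHashMap<String, String>();
            for (String segment : path.split("/")) {
                int eq = segment.indexOf('=');
                if (eq > 0) {
                    keyValues.put(segment.substring(0, eq), segment.substring(eq + 1));
                }
            }
            return keyValues;
        }
    }

readColumnarTuple then appends these values, in directory order, for any requested
column index at or beyond partitionColumnStartIndex.
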
Modified: hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCDateSplitter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCDateSplitter.java?rev=982445&r1=982444&r2=982445&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCDateSplitter.java (original)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCDateSplitter.java Thu Aug  5 00:38:58 2010
@@ -1,159 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.pig.piggybank.storage.hiverc;
-
-import java.io.IOException;
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.JobContext;
-
-/**
- * Expects the location to be a directory with the format dir/daydate=yyyy-MM-dd/{dirs or files}<br/>
- * Only dateDir(s) within the date range [date1, date2] will be returned in the method splitDirectory
- */
-public class HiveRCDateSplitter {
-
-    private static final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
-
-    /**
-     * Start of date range
-     */
-    Date date1;
-    /**
-     * End of date range
-     */
-    Date date2;
-
-    /**
-     * 
-     * @param dateRange String must have format yyyy-MM-dd:yyyy-MM-dd, the left most date is the start of the range.
-     */
-    public HiveRCDateSplitter(String dateRange){
-        setupDateRange(dateRange);
-    }
-
-
-    /**
-     * 
-     * @param job
-     * @param location
-     * @return
-     * @throws IOException
-     */
-    public List<FileStatus> splitDirectory(JobContext job, Path dir) throws IOException{
-
-        FileSystem fs = dir.getFileSystem(job.getConfiguration());
-
-        List<FileStatus> paths = new ArrayList<FileStatus>();
-
-        if(fs.getFileStatus(dir).isDir()){
-            //expect the structure dir/[datefolder]/{dirs or files}
-
-            FileStatus[] dateDirs = fs.listStatus(dir);
-            Path dateDirPath = null;
-
-            for(FileStatus dateDirStatus : dateDirs){
-                dateDirPath = dateDirStatus.getPath();
-
-                //if the path is a directory and it is within the date range, add all of its sub-files
-                if(dateDirStatus.isDir() && isInDateRange(dateDirPath.getName()))
-                    addAllFiles(fs, dateDirPath, paths);
-
-
-            }
-
-        }
-
-        return paths;
-    }
-
-
-    /**
-     * Parse through the directory structure and for each file 
-     * @param fs
-     * @param dir
-     * @param paths
-     * @throws IOException
-     */
-    private final void addAllFiles(FileSystem fs, Path dir, List<FileStatus> paths) throws IOException{
-
-        FileStatus[] files = fs.listStatus(dir);
-
-        for(FileStatus fileStatus : files){
-
-            if(fileStatus.isDir()){
-                addAllFiles(fs, fileStatus.getPath(), paths);
-            }else{
-                paths.add(fileStatus);
-            }
-
-        }
-    }
-
-    /**
-     * Extracts the daydate parameter from fileName and compares its date value with date1, and date2.
-     * @param fileName
-     * @return boolean true if the date value is between date1 and date2 inclusively
-     */
-    private final boolean isInDateRange(String fileName) {
-        //if date ranges are to be applied, apply them and if the file daydate field 
-        //is not in the date range set the shouldRead to false and return
-        String currentDate;
-        Date date;
-
-        try {
-            currentDate = HiveRCSchemaUtil.extractDayDate(fileName);
-            date = dateFormat.parse(currentDate);
-        } catch (ParseException e) {
-            throw new RuntimeException(e);
-        }
-
-        int c1 = date.compareTo(date1);
-        int c2 = date.compareTo(date2);
-
-
-        return (c1 >= 0 && c2 <= 0);
-    }
-
-    /**
-     * Parses the dateRange:String and creates the date1:Date and date2:Date instances forming the start and end of the date range.
-     * @param dateRange
-     */
-    private void setupDateRange(String dateRange){
-        //if a dateRange is specified apply date range filtering
-        if(dateRange != null && dateRange.trim().length() > 0){
-            String[] dates = dateRange.split(":");
-
-            try {
-                date1 = dateFormat.parse(dates[0]);
-                date2 = dateFormat.parse(dates[1]);
-            } catch (ParseException e) {
-                throw new RuntimeException("The dateRange must have format yyyy-MM-dd:yyyy-MM-dd", e);
-            }
-
-        }
-    }
-
-}

Modified: hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCInputFormat.java?rev=982445&r1=982444&r2=982445&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCInputFormat.java (original)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCInputFormat.java Thu Aug  5 00:38:58 2010
@@ -17,11 +17,15 @@
 package org.apache.pig.piggybank.storage.hiverc;
 
 import java.io.IOException;
-import java.util.ArrayList;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
 import java.util.List;
+import java.util.Properties;
 
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.RCFile;
 import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -30,43 +34,152 @@ import org.apache.hadoop.mapreduce.JobCo
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.piggybank.storage.HiveColumnarLoader;
+import org.apache.pig.piggybank.storage.partition.PathPartitionHelper;
 
 /**
  * HiveRCInputFormat used by HiveColumnarLoader as the InputFormat;
  * <p/>
  * Reasons for implementing a new InputFormat sub class:<br/>
  * <ul>
- *  <li>The current RCFileInputFormat uses the old InputFormat mapred interface, and the pig load store design used the new InputFormat mapreduce classes.</li>
- *  <li>The splits are calculated by the InputFormat, HiveColumnarLoader supports date partitions, the filtering is done here.</li>
+ * <li>The current RCFileInputFormat uses the old InputFormat mapred interface,
+ * and the pig load store design used the new InputFormat mapreduce classes.</li>
+ * <li>The splits are calculated by the InputFormat; HiveColumnarLoader supports
+ * date partitions, and the filtering is done here.</li>
  * </ul>
  */
-public class HiveRCInputFormat extends FileInputFormat<LongWritable, BytesRefArrayWritable>{
+public class HiveRCInputFormat extends
+	FileInputFormat<LongWritable, BytesRefArrayWritable> {
 
-    /**
-     * Implements the date splitting logic, this keeps the HiveRCInputFormat clean of the date calculations,
-     * and makes extending it to support other types of partitioning in the future easier. 
-     */
-    private HiveRCDateSplitter dateSplitter;
-    /**
-     * Only true if the HiveRCInputFormat(dateRange:String) constructor is used
-     */
-    private boolean applyDateRanges = false;
+    transient PathPartitionHelper partitionHelper = new PathPartitionHelper();
+
+    String signature = "";
+
+    public HiveRCInputFormat() {
+	this(null);
+    }
+
+    public HiveRCInputFormat(String signature) {
+	this.signature = signature;
+
+	Properties properties = UDFContext.getUDFContext().getUDFProperties(
+		HiveColumnarLoader.class, new String[] { signature });
+
+	// This expression is stored in the UDFContext when Pig calls
+	// HiveColumnarLoader.setPartitionFilter.
+	String partitionExpression = properties
+		.getProperty(PathPartitionHelper.PARITITION_FILTER_EXPRESSION);
+
+	// backwards compatibility
+	String dateRange = properties
+		.getProperty(HiveColumnarLoader.DATE_RANGE);
+	if (partitionExpression == null && dateRange != null) {
+	    partitionExpression = buildFilterExpressionFromDatePartition(dateRange);
+	    properties.setProperty(
+		    PathPartitionHelper.PARITITION_FILTER_EXPRESSION,
+		    partitionExpression);
+	}
+
+    }
+
+    @Override
+    protected List<FileStatus> listStatus(JobContext jobContext)
+	    throws IOException {
+
+	List<FileStatus> files = partitionHelper.listStatus(jobContext,
+		HiveColumnarLoader.class, signature);
+
+	if (files == null)
+	    files = super.listStatus(jobContext);
+
+	return files;
 
-    /**
-     * No date partitioning is applied
-     */
-    public HiveRCInputFormat(){
     }
 
     /**
-     * Date partitioning will be applied to the input path.<br/>
-     * The path must be partitioned as input-path/daydate=yyyy-MM-dd.
-     * @param dateRange Must have format yyyy-MM-dd:yyyy-MM-dd with the left most being the start of the range.
-     */
-    public HiveRCInputFormat(String dateRange){
-        applyDateRanges = true;
-        dateSplitter = new HiveRCDateSplitter(dateRange);
+     * If the date range was supplied in the loader constructor we need to build
+     * our own filter expression.<br/>
+     * 
+     * @param dateRange
+     * @return String
+     */
+    private String buildFilterExpressionFromDatePartition(String dateRange) {
+	Properties properties = UDFContext.getUDFContext().getUDFProperties(
+		HiveColumnarLoader.class, new String[] { signature });
+
+	String partitionColumnStr = properties
+		.getProperty(PathPartitionHelper.PARTITION_COLUMNS);
+
+	boolean isYearMonthDayFormat = false;
+
+	// only 3 partition layouts are supported (it is impossible with date
+	// partitions to support all possible combinations here):
+	// 1) yyyy-MM-dd laid out as /daydate=[date]/files
+	// 2) yyyy-MM-dd laid out as /date=[date]/files
+	// 3) yyyy-MM-dd laid out as /year=[year]/month=[month]/day=[day]
+	String key = null;
+	if (partitionColumnStr.contains("daydate")) {
+	    key = "daydate"; // use daydate as key
+	} else if (partitionColumnStr.contains("date")) {
+	    key = "date"; // use date as key
+	} else if (partitionColumnStr.contains("year")
+		&& partitionColumnStr.contains("month")
+		&& partitionColumnStr.contains("day")) {
+	    isYearMonthDayFormat = true;
+	} else {
+	    throw new RuntimeException(
+		    "No date partitions were found for partitions: "
+			    + partitionColumnStr);
+	}
+
+	String[] split = dateRange.split(":");
+
+	if (split.length != 2) {
+	    throw new RuntimeException(
+		    "The date range must have format yyyy-MM-dd:yyyy-MM-dd");
+	}
+
+	String partitionExpression = null;
+	if (isYearMonthDayFormat) {
+	    // extract the YearMonthDay from the to dates;
+	    DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
+	    Date date1 = parseDate(dateFormat, split[0]);
+
+	    Calendar cal = Calendar.getInstance();
+	    cal.setTime(date1);
+
+	    partitionExpression = "(year >= '" + cal.get(Calendar.YEAR)
+		    + "' and month >= '"
+		    + formatNumber((cal.get(Calendar.MONTH) + 1))
+		    + "' and day >= '"
+		    + formatNumber(cal.get(Calendar.DAY_OF_MONTH)) + "')";
+
+	    Date date2 = parseDate(dateFormat, split[1]);
+	    cal.setTime(date2);
+
+	    partitionExpression += " and (year <= '" + cal.get(Calendar.YEAR)
+		    + "' and month <= '"
+		    + formatNumber((cal.get(Calendar.MONTH) + 1))
+		    + "' and day <= '"
+		    + formatNumber(cal.get(Calendar.DAY_OF_MONTH)) + "')";
+
+	} else {
+	    partitionExpression = key + " >= '" + split[0] + "' and " + key
+		    + " <= '" + split[1] + "'";
+	}
+
+	return partitionExpression;
+    }
+
+    private static final String formatNumber(int numb) {
+
+	if (numb < 10) {
+	    return "0" + numb;
+	} else {
+	    return "" + numb;
+	}
     }
 
     /**
@@ -74,51 +187,40 @@ public class HiveRCInputFormat extends F
      */
     @Override
     public RecordReader<LongWritable, BytesRefArrayWritable> createRecordReader(
-            InputSplit split, TaskAttemptContext ctx) throws IOException,
-            InterruptedException {
+	    InputSplit split, TaskAttemptContext ctx) throws IOException,
+	    InterruptedException {
 
-        HiveRCRecordReader reader = new HiveRCRecordReader();
-        reader.initialize(split, ctx);
+	HiveRCRecordReader reader = new HiveRCRecordReader();
 
-        return reader;
+	return reader;
     }
 
     /**
-     * This method is called by the FileInputFormat to find the input paths for which splits should be calculated.<br/>
-     * If applyDateRanges == true: Then the HiveRCDateSplitter is used to apply filtering on the input files.<br/>
-     * Else the default FileInputFormat listStatus method is used. 
-     */
-    @Override
-    protected List<FileStatus> listStatus(JobContext ctx)throws IOException {
-        //for each path in the FileInputFormat input paths, create a split.
-        //If applyDateRanges:
-        //the date logic is handled in the HiveRCLoader where the FileInputFormat inputPaths is set
-        //to include only the files within the given date range, when date range applies
-        //Else
-        // add all files
-        Path[] inputPaths = FileInputFormat.getInputPaths(ctx);
-
-        List<FileStatus> splitPaths = new ArrayList<FileStatus>();
-
-        if(applyDateRanges){
-            //use the dateSplitter to calculate only those paths that are in the correct date partition 
-            for(Path inputPath : inputPaths){
-                splitPaths.addAll(dateSplitter.splitDirectory(ctx, inputPath));
-            }
-        }else{
-            //use the default implementation
-            splitPaths = super.listStatus(ctx);
-        }
-
-        return splitPaths;
+     * Parse a date string with format yyyy-MM-dd.
+     * 
+     * @param dateFormat
+     *            DateFormat
+     * @param dateString
+     *            String
+     * @return Date
+     */
+    private static final Date parseDate(DateFormat dateFormat, String dateString) {
+	try {
+	    return dateFormat.parse(dateString);
+	} catch (ParseException e) {
+	    RuntimeException rt = new RuntimeException(e);
+	    rt.setStackTrace(e.getStackTrace());
+	    throw rt;
+	}
     }
 
     /**
-     * The input split size should never be smaller than the RCFile.SYNC_INTERVAL
+     * The input split size should never be smaller than the
+     * RCFile.SYNC_INTERVAL
      */
     @Override
-    protected long getFormatMinSplitSize(){	      
-        return RCFile.SYNC_INTERVAL;
+    protected long getFormatMinSplitSize() {
+	return RCFile.SYNC_INTERVAL;
     }
 
 }

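To make the backward-compatible date-range handling above concrete: for the range
2009-10-01:2009-10-02 and a /year=/month=/day= layout, the expression built by
buildFilterExpressionFromDatePartition works out as in this standalone sketch
(the class name and the pad helper are assumptions for the example; the committed
code uses formatNumber):

    import java.text.SimpleDateFormat;
    import java.util.Calendar;

    public class DateRangeExpressionExample {

        public static void main(String[] args) throws Exception {
            SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd");
            Calendar cal = Calendar.getInstance();

            cal.setTime(fmt.parse("2009-10-01"));
            String expr = "(year >= '" + cal.get(Calendar.YEAR)
                    + "' and month >= '" + pad(cal.get(Calendar.MONTH) + 1)
                    + "' and day >= '" + pad(cal.get(Calendar.DAY_OF_MONTH)) + "')";

            cal.setTime(fmt.parse("2009-10-02"));
            expr += " and (year <= '" + cal.get(Calendar.YEAR)
                    + "' and month <= '" + pad(cal.get(Calendar.MONTH) + 1)
                    + "' and day <= '" + pad(cal.get(Calendar.DAY_OF_MONTH)) + "')";

            // Prints (as one line):
            // (year >= '2009' and month >= '10' and day >= '01') and
            // (year <= '2009' and month <= '10' and day <= '02')
            System.out.println(expr);
        }

        // zero-pads single-digit month/day values, mirroring formatNumber above
        private static String pad(int n) {
            return n < 10 ? "0" + n : String.valueOf(n);
        }
    }

For a single daydate=[date] or date=[date] partition the expression is simply
daydate >= '2009-10-01' and daydate <= '2009-10-02'.
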
Modified: hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCRecordReader.java?rev=982445&r1=982444&r2=982445&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCRecordReader.java (original)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCRecordReader.java Thu Aug  5 00:38:58 2010
@@ -31,7 +31,8 @@ import org.apache.hadoop.mapreduce.lib.i
 /**
  * This class delegates the work to the RCFileRecordReader<br/>
  */
-public class HiveRCRecordReader extends RecordReader<LongWritable, BytesRefArrayWritable> {
+public class HiveRCRecordReader extends
+	RecordReader<LongWritable, BytesRefArrayWritable> {
 
     LongWritable key;
     BytesRefArrayWritable value;
@@ -42,51 +43,52 @@ public class HiveRCRecordReader extends 
 
     @Override
     public void close() throws IOException {
-        rcFileRecordReader.close();
+	rcFileRecordReader.close();
     }
 
     @Override
-    public LongWritable getCurrentKey() throws IOException, InterruptedException {
-        return key;
+    public LongWritable getCurrentKey() throws IOException,
+	    InterruptedException {
+	return key;
     }
 
     @Override
-    public BytesRefArrayWritable getCurrentValue() throws IOException, InterruptedException {
-        return value;
+    public BytesRefArrayWritable getCurrentValue() throws IOException,
+	    InterruptedException {
+	return value;
     }
 
     @Override
     public float getProgress() throws IOException, InterruptedException {
-        return rcFileRecordReader.getProgress();
+	return rcFileRecordReader.getProgress();
     }
 
-    public Path getSplitPath(){
-        return splitPath;
+    public Path getSplitPath() {
+	return splitPath;
     }
 
     @SuppressWarnings("deprecation")
     @Override
     public void initialize(InputSplit split, TaskAttemptContext ctx)
-    throws IOException, InterruptedException {
+	    throws IOException, InterruptedException {
 
-        FileSplit fileSplit = (FileSplit)split;
-        Configuration conf = ctx.getConfiguration();
-        splitPath = fileSplit.getPath();
+	FileSplit fileSplit = (FileSplit) split;
+	Configuration conf = ctx.getConfiguration();
+	splitPath = fileSplit.getPath();
+
+	rcFileRecordReader = new RCFileRecordReader<LongWritable, BytesRefArrayWritable>(
+		conf, new org.apache.hadoop.mapred.FileSplit(splitPath,
+			fileSplit.getStart(), fileSplit.getLength(),
+			new org.apache.hadoop.mapred.JobConf(conf)));
 
-        rcFileRecordReader = new RCFileRecordReader<LongWritable, BytesRefArrayWritable>(conf,
-                new org.apache.hadoop.mapred.FileSplit(splitPath, fileSplit.getStart(), fileSplit.getLength(), 
-                        new org.apache.hadoop.mapred.JobConf(conf)) );
-
-
-        key = rcFileRecordReader.createKey();
-        value = rcFileRecordReader.createValue();
+	key = rcFileRecordReader.createKey();
+	value = rcFileRecordReader.createValue();
 
     }
 
     @Override
     public boolean nextKeyValue() throws IOException, InterruptedException {
-        return rcFileRecordReader.next(key, value);
+	return rcFileRecordReader.next(key, value);
     }
 
-
 }

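A minimal, illustrative driver for the reader above, showing the call sequence that
HiveColumnarLoader.getNext() relies on (the class and method names here are
assumptions for the example; in a real job Pig and Hadoop drive these calls):

    import java.io.IOException;

    import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.pig.piggybank.storage.hiverc.HiveRCRecordReader;

    public class HiveRCReaderSketch {

        // Iterates the rows of one RCFile split, minus the ColumnarSerDe
        // deserialization and tuple building done by the loader.
        public static long countRows(InputSplit split, TaskAttemptContext ctx)
                throws IOException, InterruptedException {
            HiveRCRecordReader reader = new HiveRCRecordReader();
            reader.initialize(split, ctx);
            long rows = 0;
            while (reader.nextKeyValue()) {
                BytesRefArrayWritable row = reader.getCurrentValue();
                if (row != null) {
                    rows++;
                }
            }
            reader.close();
            return rows;
        }
    }
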
Modified: hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCSchemaUtil.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCSchemaUtil.java?rev=982445&r1=982444&r2=982445&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCSchemaUtil.java (original)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCSchemaUtil.java Thu Aug  5 00:38:58 2010
@@ -19,9 +19,9 @@ package org.apache.pig.piggybank.storage
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeSet;
-import java.util.Map.Entry;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -57,7 +57,7 @@ public class HiveRCSchemaUtil {
      * Regex to filter out column types
      */
     protected static final Pattern ptypes = Pattern
-    .compile("([ ][a-zA-Z0-9]*)|([a-zA-Z_0-9]*[<][a-zA-Z,_0-9]*[>])");
+	    .compile("([ ][a-zA-Z0-9]*)|([a-zA-Z_0-9]*[<][a-zA-Z,_0-9]*[>])");
 
     /**
      * General schema parsing method used to parse the column names.
@@ -69,15 +69,15 @@ public class HiveRCSchemaUtil {
      * @return List of String
      */
     public static List<String> parseSchema(Pattern pattern, String schema) {
-        List<String> types = new ArrayList<String>();
-        Matcher m = pattern.matcher(schema);
-        String item = null;
-        while (m.find()) {
-            item = m.group().trim();
-            if (item.length() > 0)
-                types.add(item);
-        }
-        return types;
+	List<String> types = new ArrayList<String>();
+	Matcher m = pattern.matcher(schema);
+	String item = null;
+	while (m.find()) {
+	    item = m.group().trim();
+	    if (item.length() > 0)
+		types.add(item);
+	}
+	return types;
     }
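
Roughly, parseSchema just collects the non-empty, trimmed matches of whatever pattern it is given. A hedged sketch (the comma-based pattern here is an arbitrary choice for illustration, not necessarily the one the loader passes in):

    import java.util.List;
    import java.util.regex.Pattern;
    import org.apache.pig.piggybank.storage.hiverc.HiveRCSchemaUtil;

    public class ParseSchemaSketch {
        public static void main(String[] args) {
            // match runs of non-comma characters, then let parseSchema trim them
            Pattern cols = Pattern.compile("[^,]+");
            List<String> names = HiveRCSchemaUtil.parseSchema(cols, "uid, ts, country");
            System.out.println(names); // expected: [uid, ts, country]
        }
    }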
 
     /**
@@ -87,35 +87,35 @@ public class HiveRCSchemaUtil {
      * @return List of String
      */
     public static List<String> parseSchemaTypes(String schema) {
-        List<String> types = new ArrayList<String>();
-        Matcher m = ptypes.matcher(schema);
-        String item = null;
-
-        while (m.find()) {
-            item = m.group().trim();
-            if (item.length() > 0) {
-                if (item.equalsIgnoreCase("map")) {
-                    // if generic type
-                    if (m.find()) {
-                        types.add(item + m.group().trim());
-                    } else {
-                        throw new RuntimeException(
-                        "Map must have generic types specified");
-                    }
-                } else if (item.equalsIgnoreCase("array")) {
-                    // if generic type
-                    if (m.find()) {
-                        types.add(item + m.group().trim());
-                    } else {
-                        throw new RuntimeException(
-                        "Array must have generic types specified");
-                    }
-                } else {
-                    types.add(item);
-                }
-            }
-        }
-        return types;
+	List<String> types = new ArrayList<String>();
+	Matcher m = ptypes.matcher(schema);
+	String item = null;
+
+	while (m.find()) {
+	    item = m.group().trim();
+	    if (item.length() > 0) {
+		if (item.equalsIgnoreCase("map")) {
+		    // if generic type
+		    if (m.find()) {
+			types.add(item + m.group().trim());
+		    } else {
+			throw new RuntimeException(
+				"Map must have generic types specified");
+		    }
+		} else if (item.equalsIgnoreCase("array")) {
+		    // if generic type
+		    if (m.find()) {
+			types.add(item + m.group().trim());
+		    } else {
+			throw new RuntimeException(
+				"Array must have generic types specified");
+		    }
+		} else {
+		    types.add(item);
+		}
+	    }
+	}
+	return types;
     }
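
As a rough illustration of the ptypes pattern above: its first alternative expects a space before each primitive type name, and map/array entries are re-joined with their generic part, so a padded sample schema string parses as follows (a sketch, not taken from the test suite):

    import java.util.List;
    import org.apache.pig.piggybank.storage.hiverc.HiveRCSchemaUtil;

    public class ParseSchemaTypesSketch {
        public static void main(String[] args) {
            List<String> types = HiveRCSchemaUtil
                    .parseSchemaTypes(" int string map<string,string>");
            System.out.println(types); // expected: [int, string, map<string,string>]
        }
    }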
 
     /**
@@ -125,14 +125,14 @@ public class HiveRCSchemaUtil {
      * @return String
      */
     public static final String listToString(List<String> list) {
-        StringBuilder buff = new StringBuilder();
+	StringBuilder buff = new StringBuilder();
 
-        for (String item : list) {
-            buff.append(item.trim()).append(",");
-        }
-        int len = buff.length() - 1;
-        buff.delete(len, len);
-        return buff.toString();
+	for (String item : list) {
+	    buff.append(item.trim()).append(",");
+	}
+	int len = buff.length() - 1;
+	buff.deleteCharAt(len); // strip the trailing comma
+	return buff.toString();
     }
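
A small usage sketch (assuming the trailing delimiter is stripped as intended):

    import java.util.Arrays;
    import org.apache.pig.piggybank.storage.hiverc.HiveRCSchemaUtil;

    public class ListToStringSketch {
        public static void main(String[] args) {
            String joined = HiveRCSchemaUtil.listToString(
                    Arrays.asList("uid", "ts", "country"));
            System.out.println(joined); // expected: uid,ts,country
        }
    }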
 
     /**
@@ -144,15 +144,15 @@ public class HiveRCSchemaUtil {
      * @return String
      */
     public static final String extractDayDate(String fileName) {
-        int index = fileName.indexOf("daydate=");
-        String dateStr = null;
-        if (index == 0)
-            dateStr = fileName.substring(8, fileName.length());
-        else if (index > 0)
-            dateStr = fileName.substring(index + 8, fileName
-                    .indexOf('/', index));
+	int index = fileName.indexOf("daydate=");
+	String dateStr = null;
+	if (index == 0)
+	    dateStr = fileName.substring(8, fileName.length());
+	else if (index > 0)
+	    dateStr = fileName.substring(index + 8,
+		    fileName.indexOf('/', index));
 
-        return dateStr;
+	return dateStr;
     }
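
For example, given a typical partitioned warehouse path (the path is hypothetical; note that the index > 0 branch expects a '/' to follow the partition value):

    import org.apache.pig.piggybank.storage.hiverc.HiveRCSchemaUtil;

    public class ExtractDayDateSketch {
        public static void main(String[] args) {
            String date = HiveRCSchemaUtil.extractDayDate(
                    "/user/hive/warehouse/mytable/daydate=2010-08-05/000000_0");
            System.out.println(date); // expected: 2010-08-05
        }
    }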
 
     /**
@@ -163,83 +163,87 @@ public class HiveRCSchemaUtil {
      */
     public static final Set<String> compileSet(String columnsToRead) {
 
-        String[] columnsArr = columnsToRead.split(",");
-        int len = columnsArr.length;
+	String[] columnsArr = columnsToRead.split(",");
+	int len = columnsArr.length;
 
-        Set<String> columnsSet = new TreeSet<String>();
+	Set<String> columnsSet = new TreeSet<String>();
 
-        for (int i = 0; i < len; i++) {
-            columnsSet.add(columnsArr[i].trim());
-        }
+	for (int i = 0; i < len; i++) {
+	    columnsSet.add(columnsArr[i].trim());
+	}
 
-        return columnsSet;
+	return columnsSet;
     }
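
A quick sketch of the whitespace-tolerant column-set parsing (a TreeSet, so the result comes back sorted):

    import java.util.Set;
    import org.apache.pig.piggybank.storage.hiverc.HiveRCSchemaUtil;

    public class CompileSetSketch {
        public static void main(String[] args) {
            Set<String> cols = HiveRCSchemaUtil.compileSet("uid, country ,ts");
            System.out.println(cols); // expected: [country, ts, uid]
        }
    }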
+
     /**
      * Returns the pig DataType for the hive type
+     * 
      * @param hiveType
      * @return byte from DataType
      */
     public static byte findPigDataType(String hiveType) {
-        hiveType = hiveType.toLowerCase();
+	hiveType = hiveType.toLowerCase();
 
-        if (hiveType.equals("string"))
-            return DataType.CHARARRAY;
-        else if (hiveType.equals("int"))
-            return DataType.INTEGER;
-        else if (hiveType.equals("bigint") || hiveType.equals("long"))
-            return DataType.LONG;
-        else if (hiveType.equals("float"))
-            return DataType.FLOAT;
-        else if (hiveType.equals("double"))
-            return DataType.DOUBLE;
-        else if (hiveType.equals("boolean"))
-            return DataType.INTEGER;
-        else if (hiveType.equals("byte"))
-            return DataType.INTEGER;
-        else if (hiveType.contains("array"))
-            return DataType.TUPLE;
-        else if (hiveType.contains("map"))
-            return DataType.MAP;
-        else
-            return DataType.ERROR;
+	if (hiveType.equals("string"))
+	    return DataType.CHARARRAY;
+	else if (hiveType.equals("int"))
+	    return DataType.INTEGER;
+	else if (hiveType.equals("bigint") || hiveType.equals("long"))
+	    return DataType.LONG;
+	else if (hiveType.equals("float"))
+	    return DataType.FLOAT;
+	else if (hiveType.equals("double"))
+	    return DataType.DOUBLE;
+	else if (hiveType.equals("boolean"))
+	    return DataType.INTEGER;
+	else if (hiveType.equals("byte"))
+	    return DataType.INTEGER;
+	else if (hiveType.contains("array"))
+	    return DataType.TUPLE;
+	else if (hiveType.contains("map"))
+	    return DataType.MAP;
+	else
+	    return DataType.ERROR;
     }
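
A quick check of the Hive-to-Pig type mapping above (the DataType constants are Pig's byte type codes; boolean and byte are widened to INTEGER):

    import org.apache.pig.data.DataType;
    import org.apache.pig.piggybank.storage.hiverc.HiveRCSchemaUtil;

    public class FindPigDataTypeSketch {
        public static void main(String[] args) {
            System.out.println(HiveRCSchemaUtil.findPigDataType("BIGINT") == DataType.LONG);            // true
            System.out.println(HiveRCSchemaUtil.findPigDataType("boolean") == DataType.INTEGER);        // true
            System.out.println(HiveRCSchemaUtil.findPigDataType("map<string,string>") == DataType.MAP); // true
        }
    }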
 
     /**
      * Converts from a hive type to a pig type
-     * @param value Object hive type
+     * 
+     * @param value
+     *            Object hive type
      * @return Object pig type
      */
     public static Object extractPigTypeFromHiveType(Object value) {
 
-        if (value instanceof org.apache.hadoop.hive.serde2.lazy.LazyArray) {
-            value = parseLazyArrayToPigArray((org.apache.hadoop.hive.serde2.lazy.LazyArray) value);
-        } else if (value instanceof org.apache.hadoop.hive.serde2.lazy.LazyMap) {
-            value = parseLazyMapToPigMap((org.apache.hadoop.hive.serde2.lazy.LazyMap) value);
-        } else {
-
-            if (value instanceof LazyString) {
-                value = ((LazyString) value).getWritableObject().toString();
-            } else if (value instanceof LazyInteger) {
-                value = ((LazyInteger) value).getWritableObject().get();
-            } else if (value instanceof LazyLong) {
-                value = ((LazyLong) value).getWritableObject().get();
-            } else if (value instanceof LazyFloat) {
-                value = ((LazyFloat) value).getWritableObject().get();
-            } else if (value instanceof LazyDouble) {
-                value = ((LazyDouble) value).getWritableObject().get();
-            } else if (value instanceof LazyBoolean) {
-                boolean boolvalue = ((LazyBoolean) value).getWritableObject()
-                .get();
-                value = (boolvalue) ? 1 : 0;
-            } else if (value instanceof LazyByte) {
-                value = (int) ((LazyByte) value).getWritableObject().get();
-            } else if (value instanceof LazyShort) {
-                value = ((LazyShort) value).getWritableObject().get();
-            }
+	if (value instanceof org.apache.hadoop.hive.serde2.lazy.LazyArray) {
+	    value = parseLazyArrayToPigArray((org.apache.hadoop.hive.serde2.lazy.LazyArray) value);
+	} else if (value instanceof org.apache.hadoop.hive.serde2.lazy.LazyMap) {
+	    value = parseLazyMapToPigMap((org.apache.hadoop.hive.serde2.lazy.LazyMap) value);
+	} else {
+
+	    if (value instanceof LazyString) {
+		value = ((LazyString) value).getWritableObject().toString();
+	    } else if (value instanceof LazyInteger) {
+		value = ((LazyInteger) value).getWritableObject().get();
+	    } else if (value instanceof LazyLong) {
+		value = ((LazyLong) value).getWritableObject().get();
+	    } else if (value instanceof LazyFloat) {
+		value = ((LazyFloat) value).getWritableObject().get();
+	    } else if (value instanceof LazyDouble) {
+		value = ((LazyDouble) value).getWritableObject().get();
+	    } else if (value instanceof LazyBoolean) {
+		boolean boolvalue = ((LazyBoolean) value).getWritableObject()
+			.get();
+		value = (boolvalue) ? 1 : 0;
+	    } else if (value instanceof LazyByte) {
+		value = (int) ((LazyByte) value).getWritableObject().get();
+	    } else if (value instanceof LazyShort) {
+		value = ((LazyShort) value).getWritableObject().get();
+	    }
 
-        }
+	}
 
-        return value;
+	return value;
     }
 
     /**
@@ -250,21 +254,21 @@ public class HiveRCSchemaUtil {
      * @return InternalMap
      */
     public static InternalMap parseLazyMapToPigMap(LazyMap map) {
-        InternalMap pigmap = new InternalMap();
+	InternalMap pigmap = new InternalMap();
 
-        Map<Object, Object> javamap = map.getMap();
+	Map<Object, Object> javamap = map.getMap();
 
-        if (javamap != null) {
+	if (javamap != null) {
 
-            // for each item in the map extract the java primitive type
-            for (Entry<Object, Object> entry : javamap.entrySet()) {
-                pigmap.put(extractPigTypeFromHiveType(entry.getKey()),
-                        extractPigTypeFromHiveType(entry.getValue()));
-            }
+	    // for each item in the map extract the java primitive type
+	    for (Entry<Object, Object> entry : javamap.entrySet()) {
+		pigmap.put(extractPigTypeFromHiveType(entry.getKey()),
+			extractPigTypeFromHiveType(entry.getValue()));
+	    }
 
-        }
+	}
 
-        return pigmap;
+	return pigmap;
     }
 
     /**
@@ -275,17 +279,17 @@ public class HiveRCSchemaUtil {
      * @return Tuple
      */
     public static Tuple parseLazyArrayToPigArray(LazyArray arr) {
-        List<Object> list = new ArrayList<Object>();
+	List<Object> list = new ArrayList<Object>();
 
-        // each item inside the LazyArray must be converted to its java
-        // primitive type
-        List<Object> hivedataList = arr.getList();
+	// each item inside the LazyArray must be converted to its java
+	// primitive type
+	List<Object> hivedataList = arr.getList();
 
-        for (Object item : hivedataList) {
-            list.add(extractPigTypeFromHiveType(item));
-        }
+	for (Object item : hivedataList) {
+	    list.add(extractPigTypeFromHiveType(item));
+	}
 
-        return tupleFactory.newTuple(list);
+	return tupleFactory.newTuple(list);
     }
 
 }