Posted to commits@pig.apache.org by ol...@apache.org on 2010/08/05 02:38:59 UTC
svn commit: r982445 [1/2] - in /hadoop/pig/trunk/contrib: ./
piggybank/java/src/main/java/org/apache/pig/piggybank/storage/
piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/
piggybank/java/src/test/java/org/apache/pig/piggybank/test...
Author: olga
Date: Thu Aug 5 00:38:58 2010
New Revision: 982445
URL: http://svn.apache.org/viewvc?rev=982445&view=rev
Log:
PIG-1526 improvements to HiveColumnarLoader - Partitioning Support (gerritjvv via olgan)
Modified:
hadoop/pig/trunk/contrib/CHANGES.txt
hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/HiveColumnarLoader.java
hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCDateSplitter.java
hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCInputFormat.java
hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCRecordReader.java
hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCSchemaUtil.java
hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHiveColumnarLoader.java
Modified: hadoop/pig/trunk/contrib/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/CHANGES.txt?rev=982445&r1=982444&r2=982445&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/CHANGES.txt (original)
+++ hadoop/pig/trunk/contrib/CHANGES.txt Thu Aug 5 00:38:58 2010
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-1526 improvements to HiveColumnarLoader - Partitioning Support (gerritjvv via olgan)
+
PIG-1229 allow pig to write output into a JDBC db (ankur via hashutosh)
PIG-1385 UDF to create tuples and bags (hcbusy via gates)
Modified: hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/HiveColumnarLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/HiveColumnarLoader.java?rev=982445&r1=982444&r2=982445&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/HiveColumnarLoader.java (original)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/HiveColumnarLoader.java Thu Aug 5 00:38:58 2010
@@ -19,7 +19,9 @@ package org.apache.pig.piggybank.storage
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.LinkedHashSet;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
@@ -27,6 +29,7 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.serde.Constants;
@@ -45,8 +48,8 @@ import org.apache.pig.FileInputLoadFunc;
import org.apache.pig.LoadMetadata;
import org.apache.pig.LoadPushDown;
import org.apache.pig.ResourceSchema;
-import org.apache.pig.ResourceStatistics;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.ResourceStatistics;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
@@ -58,71 +61,153 @@ import org.apache.pig.impl.util.UDFConte
import org.apache.pig.piggybank.storage.hiverc.HiveRCInputFormat;
import org.apache.pig.piggybank.storage.hiverc.HiveRCRecordReader;
import org.apache.pig.piggybank.storage.hiverc.HiveRCSchemaUtil;
+import org.apache.pig.piggybank.storage.partition.PathPartitionHelper;
/**
* Loader for Hive RC Columnar files.<br/>
* Supports the following types:<br/>
- * * <table>
- * <tr><th>Hive Type</th><th>Pig Type from DataType</th></tr>
- * <tr><td>string</td><td>CHARARRAY</td></tr>
- * <tr><td>int</td><td>INTEGER</td></tr>
- * <tr><td>bigint or long</td><td>LONG</td></tr>
- * <tr><td>float</td><td>float</td></tr>
- * <tr><td>double</td><td>DOUBLE</td></tr>
- * <tr><td>boolean</td><td>BOOLEAN</td></tr>
- * <tr><td>byte</td><td>BYTE</td></tr>
- * <tr><td>array</td><td>TUPLE</td></tr>
- * <tr><td>map</td><td>MAP</td></tr>
+ *
+ * <table>
+ * <tr>
+ * <th>Hive Type</th>
+ * <th>Pig Type from DataType</th>
+ * </tr>
+ * <tr>
+ * <td>string</td>
+ * <td>CHARARRAY</td>
+ * </tr>
+ * <tr>
+ * <td>int</td>
+ * <td>INTEGER</td>
+ * </tr>
+ * <tr>
+ * <td>bigint or long</td>
+ * <td>LONG</td>
+ * </tr>
+ * <tr>
+ * <td>float</td>
+ * <td>FLOAT</td>
+ * </tr>
+ * <tr>
+ * <td>double</td>
+ * <td>DOUBLE</td>
+ * </tr>
+ * <tr>
+ * <td>boolean</td>
+ * <td>INTEGER (0 or 1)</td>
+ * </tr>
+ * <tr>
+ * <td>byte</td>
+ * <td>INTEGER</td>
+ * </tr>
+ * <tr>
+ * <td>array</td>
+ * <td>TUPLE</td>
+ * </tr>
+ * <tr>
+ * <td>map</td>
+ * <td>MAP</td>
+ * </tr>
* </table>
*
- *<br/>
- *
- *Usage 1:<br/>
- *To load a hive table: uid bigint, ts long, arr ARRAY<string,string>, m MAP<String, String>
- *<code>
+ * <p/>
+ * <b>Partitions</b><br/>
+ * The input paths are scanned by the loader for [partition name]=[value]
+ * patterns in the subdirectories.<br/>
+ * If detected these partitions are appended to the table schema.<br/>
+ * For example if you have the directory structure:<br/>
+ *
+ * <pre>
+ * /user/hive/warehouse/mytable
+ * /year=2010/month=02/day=01
+ * </pre>
+ *
+ * The mytable schema is (id int,name string).<br/>
+ * The final schema returned in pig will be (id:int, name:chararray,
+ * year:chararray, month:chararray, day:chararray).<br/>
+ * <p/>
+ * Usage 1:
+ * <p/>
+ * To load a hive table: uid bigint, ts long, arr ARRAY<string,string>, m
+ * MAP<String, String> <br/>
+ * <code>
+ * <pre>
* a = LOAD 'file' USING HiveColumnarLoader("uid bigint, ts long, arr array<string,string>, m map<string,string>");
- *-- to reference the fields
- * b = FOREACH GENERATE a.uid, a.ts, a.arr, a.m;
- *</code>
- *<p/>
- *Usage 2:<br/>
- *To load a hive table: uid bigint, ts long, arr ARRAY<string,string>, m MAP<String, String> only processing dates 2009-10-01 to 2009-10-02 in a <br/>
- *date partitioned hive table.<br/>
- *<code>
+ * -- to reference the fields
+ * b = FOREACH GENERATE a.uid, a.ts, a.arr, a.m;
+ * </pre>
+ * </code>
+ * <p/>
+ * Usage 2:
+ * <p/>
+ * To load a hive table: uid bigint, ts long, arr ARRAY<string,string>, m
+ * MAP<String, String> only processing dates 2009-10-01 to 2009-10-02 in a <br/>
+ * date partitioned hive table.<br/>
+ * <b>Old Usage</b><br/>
+ * <b>Note:</b> The partitions can be filtered by using pig's FILTER operator.<br/>
+ * <code>
+ * <pre>
* a = LOAD 'file' USING HiveColumnarLoader("uid bigint, ts long, arr array<string,string>, m map<string,string>", "2009-10-01:2009-10-02");
- *-- to reference the fields
- * b = FOREACH GENERATE a.uid, a.ts, a.arr, a.m;
- *</code>
- *<p/>
- *Usage 3:<br/>
- *To load a hive table: uid bigint, ts long, arr ARRAY<string,string>, m MAP<String, String> only reading column uid and ts.<br/
- *<code>
- * a = LOAD 'file' USING HiveColumnarLoader("uid bigint, ts long, arr array<string,string>, m map<string,string>", "", "uid,ts");
- *-- to reference the fields
- * b = FOREACH a GENERATE uid, ts, arr, m;
- *</code>
- *<p/>
- *Usage 4:<br/>
- *To load a hive table: uid bigint, ts long, arr ARRAY<string,string>, m MAP<String, String> only reading column uid and ts for dates 2009-10-01 to 2009-10-02.<br/
- *<code>
- * a = LOAD 'file' USING HiveColumnarLoader("uid bigint, ts long, arr array<string,string>, m map<string,string>", "2009-10-01:2009-10-02", "uid,ts");
- *-- to reference the fields
- * b = FOREACH a GENERATE uid, ts, arr, m;
- *</code>
- *<p/>
- *<b>Issues</b><p/>
- *<u>Table schema definition</u><br/>
- *The schema definition must be column name followed by a space then a comma then no space and the next column name and so on.<br/>
- *This so column1 string, column2 string will not word, it must be column1 string,column2 string
- *<p/>
- *<u>Date partitioning</u><br/>
- *Hive date partition folders must have format daydate=[date].
+ * -- to reference the fields
+ * b = FOREACH GENERATE a.uid, a.ts, a.arr, a.m;
+ * </pre>
+ * </code> <br/>
+ * <b>New Usage</b><br/>
+ * <code>
+ * <pre>
+ * a = LOAD 'file' USING HiveColumnarLoader("uid bigint, ts long, arr array<string,string>, m map<string,string>");
+ * f = FILTER a BY daydate >= '2009-10-01' AND daydate <= '2009-10-02';
+ * </pre>
+ * </code>
+ * <p/>
+ * Usage 3:
+ * <p/>
+ * To load a hive table: uid bigint, ts long, arr ARRAY<string,string>, m
+ * MAP<String, String> only reading column uid and ts for dates 2009-10-01 to
+ * 2009-10-02.<br/><br/>
+ * <b>Old Usage</b><br/>
+ * <b>Note:</b> This behaviour is now supported in pig by LoadPushDown:
+ * explicitly passing the columns to load to the loader is no longer needed;
+ * pig will automatically send the columns used by the script to the loader.
+ * <code>
+ * <pre>
+ * a = LOAD 'file' USING HiveColumnarLoader("uid bigint, ts long, arr array<string,string>, m map<string,string>");
+ * f = FILTER a BY daydate >= '2009-10-01' AND daydate <= '2009-10-02';
+ * -- to reference the fields
+ * b = FOREACH a GENERATE uid, ts, arr, m;
+ * </pre>
+ * </code>
+ * <p/>
+ * <b>Issues</b>
+ * <p/>
+ * <u>Table schema definition</u><br/>
+ * The schema definition must be the column name followed by a space and the
+ * type, with entries separated by a comma and no space after the comma.<br/>
+ * So column1 string, column2 string will not work; it must be column1
+ * string,column2 string
+ * <p/>
+ * <u>Partitioning</u><br/>
+ * Partitions must be in the format [partition name]=[partition value]<br/>
+ * Only strings are supported in the partitioning.<br/>
+ * Partitions must follow the same naming convention for all sub directories in
+ * a table<br/>
+ * For example:<br/>
+ * The following is not valid:<br/>
+ *
+ * <pre>
+ * mytable/hour=00
+ * mytable/day=01/hour=00
+ * </pre>
+ *
**/
-public class HiveColumnarLoader extends FileInputLoadFunc implements LoadMetadata, LoadPushDown{
+public class HiveColumnarLoader extends FileInputLoadFunc implements
+ LoadMetadata, LoadPushDown {
- private static final String PROJECTION_ID = HiveColumnarLoader.class.getName() + ".projection";
+ public static final String PROJECTION_ID = HiveColumnarLoader.class
+ .getName() + ".projection";
- public static final String DAY_DATE_COLUMN = "daydate";
+ public static final String DATE_RANGE = HiveColumnarLoader.class.getName()
+ + ".date-range";
private static final Text text = new Text();
@@ -130,357 +215,525 @@ public class HiveColumnarLoader extends
* Regex to filter out column names
*/
protected static final Pattern pcols = Pattern.compile("[a-zA-Z_0-9]*[ ]");
- protected static final Log LOG = LogFactory.getLog(HiveColumnarLoader.class);
+ protected static final Log LOG = LogFactory
+ .getLog(HiveColumnarLoader.class);
+
protected TupleFactory tupleFactory = TupleFactory.getInstance();
+ String signature = "";
+
+ // we need to save the dateRange from the constructor if provided to add to
+ // the UDFContext only when the signature is available.
+ String dateRange = null;
+
HiveRCRecordReader reader;
ColumnarSerDe serde = null;
Configuration conf = null;
ResourceSchema pigSchema;
-
- String table_schema;
+ boolean partitionKeysSet = false;
BytesRefArrayWritable buff = null;
- String currentDate;
-
- private String dateRange = null;
- boolean applyDateRanges = false;
-
- private boolean applyColumnRead = false;
private Properties props;
private HiveConf hiveConf;
- private int[] columnToReadPositions;
+
+ transient int[] requiredColumns;
+
+ transient Set<String> partitionColumns;
/**
- * Table schema should be a space and comma separated string describing the Hive schema.<br/>
- * For example uid BIGINT, pid long, means 1 column of uid type BIGINT and one column of pid type LONG.<br/>
- * The types are not case sensitive.
- * @param table_schema This property cannot be null
+ * Implements the logic for searching partition keys and applying partition
+ * filtering
*/
+ transient PathPartitionHelper pathPartitionerHelper = new PathPartitionHelper();
+
+ transient Path currentPath = null;
+ transient Map<String, String> currentPathPartitionKeyMap;
+ /**
+ * Table schema should be a space and comma separated string describing the
+ * Hive schema.<br/>
+ * For example uid BIGINT, pid long, means 1 column of uid type BIGINT and
+ * one column of pid type LONG.<br/>
+ * The types are not case sensitive.
+ *
+ * @param table_schema
+ * This property cannot be null
+ */
public HiveColumnarLoader(String table_schema) {
- //tells all read methods to not apply date range checking
- applyDateRanges = false;
- setup(table_schema, false, null);
+ setup(table_schema);
}
/**
- * Table schema should be a space and comma separated string describing the Hive schema.<br/>
- * For example uid BIGINT, pid long, means 1 column of uid type BIGINT and one column of pid type LONG.<br/>
+ * This constructor is for backward compatibility.
+ *
+ * Table schema should be a space and comma separated string describing the
+ * Hive schema.<br/>
+ * For example uid BIGINT, pid long, means 1 column of uid type BIGINT and
+ * one column of pid type LONG.<br/>
* The types are not case sensitive.
- * @param table_schema This property cannot be null
- * @param dateRange must have format yyyy-MM-dd:yyy-MM-dd only dates between these two dates inclusively will be considered.
+ *
+ * @param table_schema
+ * This property cannot be null
+ * @param dateRange
+ * String
+ * @param columns
+ * String, no longer used
*/
- public HiveColumnarLoader(String table_schema, String dateRange) {
- applyDateRanges = (dateRange != null && dateRange.trim().length() > 0);
- this.dateRange = dateRange;
- setup(table_schema, applyDateRanges, null);
- }
+ public HiveColumnarLoader(String table_schema, String dateRange,
+ String columns) {
+ setup(table_schema);
+ this.dateRange = dateRange;
+ }
- public HiveColumnarLoader(String table_schema, String dateRange, String columns) {
- applyDateRanges = (dateRange != null && dateRange.trim().length() > 0);
- this.dateRange = dateRange;
+ /**
+ * This constructor is for backward compatibility.
+ *
+ * Table schema should be a space and comma separated string describing the
+ * Hive schema.<br/>
+ * For example uid BIGINT, pid long, means 1 column of uid type BIGINT and
+ * one column of pid type LONG.<br/>
+ * The types are not case sensitive.
+ *
+ * @param table_schema
+ * This property cannot be null
+ * @param dateRange
+ * String
+ */
+ public HiveColumnarLoader(String table_schema, String dateRange) {
+ setup(table_schema);
- setup(table_schema, applyDateRanges, columns);
+ this.dateRange = dateRange;
}
+ private Properties getUDFContext() {
+ return UDFContext.getUDFContext().getUDFProperties(this.getClass(),
+ new String[] { signature });
+ }
@Override
- public InputFormat<LongWritable, BytesRefArrayWritable> getInputFormat() throws IOException {
- return new HiveRCInputFormat(dateRange);
+ public InputFormat<LongWritable, BytesRefArrayWritable> getInputFormat()
+ throws IOException {
+ LOG.info("Signature: " + signature);
+ return new HiveRCInputFormat(signature);
}
@Override
public Tuple getNext() throws IOException {
- Tuple tuple = null;
-
- try {
- if(reader.nextKeyValue()){
+ Tuple tuple = null;
- BytesRefArrayWritable buff = reader.getCurrentValue();
- ColumnarStruct struct = readColumnarStruct(buff);
+ try {
+ if (reader.nextKeyValue()) {
- if(applyColumnRead) tuple = readColumnarTuple(struct);
- else tuple = readWholeRow(struct);
- }
+ BytesRefArrayWritable buff = reader.getCurrentValue();
+ ColumnarStruct struct = readColumnarStruct(buff);
- } catch (InterruptedException e) {
- throw new IOException(e.toString(), e);
- }
+ tuple = readColumnarTuple(struct, reader.getSplitPath());
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(e.toString(), e);
+ }
- return tuple;
+ return tuple;
}
-
- @SuppressWarnings("unchecked")
@Override
- public void prepareToRead(RecordReader reader, PigSplit split)
- throws IOException {
+ public void prepareToRead(
+ @SuppressWarnings("rawtypes") RecordReader reader, PigSplit split)
+ throws IOException {
+
+ this.reader = (HiveRCRecordReader) reader;
+
+ // check that the required indexes actually exist i.e. the columns that
+ // should be read.
+ // assuming this is always defined simplifies the readColumnarTuple
+ // logic.
+
+ int requiredIndexes[] = getRequiredColumns();
+ if (requiredIndexes == null) {
+
+ int fieldLen = pigSchema.getFields().length;
+
+ // the partition keys, if any, will already be in the UDFContext here
+ String[] partitionKeys = getPartitionKeys(null, null);
+ if (partitionKeys != null) {
+ fieldLen += partitionKeys.length;
+ }
+
+ requiredIndexes = new int[fieldLen];
+
+ for (int i = 0; i < fieldLen; i++) {
+ requiredIndexes[i] = i;
+ }
+
+ this.requiredColumns = requiredIndexes;
+ }
+
+ try {
+ serde = new ColumnarSerDe();
+ serde.initialize(hiveConf, props);
+ } catch (SerDeException e) {
+ LOG.error(e.toString(), e);
+ throw new IOException(e);
+ }
+
+ }
- this.reader = (HiveRCRecordReader)reader;
+ @Override
+ public void setLocation(String location, Job job) throws IOException {
+ FileInputFormat.setInputPaths(job, location);
+ }
- // If the date range applies, set the location for each date range
- if(applyDateRanges)
- currentDate = HiveRCSchemaUtil.extractDayDate(this.reader.getSplitPath().toString());
+ /**
+ * Does the configuration setup and schema parsing.
+ *
+ * @param table_schema
+ * String
+ */
+ private void setup(String table_schema) {
- // All fields in a hive rc file are thrift serialized, the ColumnarSerDe is used for serialization and deserialization
- try {
- serde = new ColumnarSerDe();
- serde.initialize(hiveConf, props);
- } catch (SerDeException e) {
- LOG.error(e.toString(), e);
- throw new IOException(e);
- }
+ if (table_schema == null)
+ throw new RuntimeException(
+ "The table schema must be defined as colname type, colname type. All types are hive types");
+
+ // create basic configuration for hdfs and hive
+ conf = new Configuration();
+ hiveConf = new HiveConf(conf, SessionState.class);
+
+ // parse the table_schema string
+ List<String> types = HiveRCSchemaUtil.parseSchemaTypes(table_schema);
+ List<String> cols = HiveRCSchemaUtil.parseSchema(pcols, table_schema);
+
+ List<FieldSchema> fieldSchemaList = new ArrayList<FieldSchema>(
+ cols.size());
+
+ for (int i = 0; i < cols.size(); i++) {
+ fieldSchemaList.add(new FieldSchema(cols.get(i), HiveRCSchemaUtil
+ .findPigDataType(types.get(i))));
+ }
+
+ pigSchema = new ResourceSchema(new Schema(fieldSchemaList));
+
+ props = new Properties();
+
+ // setting table schema properties for ColumnarSerDe
+ // these properties are never changed by the columns to read filter,
+ // because the columnar serde needs to know the
+ // complete format of each record.
+ props.setProperty(Constants.LIST_COLUMNS,
+ HiveRCSchemaUtil.listToString(cols));
+ props.setProperty(Constants.LIST_COLUMN_TYPES,
+ HiveRCSchemaUtil.listToString(types));
}
- @Override
- public void setLocation(String locationStr, Job job) throws IOException {
- FileInputFormat.setInputPaths(job, locationStr);
+ /**
+ * Uses the ColumnarSerde to deserialize the buff:BytesRefArrayWritable into
+ * a ColumnarStruct instance.
+ *
+ * @param buff
+ * BytesRefArrayWritable
+ * @return ColumnarStruct
+ */
+ private ColumnarStruct readColumnarStruct(BytesRefArrayWritable buff) {
+ // use ColumnarSerDe to deserialize row
+ ColumnarStruct struct = null;
+ try {
+ struct = (ColumnarStruct) serde.deserialize(buff);
+ } catch (SerDeException e) {
+ LOG.error(e.toString(), e);
+ throw new RuntimeException(e.toString(), e);
+ }
+
+ return struct;
}
/**
- * Does the configuration setup and schema parsing and setup.
- * @param table_schema String
- * @param includeDayDateColumn boolean
- * @param columnsToRead String
+ * Only read the columns that were requested in the projection.<br/>
+ *
+ * @param struct
+ * ColumnarStruct
+ * @param path
+ * Path
+ * @return Tuple
+ * @throws IOException
*/
- private void setup(String table_schema, boolean includeDayDateColumn, String columnsToRead){
+ private Tuple readColumnarTuple(ColumnarStruct struct, Path path)
+ throws IOException {
+ int[] columnIndexes = getRequiredColumns();
+ // the partition keys if any will already be in the UDFContext here.
+ String[] partitionKeys = getPartitionKeys(null, null);
+ // only recompute the partition key map when the path changes
+ if (currentPath == null || !currentPath.equals(path)) {
+ currentPathPartitionKeyMap = (partitionKeys == null) ? null
+ : pathPartitionerHelper.getPathPartitionKeyValues(path
+ .toString());
+ currentPath = path;
+ }
+
+ // if the partitionColumns is null this value will stop the for loop
+ // below from trying to add any partition columns
+ // that do not exist
+ int partitionColumnStartIndex = Integer.MAX_VALUE;
+
+ if (!(partitionColumns == null || partitionColumns.size() == 0)) {
+ // partition columns are always appended to the schema fields.
+ partitionColumnStartIndex = pigSchema.getFields().length;
+
+ }
+
+ // create tuple with determined previous size
+ Tuple t = tupleFactory.newTuple(columnIndexes.length);
+
+ // read in all columns
+ for (int i = 0; i < columnIndexes.length; i++) {
+ int columnIndex = columnIndexes[i];
+
+ if (columnIndex < partitionColumnStartIndex) {
+ Object obj = struct.getField(columnIndex, text);
+ Object pigType = HiveRCSchemaUtil
+ .extractPigTypeFromHiveType(obj);
+
+ t.set(i, pigType);
+
+ } else {
+ // read the partition columns
+ // will only be executed if partitionColumns is not null
+ String key = partitionKeys[columnIndex
+ - partitionColumnStartIndex];
+ Object value = currentPathPartitionKeyMap.get(key);
+ t.set(i, value);
- if(table_schema == null)
- throw new RuntimeException("The table schema must be defined as colname type, colname type. All types are hive types");
+ }
- this.table_schema = table_schema;
+ }
- //create basic configuration for hdfs and hive
- conf = new Configuration();
- hiveConf = new HiveConf(conf, SessionState.class);
+ return t;
+ }
- //parse the table_schema string
- List<String> types = HiveRCSchemaUtil.parseSchemaTypes(table_schema);
- List<String> cols = HiveRCSchemaUtil.parseSchema(pcols, table_schema);
- List<FieldSchema> fieldschema = null;
+ /**
+ * Will parse the required columns from the UDFContext properties if the
+ * requiredColumns[] variable is null, or else just return the
+ * requiredColumns.
+ *
+ * @return int[]
+ */
+ private int[] getRequiredColumns() {
- //all columns must have types defined
- if(types.size() != cols.size())
- throw new RuntimeException("Each column in the schema must have a type defined");
+ if (requiredColumns == null) {
+ Properties properties = getUDFContext();
+ String projectionStr = properties.getProperty(PROJECTION_ID);
- //check if previous projection exists
- if(columnsToRead == null){
- Properties properties = UDFContext.getUDFContext().getUDFProperties(this.getClass());
- String projection = properties.getProperty(PROJECTION_ID);
- if(projection != null && !projection.isEmpty())
- columnsToRead = projection;
- }
+ if (projectionStr != null) {
+ String[] split = projectionStr.split(",");
+ int columnIndexes[] = new int[split.length];
+ int index = 0;
+ for (String splitItem : split) {
+ columnIndexes[index++] = Integer.parseInt(splitItem);
+ }
- //re-check columnsToRead
- if (columnsToRead == null) {
+ requiredColumns = columnIndexes;
+ }
- fieldschema = new ArrayList<FieldSchema>(cols.size());
+ }
- for(int i = 0; i < cols.size(); i++){
- fieldschema.add(new FieldSchema(cols.get(i), HiveRCSchemaUtil.findPigDataType(types.get(i))));
- }
+ return requiredColumns;
+ }
+ /**
+ * Reads the partition columns
+ *
+ * @param location
+ * @param job
+ * @return Set of String partition column names
+ */
+ private Set<String> getPartitionColumns(String location, Job job) {
- } else {
- //compile list for column filtering
- Set<String> columnToReadList = HiveRCSchemaUtil.compileSet(columnsToRead);
+ if (partitionColumns == null) {
+ // read the partition columns from the UDF Context first.
+ // if not in the UDF context then read it using the PathPartitioner.
- if(columnToReadList.size() < 1)
- throw new RuntimeException("Error parsing columns: " + columnsToRead);
+ Properties properties = getUDFContext();
- applyColumnRead = true;
- int columnToReadLen = columnToReadList.size();
+ if (properties == null)
+ properties = new Properties();
- fieldschema = new ArrayList<FieldSchema>(columnToReadLen);
+ String partitionColumnStr = properties
+ .getProperty(PathPartitionHelper.PARTITION_COLUMNS);
+ if (partitionColumnStr == null
+ && !(location == null || job == null)) {
+ // if it hasn't been written yet.
+ Set<String> partitionColumnSet;
- //--- create Pig Schema and add columnToReadPositions.
- columnToReadPositions = new int[columnToReadLen];
+ try {
+ partitionColumnSet = pathPartitionerHelper
+ .getPartitionKeys(location, job.getConfiguration());
+ } catch (IOException e) {
- int len = cols.size();
- String columnName = null;
- int colArrayPosindex = 0;
+ RuntimeException rte = new RuntimeException(e);
+ rte.setStackTrace(e.getStackTrace());
+ throw rte;
- for(int i = 0; i < len; i++){
- //i is the column position
- columnName = cols.get(i);
- if(columnToReadList.contains(columnName)){
- //if the column is contained in the columnList then add its position to the columnPositions array and to the pig schema
- columnToReadPositions[colArrayPosindex++] = i;
+ }
- fieldschema.add(new FieldSchema(columnName, HiveRCSchemaUtil.findPigDataType(types.get(i))));
- }
+ if (partitionColumnSet != null) {
- }
+ StringBuilder buff = new StringBuilder();
+ int i = 0;
+ for (String column : partitionColumnSet) {
+ if (i++ != 0) {
+ buff.append(',');
+ }
- //sort column positions
- Arrays.sort(columnToReadPositions);
+ buff.append(column);
+ }
- }
+ String buffStr = buff.toString().trim();
- if(includeDayDateColumn){
- fieldschema.add(new FieldSchema(DAY_DATE_COLUMN, DataType.CHARARRAY));
- }
+ if (buffStr.length() > 0) {
- pigSchema = new ResourceSchema(new Schema(fieldschema));
+ properties.setProperty(
+ PathPartitionHelper.PARTITION_COLUMNS,
+ buff.toString());
+ }
- props = new Properties();
+ partitionColumns = partitionColumnSet;
- // setting table schema properties for ColumnarSerDe
- // these properties are never changed by the columns to read filter, because the columnar serde needs to now the
- // complete format of each record.
- props.setProperty(Constants.LIST_COLUMNS, HiveRCSchemaUtil.listToString(cols));
- props.setProperty(Constants.LIST_COLUMN_TYPES, HiveRCSchemaUtil.listToString(types));
+ }
- }
+ } else {
+ // the partition columns have already been set in the UDF Context
+ if (partitionColumnStr != null) {
+ String split[] = partitionColumnStr.split(",");
+ partitionColumns = new LinkedHashSet<String>();
+ if (split.length > 0) {
+ for (String splitItem : split) {
+ partitionColumns.add(splitItem);
+ }
+ }
+ }
- /**
- * Uses the ColumnarSerde to deserialize the buff:BytesRefArrayWritable into a ColumnarStruct instance.
- * @param buff BytesRefArrayWritable
- * @return ColumnarStruct
- */
- private ColumnarStruct readColumnarStruct(BytesRefArrayWritable buff){
- //use ColumnarSerDe to deserialize row
- ColumnarStruct struct = null;
- try {
- struct = (ColumnarStruct)serde.deserialize(buff);
- } catch (SerDeException e) {
- LOG.error(e.toString(), e);
- throw new RuntimeException(e.toString(), e);
- }
+ }
- return struct;
- }
- /**
- * Only read the columns that were requested in the constructor.<br/>
- * @param struct ColumnarStruct
- * @return Tuple
- * @throws IOException
- */
- private Tuple readColumnarTuple(ColumnarStruct struct) throws IOException{
+ }
+ return partitionColumns;
- int columnToReadLen = columnToReadPositions.length;
+ }
- //create tuple with determined previous size
- Tuple t = tupleFactory.newTuple( (applyDateRanges)?columnToReadLen + 1 : columnToReadLen );
+ @Override
+ public String[] getPartitionKeys(String location, Job job)
+ throws IOException {
+ Set<String> partitionKeys = getPartitionColumns(location, job);
- int index = 0;
+ return partitionKeys == null ? null : partitionKeys
+ .toArray(new String[] {});
+ }
- //read in all columns
- for(int i = 0; i < columnToReadLen; i++){
- index = columnToReadPositions[i];
+ @Override
+ public ResourceSchema getSchema(String location, Job job)
+ throws IOException {
- Object obj = struct.getField(index, text);
+ if (!partitionKeysSet) {
+ Set<String> keys = getPartitionColumns(location, job);
- t.set(i, HiveRCSchemaUtil.extractPigTypeFromHiveType(obj));
+ if (!(keys == null || keys.size() == 0)) {
- }
+ // re-edit the pigSchema to contain the new partition keys.
+ ResourceFieldSchema[] fields = pigSchema.getFields();
- if(applyDateRanges){
- //see creation of tuple if applyDateRanges == true the length of the tuple is columnToReadLen + 1
- t.set(columnToReadLen, currentDate);
- }
+ LOG.debug("Schema: " + Arrays.toString(fields));
+ ResourceFieldSchema[] newFields = Arrays.copyOf(fields,
+ fields.length + keys.size());
- return t;
- }
+ int index = fields.length;
- /**
- * Read all columns in the row
- * @param struct
- * @return Tuple
- * @throws IOException
- */
- private Tuple readWholeRow(ColumnarStruct struct) throws IOException {
+ for (String key : keys) {
+ newFields[index++] = new ResourceFieldSchema(
+ new FieldSchema(key, DataType.CHARARRAY));
+ }
- //create tuple
- Tuple t = tupleFactory.newTuple();
- //read row fields
- List<Object> values = struct.getFieldsAsList(text);
- //for each value in the row convert to the correct pig type
- if(values != null && values.size() > 0){
-
- for(Object value : values){
- t.append(HiveRCSchemaUtil.extractPigTypeFromHiveType(value));
- }
-
- }
-
- if(applyDateRanges){
- t.append(currentDate);
- }
+ pigSchema.setFields(newFields);
- return t;
+ LOG.debug("Added partition fields: " + keys
+ + " to loader schema");
+ LOG.debug("Schema is: " + Arrays.toString(newFields));
+ }
- }
+ partitionKeysSet = true;
- @Override
- public String[] getPartitionKeys(String location, Job job)
- throws IOException {
- return null;
- }
+ }
- @Override
- public ResourceSchema getSchema(String location, Job job)
- throws IOException {
- return pigSchema;
+ return pigSchema;
}
@Override
public ResourceStatistics getStatistics(String location, Job job)
- throws IOException {
- return null;
+ throws IOException {
+ return null;
}
@Override
public void setPartitionFilter(Expression partitionFilter)
- throws IOException {
+ throws IOException {
+ getUDFContext().setProperty(
+ PathPartitionHelper.PARITITION_FILTER_EXPRESSION,
+ partitionFilter.toString());
}
@Override
public List<OperatorSet> getFeatures() {
- return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION);
+ return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION);
}
@Override
public RequiredFieldResponse pushProjection(
- RequiredFieldList requiredFieldList) throws FrontendException {
+ RequiredFieldList requiredFieldList) throws FrontendException {
- StringBuilder buff = new StringBuilder();
- ResourceFieldSchema[] fields = pigSchema.getFields();
+ // save the required field list to the UDFContext properties.
+ StringBuilder buff = new StringBuilder();
- String fieldName = null;
+ int i = 0;
+ for (RequiredField f : requiredFieldList.getFields()) {
+ if (i++ != 0)
+ buff.append(',');
- for(RequiredField f : requiredFieldList.getFields()){
- fieldName = fields[f.getIndex()].getName();
- if(!fieldName.equals(DAY_DATE_COLUMN))
- buff.append(fieldName).append(",");
- }
+ buff.append(f.getIndex());
+ }
- String projectionStr = buff.substring(0, buff.length()-1);
+ Properties properties = getUDFContext();
- setup(table_schema, applyDateRanges, projectionStr);
+ properties.setProperty(PROJECTION_ID, buff.toString());
- Properties properties = UDFContext.getUDFContext().getUDFProperties(
- this.getClass());
+ return new RequiredFieldResponse(true);
+ }
- if(!projectionStr.isEmpty())
- properties.setProperty( PROJECTION_ID, projectionStr );
+ @Override
+ public void setUDFContextSignature(String signature) {
+ super.setUDFContextSignature(signature);
- return new RequiredFieldResponse(true);
+ LOG.debug("Signature: " + signature);
+ this.signature = signature;
+
+ // this provides backwards compatibility
+ // the HiveRCInputFormat will read this and if set will perform the
+ // needed partition filtering
+ if (dateRange != null) {
+ getUDFContext().setProperty(DATE_RANGE, dateRange);
+ }
+
}
-
}
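
Note: the PathPartitionHelper class referenced above is added in part [2/2]
of this commit and does not appear on this page. Below is a minimal sketch of
the [partition name]=[value] extraction the loader depends on, assuming a
simple split on '/'; the class and method names here are illustrative, not
the actual PathPartitionHelper API.

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Illustrative sketch: pull key=value pairs out of a partitioned path,
    // e.g. /user/hive/warehouse/mytable/year=2010/month=02/day=01/part-0
    // yields {year=2010, month=02, day=01}. The real implementation is
    // org.apache.pig.piggybank.storage.partition.PathPartitionHelper.
    public class PathPartitionSketch {

        public static Map<String, String> getPathPartitionKeyValues(String path) {
            // LinkedHashMap keeps the partition keys in path order,
            // matching the order in which they are appended to the schema
            Map<String, String> partitions = new LinkedHashMap<String, String>();
            for (String dir : path.split("/")) {
                int eq = dir.indexOf('=');
                if (eq > 0) {
                    partitions.put(dir.substring(0, eq), dir.substring(eq + 1));
                }
            }
            return partitions;
        }

        public static void main(String[] args) {
            System.out.println(getPathPartitionKeyValues(
                    "/user/hive/warehouse/mytable/year=2010/month=02/day=01/part-0"));
            // prints {year=2010, month=02, day=01}
        }
    }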
Modified: hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCDateSplitter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCDateSplitter.java?rev=982445&r1=982444&r2=982445&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCDateSplitter.java (original)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCDateSplitter.java Thu Aug 5 00:38:58 2010
@@ -1,159 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.pig.piggybank.storage.hiverc;
-
-import java.io.IOException;
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.JobContext;
-
-/**
- * Expects the location to be a directory with the format dir/daydate=yyyy-MM-dd/{dirs or files}<br/>
- * Only dateDir(s) within the date range [date1, date2] will be returned in the method splitDirectory
- */
-public class HiveRCDateSplitter {
-
- private static final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
-
- /**
- * Start of date range
- */
- Date date1;
- /**
- * End of date range
- */
- Date date2;
-
- /**
- *
- * @param dateRange String must have format yyyy-MM-dd:yyyy-MM-dd, the left most date is the start of the range.
- */
- public HiveRCDateSplitter(String dateRange){
- setupDateRange(dateRange);
- }
-
-
- /**
- *
- * @param job
- * @param location
- * @return
- * @throws IOException
- */
- public List<FileStatus> splitDirectory(JobContext job, Path dir) throws IOException{
-
- FileSystem fs = dir.getFileSystem(job.getConfiguration());
-
- List<FileStatus> paths = new ArrayList<FileStatus>();
-
- if(fs.getFileStatus(dir).isDir()){
- //expect the structure dir/[datefolder]/{dirs or files}
-
- FileStatus[] dateDirs = fs.listStatus(dir);
- Path dateDirPath = null;
-
- for(FileStatus dateDirStatus : dateDirs){
- dateDirPath = dateDirStatus.getPath();
-
- //if the path is a directory and it is within the date range, add all of its sub-files
- if(dateDirStatus.isDir() && isInDateRange(dateDirPath.getName()))
- addAllFiles(fs, dateDirPath, paths);
-
-
- }
-
- }
-
- return paths;
- }
-
-
- /**
- * Parse through the directory structure and for each file
- * @param fs
- * @param dir
- * @param paths
- * @throws IOException
- */
- private final void addAllFiles(FileSystem fs, Path dir, List<FileStatus> paths) throws IOException{
-
- FileStatus[] files = fs.listStatus(dir);
-
- for(FileStatus fileStatus : files){
-
- if(fileStatus.isDir()){
- addAllFiles(fs, fileStatus.getPath(), paths);
- }else{
- paths.add(fileStatus);
- }
-
- }
- }
-
- /**
- * Extracts the daydate parameter from fileName and compares its date value with date1, and date2.
- * @param fileName
- * @return boolean true if the date value is between date1 and date2 inclusively
- */
- private final boolean isInDateRange(String fileName) {
- //if date ranges are to be applied, apply them and if the file daydate field
- //is not in the date range set the shouldRead to false and return
- String currentDate;
- Date date;
-
- try {
- currentDate = HiveRCSchemaUtil.extractDayDate(fileName);
- date = dateFormat.parse(currentDate);
- } catch (ParseException e) {
- throw new RuntimeException(e);
- }
-
- int c1 = date.compareTo(date1);
- int c2 = date.compareTo(date2);
-
-
- return (c1 >= 0 && c2 <= 0);
- }
-
- /**
- * Parses the dateRange:String and creates the date1:Date and date2:Date instances forming the start and end of the date range.
- * @param dateRange
- */
- private void setupDateRange(String dateRange){
- //if a dateRange is specified apply date range filtering
- if(dateRange != null && dateRange.trim().length() > 0){
- String[] dates = dateRange.split(":");
-
- try {
- date1 = dateFormat.parse(dates[0]);
- date2 = dateFormat.parse(dates[1]);
- } catch (ParseException e) {
- throw new RuntimeException("The dateRange must have format yyyy-MM-dd:yyyy-MM-dd", e);
- }
-
- }
- }
-
-}
Modified: hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCInputFormat.java?rev=982445&r1=982444&r2=982445&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCInputFormat.java (original)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCInputFormat.java Thu Aug 5 00:38:58 2010
@@ -17,11 +17,15 @@
package org.apache.pig.piggybank.storage.hiverc;
import java.io.IOException;
-import java.util.ArrayList;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
import java.util.List;
+import java.util.Properties;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.io.LongWritable;
@@ -30,43 +34,152 @@ import org.apache.hadoop.mapreduce.JobCo
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.piggybank.storage.HiveColumnarLoader;
+import org.apache.pig.piggybank.storage.partition.PathPartitionHelper;
/**
* HiveRCInputFormat used by HiveColumnarLoader as the InputFormat;
* <p/>
* Reasons for implementing a new InputFormat sub class:<br/>
* <ul>
- * <li>The current RCFileInputFormat uses the old InputFormat mapred interface, and the pig load store design used the new InputFormat mapreduce classes.</li>
- * <li>The splits are calculated by the InputFormat, HiveColumnarLoader supports date partitions, the filtering is done here.</li>
+ * <li>The current RCFileInputFormat uses the old InputFormat mapred interface,
+ * and the pig load store design used the new InputFormat mapreduce classes.</li>
+ * <li>The splits are calculated by the InputFormat, HiveColumnarLoader supports
+ * date partitions, the filtering is done here.</li>
* </ul>
*/
-public class HiveRCInputFormat extends FileInputFormat<LongWritable, BytesRefArrayWritable>{
+public class HiveRCInputFormat extends
+ FileInputFormat<LongWritable, BytesRefArrayWritable> {
- /**
- * Implements the date splitting logic, this keeps the HiveRCInputFormat clean of the date calculations,
- * and makes extending it to support other types of partitioning in the future easier.
- */
- private HiveRCDateSplitter dateSplitter;
- /**
- * Only true if the HiveRCInputFormat(dateRange:String) constructor is used
- */
- private boolean applyDateRanges = false;
+ transient PathPartitionHelper partitionHelper = new PathPartitionHelper();
+
+ String signature = "";
+
+ public HiveRCInputFormat() {
+ this(null);
+ }
+
+ public HiveRCInputFormat(String signature) {
+ this.signature = signature;
+
+ Properties properties = UDFContext.getUDFContext().getUDFProperties(
+ HiveColumnarLoader.class, new String[] { signature });
+
+ // This expression is passed in the
+ // HiveColumnarLoader.setPartitionFilter method by the Pig Loader
+ // Classes.
+ String partitionExpression = properties
+ .getProperty(PathPartitionHelper.PARITITION_FILTER_EXPRESSION);
+
+ // backwards compatibility
+ String dateRange = properties
+ .getProperty(HiveColumnarLoader.DATE_RANGE);
+ if (partitionExpression == null && dateRange != null) {
+ partitionExpression = buildFilterExpressionFromDatePartition(dateRange);
+ properties.setProperty(
+ PathPartitionHelper.PARITITION_FILTER_EXPRESSION,
+ partitionExpression);
+ }
+
+ }
+
+ @Override
+ protected List<FileStatus> listStatus(JobContext jobContext)
+ throws IOException {
+
+ List<FileStatus> files = partitionHelper.listStatus(jobContext,
+ HiveColumnarLoader.class, signature);
+
+ if (files == null)
+ files = super.listStatus(jobContext);
+
+ return files;
- /**
- * No date partitioning is applied
- */
- public HiveRCInputFormat(){
}
/**
- * Date partitioning will be applied to the input path.<br/>
- * The path must be partitioned as input-path/daydate=yyyy-MM-dd.
- * @param dateRange Must have format yyyy-MM-dd:yyyy-MM-dd with the left most being the start of the range.
- */
- public HiveRCInputFormat(String dateRange){
- applyDateRanges = true;
- dateSplitter = new HiveRCDateSplitter(dateRange);
+ * If the date range was supplied in the loader constructor we need to build
+ * our own filter expression.<br/>
+ *
+ * @param dateRange
+ * @return String
+ */
+ private String buildFilterExpressionFromDatePartition(String dateRange) {
+ Properties properties = UDFContext.getUDFContext().getUDFProperties(
+ HiveColumnarLoader.class, new String[] { signature });
+
+ String partitionColumnStr = properties
+ .getProperty(PathPartitionHelper.PARTITION_COLUMNS);
+
+ boolean isYearMonthDayFormat = false;
+
+ // only 3 partition layouts are supported (it's impossible with date
+ // partitions to support all possible combinations here).
+ // 1) yyyy-MM-dd stored as /daydate=[date]/files
+ // 2) yyyy-MM-dd stored as /date=[date]/files
+ // 3) yyyy-MM-dd stored as /year=[year]/month=[month]/day=[day]
+ String key = null;
+ if (partitionColumnStr.contains("daydate")) {
+ key = "daydate"; // use daydate as key
+ } else if (partitionColumnStr.contains("date")) {
+ key = "date"; // user date as key
+ } else if (partitionColumnStr.contains("year")
+ && partitionColumnStr.contains("month")
+ && partitionColumnStr.contains("day")) {
+ isYearMonthDayFormat = true;
+ } else {
+ throw new RuntimeException(
+ "Not date partitions where found for partitions: "
+ + partitionColumnStr);
+ }
+
+ String[] split = dateRange.split(":");
+
+ if (split.length != 2) {
+ throw new RuntimeException(
+ "The date range must have format yyyy-MM-dd:yyyy-MM-dd");
+ }
+
+ String partitionExpression = null;
+ if (isYearMonthDayFormat) {
+ // extract the year, month and day from the two dates
+ DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
+ Date date1 = parseDate(dateFormat, split[0]);
+
+ Calendar cal = Calendar.getInstance();
+ cal.setTime(date1);
+
+ partitionExpression = "(year >= '" + cal.get(Calendar.YEAR)
+ + "' and month >= '"
+ + formatNumber((cal.get(Calendar.MONTH) + 1))
+ + "' and day >= '"
+ + formatNumber(cal.get(Calendar.DAY_OF_MONTH)) + "')";
+
+ Date date2 = parseDate(dateFormat, split[1]);
+ cal.setTime(date2);
+
+ partitionExpression += " and (year <= '" + cal.get(Calendar.YEAR)
+ + "' and month <= '"
+ + formatNumber((cal.get(Calendar.MONTH) + 1))
+ + "' and day <= '"
+ + formatNumber(cal.get(Calendar.DAY_OF_MONTH)) + "')";
+
+ } else {
+ partitionExpression = key + " >= '" + split[0] + "' and " + key
+ + " <= '" + split[1] + "'";
+ }
+
+ return partitionExpression;
+ }
+
+ private static final String formatNumber(int numb) {
+
+ if (numb < 10) {
+ return "0" + numb;
+ } else {
+ return "" + numb;
+ }
}
/**
@@ -74,51 +187,40 @@ public class HiveRCInputFormat extends F
*/
@Override
public RecordReader<LongWritable, BytesRefArrayWritable> createRecordReader(
- InputSplit split, TaskAttemptContext ctx) throws IOException,
- InterruptedException {
+ InputSplit split, TaskAttemptContext ctx) throws IOException,
+ InterruptedException {
- HiveRCRecordReader reader = new HiveRCRecordReader();
- reader.initialize(split, ctx);
+ HiveRCRecordReader reader = new HiveRCRecordReader();
- return reader;
+ return reader;
}
/**
- * This method is called by the FileInputFormat to find the input paths for which splits should be calculated.<br/>
- * If applyDateRanges == true: Then the HiveRCDateSplitter is used to apply filtering on the input files.<br/>
- * Else the default FileInputFormat listStatus method is used.
- */
- @Override
- protected List<FileStatus> listStatus(JobContext ctx)throws IOException {
- //for each path in the FileInputFormat input paths, create a split.
- //If applyDateRanges:
- //the date logic is handled in the HiveRCLoader where the FileInputFormat inputPaths is set
- //to include only the files within the given date range, when date range applies
- //Else
- // add all files
- Path[] inputPaths = FileInputFormat.getInputPaths(ctx);
-
- List<FileStatus> splitPaths = new ArrayList<FileStatus>();
-
- if(applyDateRanges){
- //use the dateSplitter to calculate only those paths that are in the correct date partition
- for(Path inputPath : inputPaths){
- splitPaths.addAll(dateSplitter.splitDirectory(ctx, inputPath));
- }
- }else{
- //use the default implementation
- splitPaths = super.listStatus(ctx);
- }
-
- return splitPaths;
+ * Parse a date string with format yyyy-MM-dd.
+ *
+ * @param dateFormat
+ * DateFormat
+ * @param dateString
+ * String
+ * @return Date
+ */
+ private static final Date parseDate(DateFormat dateFormat, String dateString) {
+ try {
+ return dateFormat.parse(dateString);
+ } catch (ParseException e) {
+ RuntimeException rt = new RuntimeException(e);
+ rt.setStackTrace(e.getStackTrace());
+ throw rt;
+ }
}
/**
- * The input split size should never be smaller than the RCFile.SYNC_INTERVAL
+ * The input split size should never be smaller than the
+ * RCFile.SYNC_INTERVAL
*/
@Override
- protected long getFormatMinSplitSize(){
- return RCFile.SYNC_INTERVAL;
+ protected long getFormatMinSplitSize() {
+ return RCFile.SYNC_INTERVAL;
}
}
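
For reference, here are the filter expressions that
buildFilterExpressionFromDatePartition above produces for the old-style
dateRange argument, worked through for the range 2009-10-01:2009-10-02:

    // For a /daydate=[date]/ layout (key = "daydate"):
    //   daydate >= '2009-10-01' and daydate <= '2009-10-02'
    //
    // For a /year=[year]/month=[month]/day=[day] layout:
    //   (year >= '2009' and month >= '10' and day >= '01')
    //     and (year <= '2009' and month <= '10' and day <= '02')

Note that in the year/month/day case the per-field bounds are narrower than a
true date range: a range that crosses a month boundary, for example
2009-09-30:2009-10-02, yields day >= '30' and day <= '02', which no single
day satisfies.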
Modified: hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCRecordReader.java?rev=982445&r1=982444&r2=982445&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCRecordReader.java (original)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCRecordReader.java Thu Aug 5 00:38:58 2010
@@ -31,7 +31,8 @@ import org.apache.hadoop.mapreduce.lib.i
/**
* This class delegates the work to the RCFileRecordReader<br/>
*/
-public class HiveRCRecordReader extends RecordReader<LongWritable, BytesRefArrayWritable> {
+public class HiveRCRecordReader extends
+ RecordReader<LongWritable, BytesRefArrayWritable> {
LongWritable key;
BytesRefArrayWritable value;
@@ -42,51 +43,52 @@ public class HiveRCRecordReader extends
@Override
public void close() throws IOException {
- rcFileRecordReader.close();
+ rcFileRecordReader.close();
}
@Override
- public LongWritable getCurrentKey() throws IOException, InterruptedException {
- return key;
+ public LongWritable getCurrentKey() throws IOException,
+ InterruptedException {
+ return key;
}
@Override
- public BytesRefArrayWritable getCurrentValue() throws IOException, InterruptedException {
- return value;
+ public BytesRefArrayWritable getCurrentValue() throws IOException,
+ InterruptedException {
+ return value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
- return rcFileRecordReader.getProgress();
+ return rcFileRecordReader.getProgress();
}
- public Path getSplitPath(){
- return splitPath;
+ public Path getSplitPath() {
+ return splitPath;
}
@SuppressWarnings("deprecation")
@Override
public void initialize(InputSplit split, TaskAttemptContext ctx)
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException {
- FileSplit fileSplit = (FileSplit)split;
- Configuration conf = ctx.getConfiguration();
- splitPath = fileSplit.getPath();
+ FileSplit fileSplit = (FileSplit) split;
+ Configuration conf = ctx.getConfiguration();
+ splitPath = fileSplit.getPath();
+
+ rcFileRecordReader = new RCFileRecordReader<LongWritable, BytesRefArrayWritable>(
+ conf, new org.apache.hadoop.mapred.FileSplit(splitPath,
+ fileSplit.getStart(), fileSplit.getLength(),
+ new org.apache.hadoop.mapred.JobConf(conf)));
- rcFileRecordReader = new RCFileRecordReader<LongWritable, BytesRefArrayWritable>(conf,
- new org.apache.hadoop.mapred.FileSplit(splitPath, fileSplit.getStart(), fileSplit.getLength(),
- new org.apache.hadoop.mapred.JobConf(conf)) );
-
-
- key = rcFileRecordReader.createKey();
- value = rcFileRecordReader.createValue();
+ key = rcFileRecordReader.createKey();
+ value = rcFileRecordReader.createValue();
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
- return rcFileRecordReader.next(key, value);
+ return rcFileRecordReader.next(key, value);
}
-
}
Modified: hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCSchemaUtil.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCSchemaUtil.java?rev=982445&r1=982444&r2=982445&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCSchemaUtil.java (original)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/hiverc/HiveRCSchemaUtil.java Thu Aug 5 00:38:58 2010
@@ -19,9 +19,9 @@ package org.apache.pig.piggybank.storage
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
-import java.util.Map.Entry;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -57,7 +57,7 @@ public class HiveRCSchemaUtil {
* Regex to filter out column types
*/
protected static final Pattern ptypes = Pattern
- .compile("([ ][a-zA-Z0-9]*)|([a-zA-Z_0-9]*[<][a-zA-Z,_0-9]*[>])");
+ .compile("([ ][a-zA-Z0-9]*)|([a-zA-Z_0-9]*[<][a-zA-Z,_0-9]*[>])");
/**
* General schema parsing method, is used to parse the column names.
@@ -69,15 +69,15 @@ public class HiveRCSchemaUtil {
* @return List of String
*/
public static List<String> parseSchema(Pattern pattern, String schema) {
- List<String> types = new ArrayList<String>();
- Matcher m = pattern.matcher(schema);
- String item = null;
- while (m.find()) {
- item = m.group().trim();
- if (item.length() > 0)
- types.add(item);
- }
- return types;
+ List<String> types = new ArrayList<String>();
+ Matcher m = pattern.matcher(schema);
+ String item = null;
+ while (m.find()) {
+ item = m.group().trim();
+ if (item.length() > 0)
+ types.add(item);
+ }
+ return types;
}
/**
@@ -87,35 +87,35 @@ public class HiveRCSchemaUtil {
* @return List of String
*/
public static List<String> parseSchemaTypes(String schema) {
- List<String> types = new ArrayList<String>();
- Matcher m = ptypes.matcher(schema);
- String item = null;
-
- while (m.find()) {
- item = m.group().trim();
- if (item.length() > 0) {
- if (item.equalsIgnoreCase("map")) {
- // if generic type
- if (m.find()) {
- types.add(item + m.group().trim());
- } else {
- throw new RuntimeException(
- "Map must have generic types specified");
- }
- } else if (item.equalsIgnoreCase("array")) {
- // if generic type
- if (m.find()) {
- types.add(item + m.group().trim());
- } else {
- throw new RuntimeException(
- "Array must have generic types specified");
- }
- } else {
- types.add(item);
- }
- }
- }
- return types;
+ List<String> types = new ArrayList<String>();
+ Matcher m = ptypes.matcher(schema);
+ String item = null;
+
+ while (m.find()) {
+ item = m.group().trim();
+ if (item.length() > 0) {
+ if (item.equalsIgnoreCase("map")) {
+ // if generic type
+ if (m.find()) {
+ types.add(item + m.group().trim());
+ } else {
+ throw new RuntimeException(
+ "Map must have generic types specified");
+ }
+ } else if (item.equalsIgnoreCase("array")) {
+ // if generic type
+ if (m.find()) {
+ types.add(item + m.group().trim());
+ } else {
+ throw new RuntimeException(
+ "Array must have generic types specified");
+ }
+ } else {
+ types.add(item);
+ }
+ }
+ }
+ return types;
}
/**
@@ -125,14 +125,14 @@ public class HiveRCSchemaUtil {
* @return String
*/
public static final String listToString(List<String> list) {
- StringBuilder buff = new StringBuilder();
+ StringBuilder buff = new StringBuilder();
- for (String item : list) {
- buff.append(item.trim()).append(",");
- }
- int len = buff.length() - 1;
- buff.delete(len, len);
- return buff.toString();
+ for (String item : list) {
+ buff.append(item.trim()).append(",");
+ }
+ // strip the trailing comma
+ int len = buff.length() - 1;
+ buff.deleteCharAt(len);
+ return buff.toString();
}
/**
@@ -144,15 +144,15 @@ public class HiveRCSchemaUtil {
* @return String
*/
public static final String extractDayDate(String fileName) {
- int index = fileName.indexOf("daydate=");
- String dateStr = null;
- if (index == 0)
- dateStr = fileName.substring(8, fileName.length());
- else if (index > 0)
- dateStr = fileName.substring(index + 8, fileName
- .indexOf('/', index));
+ int index = fileName.indexOf("daydate=");
+ String dateStr = null;
+ if (index == 0)
+ dateStr = fileName.substring(8, fileName.length());
+ else if (index > 0)
+ dateStr = fileName.substring(index + 8,
+ fileName.indexOf('/', index));
- return dateStr;
+ return dateStr;
}
/**
@@ -163,83 +163,87 @@ public class HiveRCSchemaUtil {
*/
public static final Set<String> compileSet(String columnsToRead) {
- String[] columnsArr = columnsToRead.split(",");
- int len = columnsArr.length;
+ String[] columnsArr = columnsToRead.split(",");
+ int len = columnsArr.length;
- Set<String> columnsSet = new TreeSet<String>();
+ Set<String> columnsSet = new TreeSet<String>();
- for (int i = 0; i < len; i++) {
- columnsSet.add(columnsArr[i].trim());
- }
+ for (int i = 0; i < len; i++) {
+ columnsSet.add(columnsArr[i].trim());
+ }
- return columnsSet;
+ return columnsSet;
}
+
/**
* Returns the pig DataType for the hive type
+ *
* @param hiveType
* @return byte from DataType
*/
public static byte findPigDataType(String hiveType) {
- hiveType = hiveType.toLowerCase();
+ hiveType = hiveType.toLowerCase();
- if (hiveType.equals("string"))
- return DataType.CHARARRAY;
- else if (hiveType.equals("int"))
- return DataType.INTEGER;
- else if (hiveType.equals("bigint") || hiveType.equals("long"))
- return DataType.LONG;
- else if (hiveType.equals("float"))
- return DataType.FLOAT;
- else if (hiveType.equals("double"))
- return DataType.DOUBLE;
- else if (hiveType.equals("boolean"))
- return DataType.INTEGER;
- else if (hiveType.equals("byte"))
- return DataType.INTEGER;
- else if (hiveType.contains("array"))
- return DataType.TUPLE;
- else if (hiveType.contains("map"))
- return DataType.MAP;
- else
- return DataType.ERROR;
+ if (hiveType.equals("string"))
+ return DataType.CHARARRAY;
+ else if (hiveType.equals("int"))
+ return DataType.INTEGER;
+ else if (hiveType.equals("bigint") || hiveType.equals("long"))
+ return DataType.LONG;
+ else if (hiveType.equals("float"))
+ return DataType.FLOAT;
+ else if (hiveType.equals("double"))
+ return DataType.DOUBLE;
+ else if (hiveType.equals("boolean"))
+ return DataType.INTEGER;
+ else if (hiveType.equals("byte"))
+ return DataType.INTEGER;
+ else if (hiveType.contains("array"))
+ return DataType.TUPLE;
+ else if (hiveType.contains("map"))
+ return DataType.MAP;
+ else
+ return DataType.ERROR;
}
/**
* Converts from a hive type to a pig type
- * @param value Object hive type
+ *
+ * @param value
+ * Object hive type
* @return Object pig type
*/
public static Object extractPigTypeFromHiveType(Object value) {
- if (value instanceof org.apache.hadoop.hive.serde2.lazy.LazyArray) {
- value = parseLazyArrayToPigArray((org.apache.hadoop.hive.serde2.lazy.LazyArray) value);
- } else if (value instanceof org.apache.hadoop.hive.serde2.lazy.LazyMap) {
- value = parseLazyMapToPigMap((org.apache.hadoop.hive.serde2.lazy.LazyMap) value);
- } else {
-
- if (value instanceof LazyString) {
- value = ((LazyString) value).getWritableObject().toString();
- } else if (value instanceof LazyInteger) {
- value = ((LazyInteger) value).getWritableObject().get();
- } else if (value instanceof LazyLong) {
- value = ((LazyLong) value).getWritableObject().get();
- } else if (value instanceof LazyFloat) {
- value = ((LazyFloat) value).getWritableObject().get();
- } else if (value instanceof LazyDouble) {
- value = ((LazyDouble) value).getWritableObject().get();
- } else if (value instanceof LazyBoolean) {
- boolean boolvalue = ((LazyBoolean) value).getWritableObject()
- .get();
- value = (boolvalue) ? 1 : 0;
- } else if (value instanceof LazyByte) {
- value = (int) ((LazyByte) value).getWritableObject().get();
- } else if (value instanceof LazyShort) {
- value = ((LazyShort) value).getWritableObject().get();
- }
+ if (value instanceof org.apache.hadoop.hive.serde2.lazy.LazyArray) {
+ value = parseLazyArrayToPigArray((org.apache.hadoop.hive.serde2.lazy.LazyArray) value);
+ } else if (value instanceof org.apache.hadoop.hive.serde2.lazy.LazyMap) {
+ value = parseLazyMapToPigMap((org.apache.hadoop.hive.serde2.lazy.LazyMap) value);
+ } else {
+
+ if (value instanceof LazyString) {
+ value = ((LazyString) value).getWritableObject().toString();
+ } else if (value instanceof LazyInteger) {
+ value = ((LazyInteger) value).getWritableObject().get();
+ } else if (value instanceof LazyLong) {
+ value = ((LazyLong) value).getWritableObject().get();
+ } else if (value instanceof LazyFloat) {
+ value = ((LazyFloat) value).getWritableObject().get();
+ } else if (value instanceof LazyDouble) {
+ value = ((LazyDouble) value).getWritableObject().get();
+ } else if (value instanceof LazyBoolean) {
+ boolean boolvalue = ((LazyBoolean) value).getWritableObject()
+ .get();
+ value = (boolvalue) ? 1 : 0;
+ } else if (value instanceof LazyByte) {
+ value = (int) ((LazyByte) value).getWritableObject().get();
+ } else if (value instanceof LazyShort) {
+ value = ((LazyShort) value).getWritableObject().get();
+ }
- }
+ }
- return value;
+ return value;
}
/**
@@ -250,21 +254,21 @@ public class HiveRCSchemaUtil {
* @return InternalMap
*/
public static InternalMap parseLazyMapToPigMap(LazyMap map) {
- InternalMap pigmap = new InternalMap();
+ InternalMap pigmap = new InternalMap();
- Map<Object, Object> javamap = map.getMap();
+ Map<Object, Object> javamap = map.getMap();
- if (javamap != null) {
+ if (javamap != null) {
- // for each item in the map extract the java primitive type
- for (Entry<Object, Object> entry : javamap.entrySet()) {
- pigmap.put(extractPigTypeFromHiveType(entry.getKey()),
- extractPigTypeFromHiveType(entry.getValue()));
- }
+ // for each item in the map extract the java primitive type
+ for (Entry<Object, Object> entry : javamap.entrySet()) {
+ pigmap.put(extractPigTypeFromHiveType(entry.getKey()),
+ extractPigTypeFromHiveType(entry.getValue()));
+ }
- }
+ }
- return pigmap;
+ return pigmap;
}
/**
@@ -275,17 +279,17 @@ public class HiveRCSchemaUtil {
* @return Tuple
*/
public static Tuple parseLazyArrayToPigArray(LazyArray arr) {
- List<Object> list = new ArrayList<Object>();
+ List<Object> list = new ArrayList<Object>();
- // each item inside the LazyArray must be converted to its java
- // primitive type
- List<Object> hivedataList = arr.getList();
+ // each item inside the LazyArray must be converted to its java
+ // primitive type
+ List<Object> hivedataList = arr.getList();
- for (Object item : hivedataList) {
- list.add(extractPigTypeFromHiveType(item));
- }
+ for (Object item : hivedataList) {
+ list.add(extractPigTypeFromHiveType(item));
+ }
- return tupleFactory.newTuple(list);
+ return tupleFactory.newTuple(list);
}
}
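
A minimal sketch of how a table_schema string is split into column names and
hive types by the two parse methods above; it also shows why the Issues note
in HiveColumnarLoader requires no space after the comma. The pcols pattern
here mirrors the protected one declared in HiveColumnarLoader.

    import java.util.List;
    import java.util.regex.Pattern;

    import org.apache.pig.piggybank.storage.hiverc.HiveRCSchemaUtil;

    public class SchemaParseSketch {

        // same expression as the protected HiveColumnarLoader.pcols pattern
        private static final Pattern pcols = Pattern.compile("[a-zA-Z_0-9]*[ ]");

        public static void main(String[] args) {
            String tableSchema = "uid bigint,ts long,m map<string,string>";

            // column names: each word followed by a space
            List<String> cols = HiveRCSchemaUtil.parseSchema(pcols, tableSchema);
            // hive types: each word preceded by a space, with map/array
            // generics re-attached by parseSchemaTypes
            List<String> types = HiveRCSchemaUtil.parseSchemaTypes(tableSchema);

            System.out.println(cols);  // [uid, ts, m]
            System.out.println(types); // [bigint, long, map<string,string>]
        }
    }

With a space after each comma ("uid bigint, ts long") the type pattern also
picks up " ts" as a type, so the name and type lists fall out of alignment;
this is exactly the failure the Issues section warns about.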