Posted to commits@pig.apache.org by da...@apache.org on 2010/08/05 23:07:38 UTC

svn commit: r982786 - in /hadoop/pig/trunk/contrib/piggybank/java/src: main/java/org/apache/pig/piggybank/storage/partition/ test/java/org/apache/pig/piggybank/test/storage/

Author: daijy
Date: Thu Aug  5 21:07:37 2010
New Revision: 982786

URL: http://svn.apache.org/viewvc?rev=982786&view=rev
Log:
PIG-1526: HiveColumnarLoader Partitioning Support (missing some code in last check in)

Added:
    hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/partition/
    hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/partition/PathPartitionHelper.java
    hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/partition/PathPartitioner.java
    hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitionHelper.java
    hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitioner.java

Added: hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/partition/PathPartitionHelper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/partition/PathPartitionHelper.java?rev=982786&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/partition/PathPartitionHelper.java (added)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/partition/PathPartitionHelper.java Thu Aug  5 21:07:37 2010
@@ -0,0 +1,419 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.piggybank.storage.partition;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+
+import javax.el.ELContext;
+import javax.el.ELResolver;
+import javax.el.ExpressionFactory;
+import javax.el.FunctionMapper;
+import javax.el.ValueExpression;
+import javax.el.VariableMapper;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.log4j.Logger;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.impl.util.UDFContext;
+
+/**
+ * Implements the logic for:<br/>
+ * <ul>
+ * <li>Listing partition keys and values used in an hdfs path</li>
+ * <li>Filtering of partitions from a pig filter operator expression</li>
+ * </ul>
+ * <p/>
+ * <b>Restrictions</b> <br/>
+ * Function calls are not supported by this partition helper, and it can only
+ * handle String values.<br/>
+ * This is normally not a problem, given that partition values are part of the
+ * hdfs folder path and have fixed values that do not need parsing by any
+ * external process.<br/>
+ * 
+ * 
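+ * <p/>
+ * A minimal usage sketch (the loader class, signature and paths here are
+ * illustrative assumptions, not part of this commit):
+ * 
+ * <pre>
+ * PathPartitionHelper helper = new PathPartitionHelper();
+ * // record the partition keys found under the table's base directory
+ * helper.setPartitionKeys("/logs/repo/mylog", conf, MyLoader.class, signature);
+ * // restrict which partitions listStatus will return files from
+ * helper.setPartitionFilterExpression("daydate=='2010-01-01'",
+ *         MyLoader.class, signature);
+ * // inside an InputFormat, list only the files of matching partitions
+ * List<FileStatus> files = helper.listStatus(jobContext, MyLoader.class,
+ *         signature);
+ * </pre>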
+ */
+public class PathPartitionHelper {
+
+    public static final String PARTITION_COLUMNS = PathPartitionHelper.class
+	    .getName() + ".partition-columns";
+    public static final String PARTITION_FILTER_EXPRESSION = PathPartitionHelper.class
+	    .getName() + ".partition-filter";
+
+    private static final Logger LOG = Logger
+	    .getLogger(PathPartitionHelper.class);
+
+    transient PathPartitioner pathPartitioner = new PathPartitioner();
+
+    /**
+     * Returns the partition keys and each key's value for a single location.<br/>
+     * That is, the location must be something like
+     * mytable/partition1=a/partition2=b/myfile.<br/>
+     * This method will return a map with [partition1='a', partition2='b'].<br/>
+     * The work is delegated to the PathPartitioner class.
+     * 
+     * @param location
+     * @return Map of String, String
+     * @throws IOException
+     */
+    public Map<String, String> getPathPartitionKeyValues(String location)
+	    throws IOException {
+	return pathPartitioner.getPathPartitionKeyValues(location);
+    }
+
+    /**
+     * Returns the partition keys for a location.<br/>
+     * The work is delegated to the PathPartitioner class
+     * 
+     * @param location
+     *            String must be the base directory for the partitions
+     * @param conf
+     * @return Set of String partition keys
+     * @throws IOException
+     */
+    public Set<String> getPartitionKeys(String location, Configuration conf)
+	    throws IOException {
+	return pathPartitioner.getPartitionKeys(location, conf);
+    }
+
+    /**
+     * Sets the PARTITION_FILTER_EXPRESSION property in the UDFContext
+     * identified by the loaderClass and signature.
+     * 
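+     * For example (the expression follows the tests in
+     * TestPathPartitionHelper; the loader class and variable names are
+     * illustrative):
+     * 
+     * <pre>
+     * helper.setPartitionFilterExpression("year<='2010' and month=='01'",
+     *         MyLoader.class, signature);
+     * </pre>
+     * 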
+     * @param partitionFilterExpression
+     * @param loaderClass
+     * @param signature
+     * @throws IOException
+     */
+    public void setPartitionFilterExpression(String partitionFilterExpression,
+	    Class<? extends LoadFunc> loaderClass, String signature)
+	    throws IOException {
+
+	UDFContext
+		.getUDFContext()
+		.getUDFProperties(loaderClass, new String[] { signature })
+		.setProperty(PARTITION_FILTER_EXPRESSION,
+			partitionFilterExpression);
+
+    }
+
+    /**
+     * Reads the partition keys from the location, i.e. the base directory.
+     * 
+     * @param location
+     *            String must be the base directory for the partitions
+     * @param conf
+     * @param loaderClass
+     * @param signature
+     * @throws IOException
+     */
+    public void setPartitionKeys(String location, Configuration conf,
+	    Class<? extends LoadFunc> loaderClass, String signature)
+	    throws IOException {
+
+	Set<String> partitionKeys = getPartitionKeys(location, conf);
+
+	if (partitionKeys != null) {
+	    StringBuilder buff = new StringBuilder();
+	    int i = 0;
+	    for (String key : partitionKeys) {
+		if (i++ != 0) {
+		    buff.append(",");
+		}
+
+		buff.append(key);
+	    }
+
+	    UDFContext.getUDFContext()
+		    .getUDFProperties(loaderClass, new String[] { signature })
+		    .setProperty(PARTITION_COLUMNS, buff.toString());
+	}
+
+    }
+
+    /**
+     * This method is called by the FileInputFormat to find the input paths for
+     * which splits should be calculated.<br/>
+     * If a partition filter expression has been set, only files from the
+     * partitions that satisfy the expression are returned.<br/>
+     * If no partition keys were registered for this loader, null is returned,
+     * and the caller should fall back to the default FileInputFormat
+     * listStatus method.
+     * 
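+     * A FileInputFormat subclass would typically delegate to this method; a
+     * minimal sketch (class and variable names are illustrative):
+     * 
+     * <pre>
+     * protected List<FileStatus> listStatus(JobContext ctx) throws IOException {
+     *     List<FileStatus> files = partitionHelper.listStatus(ctx,
+     *             MyLoader.class, signature);
+     *     return (files == null) ? super.listStatus(ctx) : files;
+     * }
+     * </pre>
+     * 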
+     * @param ctx
+     *            JobContext
+     * @param loaderClass
+     *            a LoadFunc subclass; used together with the signature to
+     *            locate the UDFContext properties for this loader
+     * @param signature
+     */
+    public List<FileStatus> listStatus(JobContext ctx,
+	    Class<? extends LoadFunc> loaderClass, String signature)
+	    throws IOException {
+
+	Properties properties = UDFContext.getUDFContext().getUDFProperties(
+		loaderClass, new String[] { signature });
+
+	String partitionExpression = properties
+		.getProperty(PARTITION_FILTER_EXPRESSION);
+
+	ExpressionFactory expressionFactory = null;
+
+	if (partitionExpression != null) {
+	    expressionFactory = ExpressionFactory.newInstance();
+	}
+
+	String partitionColumnStr = properties
+		.getProperty(PathPartitionHelper.PARTITION_COLUMNS);
+	String[] partitionKeys = (partitionColumnStr == null) ? null
+		: partitionColumnStr.split(",");
+
+	Path[] inputPaths = FileInputFormat.getInputPaths(ctx);
+
+	List<FileStatus> splitPaths = null;
+
+	if (partitionKeys != null) {
+
+	    splitPaths = new ArrayList<FileStatus>();
+
+	    for (Path inputPath : inputPaths) {
+		// for each input path work recursively through each partition
+		// level to find the rc files
+
+		FileSystem fs = inputPath.getFileSystem(ctx.getConfiguration());
+
+		if (fs.getFileStatus(inputPath).isDir()) {
+		    // assure that we are at the root of the partition tree.
+		    FileStatus fileStatusArr[] = fs.listStatus(inputPath);
+
+		    if (fileStatusArr != null) {
+			for (FileStatus childFileStatus : fileStatusArr) {
+			    getPartitionedFiles(expressionFactory,
+				    partitionExpression, fs, childFileStatus,
+				    0, partitionKeys, splitPaths);
+			}
+		    }
+
+		} else {
+		    splitPaths.add(fs.getFileStatus(inputPath));
+		}
+
+	    }
+
+	    if (splitPaths.size() < 1) {
+		LOG.error("No split paths were found; please check that the filter logic for the partition keys does not filter out everything.");
+	    }
+
+	}
+
+	return splitPaths;
+    }
+
+    /**
+     * Recursively works through all directories, skipping filtered partitions.
+     * 
+     * @param expressionFactory
+     * @param partitionExpression
+     * @param fs
+     * @param fileStatus
+     * @param partitionLevel
+     * @param partitionKeys
+     * @param splitPaths
+     * @throws IOException
+     */
+    private void getPartitionedFiles(ExpressionFactory expressionFactory,
+	    String partitionExpression, FileSystem fs, FileStatus fileStatus,
+	    int partitionLevel, String[] partitionKeys,
+	    List<FileStatus> splitPaths) throws IOException {
+
+	String partition = (partitionLevel < partitionKeys.length) ? partitionKeys[partitionLevel]
+		: null;
+
+	Path path = fileStatus.getPath();
+
+	// filter out hidden files
+	if (path.getName().startsWith("_")) {
+	    return;
+	}
+
+	// pre-filter logic:
+	// return early if any of the checks below fail
+	if (partition != null) {
+	    if (fileStatus.isDir()) {
+
+		// check that the dir name starts with the partition key name
+		if (!path.getName().startsWith(partition))
+		    return;
+
+	    } else {
+		// else it's a file that is not at the end of the partition
+		// tree, so it is ignored.
+		return;
+	    }
+
+	    // at the deepest partition level the path contains all partition
+	    // keys plus their values, so the partition filter expression that
+	    // was passed to HiveColumnarLoader.setPartitionExpression can be
+	    // applied here
+	    if (partitionLevel == (partitionKeys.length - 1)
+		    && !evaluatePartitionExpression(expressionFactory,
+			    partitionExpression, path)) {
+
+		LOG.debug("Pruning partition: " + path);
+		return;
+
+	    }
+
+	}
+
+	// after this point we know that either the partition is null, which
+	// means we are at the end of the partition tree and all files and
+	// sub directories should be included, or we are still navigating
+	// the partition tree.
+	int nextPartitionLevel = partitionLevel + 1;
+
+	// iterate over directories if fileStatus is a dir.
+	FileStatus[] childStatusArr = null;
+
+	if (fileStatus.isDir()) {
+	    if ((childStatusArr = fs.listStatus(path)) != null) {
+		for (FileStatus childFileStatus : childStatusArr) {
+		    getPartitionedFiles(expressionFactory, partitionExpression,
+			    fs, childFileStatus, nextPartitionLevel,
+			    partitionKeys, splitPaths);
+		}
+	    }
+	} else {
+	    // add file to splitPaths
+	    splitPaths.add(fileStatus);
+	}
+
+    }
+
+    /**
+     * Evaluates the partitionExpression set via
+     * HiveColumnarLoader.setPartitionExpression against the key=value pairs
+     * found in the path.
+     * 
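+     * An illustrative evaluation (path and expression are made up):
+     * 
+     * <pre>
+     *   // path       = mytable/daydate=2010-01-01/file1
+     *   // expression = daydate=='2010-01-01'
+     *   // wrapped as "${daydate=='2010-01-01'}" and evaluated against the
+     *   // key=value pairs of the path, returning true
+     * </pre>
+     * 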
+     * @param expressionFactory
+     * @param partitionExpression
+     *            String
+     * @param path
+     *            Path
+     * @return boolean
+     * @throws IOException
+     */
+    private boolean evaluatePartitionExpression(
+	    ExpressionFactory expressionFactory, String partitionExpression,
+	    Path path) throws IOException {
+
+	boolean ret = true;
+
+	if (expressionFactory != null) {
+	    if (!partitionExpression.startsWith("${")) {
+		partitionExpression = "${" + partitionExpression + "}";
+	    }
+
+	    Map<String, String> context = pathPartitioner
+		    .getPathPartitionKeyValues(path.toString());
+
+	    MapVariableMapper mapper = new MapVariableMapper(expressionFactory,
+		    context);
+	    VariableContext varContext = new VariableContext(mapper);
+
+	    ValueExpression evalExpression = expressionFactory
+		    .createValueExpression(varContext, partitionExpression,
+			    Boolean.class);
+
+	    ret = (Boolean) evalExpression.getValue(varContext);
+
+	    LOG.debug("Evaluated: " + partitionExpression + " returned: " + ret);
+
+	}
+
+	return ret;
+    }
+
+    /**
+     * 
+     * ELContext implementation that exposes a MapVariableMapper as its
+     * VariableMapper.
+     * 
+     */
+    class VariableContext extends ELContext {
+
+	VariableMapper variableMapper;
+
+	VariableContext(VariableMapper variableMapper) {
+	    this.variableMapper = variableMapper;
+	}
+
+	@Override
+	public ELResolver getELResolver() {
+	    // no ELResolver is needed; variable resolution is handled by the
+	    // VariableMapper
+	    return null;
+	}
+
+	@Override
+	public FunctionMapper getFunctionMapper() {
+	    return null;
+	}
+
+	@Override
+	public VariableMapper getVariableMapper() {
+	    return variableMapper;
+	}
+
+    }
+
+    /**
+     * VariableMapper implementation that takes the values in a Map and
+     * creates a String-typed ValueExpression for each.
+     * 
+     */
+    class MapVariableMapper extends VariableMapper {
+	private Map<String, ValueExpression> valueExpressionMap;
+
+	public MapVariableMapper(ExpressionFactory expressionFactory,
+		Map<String, String> variableMap) {
+
+	    valueExpressionMap = new HashMap<String, ValueExpression>();
+
+	    for (Entry<String, String> entry : variableMap.entrySet()) {
+		ValueExpression valExpr = expressionFactory
+			.createValueExpression(entry.getValue(), String.class);
+		valueExpressionMap.put(entry.getKey(), valExpr);
+	    }
+
+	}
+
+	@Override
+	public ValueExpression resolveVariable(String variableName) {
+	    return valueExpressionMap.get(variableName);
+	}
+
+	@Override
+	public ValueExpression setVariable(String variableName,
+		ValueExpression valueExpression) {
+	    return valueExpressionMap.put(variableName, valueExpression);
+	}
+
+    }
+
+}

Added: hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/partition/PathPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/partition/PathPartitioner.java?rev=982786&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/partition/PathPartitioner.java (added)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/partition/PathPartitioner.java Thu Aug  5 21:07:37 2010
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.piggybank.storage.partition;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * 
+ * It is sometimes convenient to partition logs by date or by other values
+ * such as country or city.<br/>
+ * A daydate-partitioned hdfs directory might look something like:<br/>
+ * 
+ * <pre>
+ * /logs/repo/mylog/
+ *     daydate=2010-01-01
+ *     daydate=2010-01-02
+ * </pre>
+ * 
+ * This class accepts a path like /logs/repo/mylog and returns the partition
+ * keys and values found in the directory tree.
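+ * For the layout above, an illustrative call would be:
+ * 
+ * <pre>
+ * PathPartitioner partitioner = new PathPartitioner();
+ * Map<String, String> values = partitioner
+ *         .getPathPartitionKeyValues("/logs/repo/mylog/daydate=2010-01-01");
+ * // values now contains [daydate=2010-01-01]
+ * </pre>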
+ */
+public class PathPartitioner {
+
+    /**
+     * Searches for the key=value pairs in the path pointed to by the location
+     * parameter. Note: the location must be the lowest path in the partition
+     * tree, i.e. it must already contain the key=value directories.
+     * 
+     * @param location
+     *            String path in hdfs, e.g.
+     *            /user/hive/warehouse/mytable/year=2010/month=07
+     * @return Map of String, String. The order is maintained as per the
+     *         directory tree, i.e. for the location above the first entry
+     *         will be year=2010 and the second month=07.
+     * @throws IOException
+     */
+    public Map<String, String> getPathPartitionKeyValues(String location)
+	    throws IOException {
+
+	// use LinkedHashMap because order is important here.
+	Map<String, String> partitionKeys = new LinkedHashMap<String, String>();
+
+	String[] pathSplit = location.split("/");
+
+	for (String pathSplitItem : pathSplit) {
+	    parseAndPutKeyValue(pathSplitItem, partitionKeys);
+	}
+
+	return partitionKeys;
+    }
+
+    /**
+     * Searches for the key=value pairs in the directory tree below the path
+     * pointed to by the location parameter.
+     * 
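+     * For example, with a layout like the one created in TestPathPartitioner
+     * (the paths are illustrative):
+     * 
+     * <pre>
+     *   // given /logs/repo/year=2010/month=01/day=01/somefile
+     *   getPartitionKeys("/logs/repo", conf) returns [year, month, day]
+     * </pre>
+     * 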
+     * @param location
+     *            String root path in hdfs, e.g. /user/hive/warehouse or
+     *            /logs/repo
+     * @param conf
+     *            Configuration
+     * @return Set of String. The order is maintained as per the directory tree.
+     *         i.e. if /logs/repo/year=2010/month=07 exists, the first item in
+     *         the set will be year and the second month.
+     * @throws IOException
+     */
+    public Set<String> getPartitionKeys(String location, Configuration conf)
+	    throws IOException {
+
+	// find the hive-style partition key=value pairs from the path.
+	// first parse the location string on its own.
+	Path path = new Path(location);
+	FileSystem fs = path.getFileSystem(conf);
+
+	FileStatus[] fileStatusArr = null;
+
+	// use LinkedHashSet because order is important here.
+	Set<String> partitionKeys = new LinkedHashSet<String>();
+
+	parseAndPutKeyValue(location, partitionKeys);
+
+	while (!((fileStatusArr = fs.listStatus(path)) == null || fs
+		.isFile(path))) {
+	    for (FileStatus fileStatus : fileStatusArr) {
+
+		path = fileStatus.getPath();
+
+		// ignore hidden directories
+		if (fileStatus.getPath().getName().startsWith("_")
+			|| !fileStatus.isDir())
+		    continue;
+
+		parseAndPutKeyValue(path.getName(), partitionKeys);
+		// stop at the first directory found, after parsing its name
+		// for a key=value pair
+		break;
+	    }
+
+	}
+
+	return partitionKeys;
+    }
+
+    private final void parseAndPutKeyValue(String pathName,
+	    Map<String, String> partitionKeys) {
+	String[] keyValue = parsePathKeyValue(pathName);
+	if (keyValue != null) {
+	    partitionKeys.put(keyValue[0], keyValue[1]);
+	}
+
+    }
+
+    private final void parseAndPutKeyValue(String pathName,
+	    Set<String> partitionKeys) {
+	String[] keyValue = parsePathKeyValue(pathName);
+	if (keyValue != null) {
+	    partitionKeys.add(keyValue[0]);
+	}
+
+    }
+
+    /**
+     * Looks for a key=value pair in the last segment of a path such as:
+     * /user/hive/warehouse/mylogs/year=2010/month=07
+     * 
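+     * Illustrative results:
+     * 
+     * <pre>
+     *   parsePathKeyValue("/logs/repo/mylog/year=2010") returns [year, 2010]
+     *   parsePathKeyValue("/logs/repo/mylog")           returns null
+     * </pre>
+     * 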
+     * @param path
+     * @return String[] where [0] = key and [1] = value, or null if no
+     *         key=value pair is found
+     */
+    public String[] parsePathKeyValue(String path) {
+	int slashIndex = path.lastIndexOf('/');
+	String parsedPath = path;
+	String[] keyValue = null;
+
+	if (slashIndex >= 0) {
+	    // take only the last path segment, dropping the slash itself
+	    parsedPath = path.substring(slashIndex + 1);
+	}
+
+	if (parsedPath.contains("=")) {
+	    String split[] = parsedPath.split("=");
+	    if (split.length == 2) {
+		keyValue = split;
+	    }
+	}
+
+	return keyValue;
+    }
+
+}

Added: hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitionHelper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitionHelper.java?rev=982786&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitionHelper.java (added)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitionHelper.java Thu Aug  5 21:07:37 2010
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.piggybank.test.storage;
+
+import java.io.File;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.piggybank.storage.partition.PathPartitionHelper;
+import org.apache.pig.test.Util;
+import org.junit.Test;
+
+/**
+ * 
+ * Tests that the PathPartitionHelper can:<br/>
+ * <ul>
+ * <li>Filter path partitioned files based on an expression being true</li>
+ * <li>Filter path partitioned files based on an expression being false</li>
+ * <li>Filter path partitioned files with no expression</li>
+ * </ul>
+ * 
+ */
+public class TestPathPartitionHelper extends TestCase {
+
+    private static Configuration conf = null;
+
+    File baseDir;
+    File partition1;
+    File partition2;
+    File partition3;
+
+    @Test
+    public void testListStatusPartitionFilterNotFound() throws Exception {
+
+	PathPartitionHelper partitionHelper = new PathPartitionHelper();
+
+	Job job = new Job(conf);
+	job.setJobName("TestJob");
+	job.setInputFormatClass(FileInputFormat.class);
+
+	Configuration conf = job.getConfiguration();
+	FileInputFormat.setInputPaths(job, new Path(baseDir.getAbsolutePath()));
+
+	JobContext jobContext = new JobContext(conf, job.getJobID());
+
+	partitionHelper.setPartitionFilterExpression("year < '2010'",
+		PigStorage.class, "1");
+	partitionHelper.setPartitionKeys(baseDir.getAbsolutePath(), conf,
+		PigStorage.class, "1");
+
+	List<FileStatus> files = partitionHelper.listStatus(jobContext,
+		PigStorage.class, "1");
+
+	assertEquals(0, files.size());
+
+    }
+
+    @Test
+    public void testListStatusPartitionFilterFound() throws Exception {
+
+	PathPartitionHelper partitionHelper = new PathPartitionHelper();
+
+	Job job = new Job(conf);
+	job.setJobName("TestJob");
+	job.setInputFormatClass(FileInputFormat.class);
+
+	Configuration conf = job.getConfiguration();
+	FileInputFormat.setInputPaths(job, new Path(baseDir.getAbsolutePath()));
+
+	JobContext jobContext = new JobContext(conf, job.getJobID());
+
+	partitionHelper.setPartitionFilterExpression(
+		"year<='2010' and month=='01' and day>='01'", PigStorage.class, "2");
+	partitionHelper.setPartitionKeys(baseDir.getAbsolutePath(), conf,
+		PigStorage.class, "2");
+
+	List<FileStatus> files = partitionHelper.listStatus(jobContext,
+		PigStorage.class, "2");
+
+	assertNotNull(files);
+	assertEquals(1, files.size());
+
+    }
+
+    @Test
+    public void testListStatus() throws Exception {
+
+	PathPartitionHelper partitionHelper = new PathPartitionHelper();
+
+	Job job = new Job(conf);
+	job.setJobName("TestJob");
+	job.setInputFormatClass(FileInputFormat.class);
+
+	Configuration conf = job.getConfiguration();
+	FileInputFormat.setInputPaths(job, new Path(baseDir.getAbsolutePath()));
+
+	JobContext jobContext = new JobContext(conf, job.getJobID());
+
+	partitionHelper.setPartitionKeys(baseDir.getAbsolutePath(), conf,
+		PigStorage.class, "3");
+
+	List<FileStatus> files = partitionHelper.listStatus(jobContext,
+		PigStorage.class, "3");
+
+	assertNotNull(files);
+	assertEquals(1, files.size());
+
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+
+	Util.deleteDirectory(baseDir);
+
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+	conf = new Configuration(false);
+
+	baseDir = createDir(null,
+		"testPathPartitionHelper-" + System.currentTimeMillis());
+
+	partition1 = createDir(baseDir, "year=2010");
+	partition2 = createDir(partition1, "month=01");
+	partition3 = createDir(partition2, "day=01");
+
+	File file = new File(partition3, "testfile-"
+		+ System.currentTimeMillis());
+	file.createNewFile();
+
+    }
+
+    private File createDir(File parent, String name) {
+	File file = new File(parent, name);
+	file.mkdirs();
+	return file;
+    }
+
+}

Added: hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitioner.java?rev=982786&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitioner.java (added)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitioner.java Thu Aug  5 21:07:37 2010
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.piggybank.test.storage;
+
+import java.io.File;
+import java.util.Map;
+import java.util.Set;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.piggybank.storage.partition.PathPartitioner;
+import org.apache.pig.test.Util;
+import org.junit.Test;
+
+/**
+ * 
+ * Tests that the PathPartitioner can:<br/>
+ * <ul>
+ *   <li>Read keys from a partitioned file path</li>
+ *   <li>Read keys and values from a partitioned file path</li>
+ * </ul>
+ *
+ */
+public class TestPathPartitioner extends TestCase {
+
+    private static Configuration conf = null;
+
+    File baseDir;
+    File partition1;
+    File partition2;
+    File partition3;
+
+    @Override
+    protected void tearDown() throws Exception {
+
+	Util.deleteDirectory(baseDir);
+
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+	conf = new Configuration();
+
+	baseDir = createDir(null,
+		"testPathPartitioner-testGetKeys-" + System.currentTimeMillis());
+
+	partition1 = createDir(baseDir, "year=2010");
+	partition2 = createDir(partition1, "month=01");
+	partition3 = createDir(partition2, "day=01");
+
+	File file = new File(partition3, "testfile-"
+		+ System.currentTimeMillis());
+	file.createNewFile();
+
+    }
+
+    @Test
+    public void testGetKeyValues() throws Exception {
+	PathPartitioner partitioner = new PathPartitioner();
+
+	Map<String, String> map = partitioner
+		.getPathPartitionKeyValues(partition3.getAbsolutePath());
+
+	String[] keys = map.keySet().toArray(new String[] {});
+
+	assertEquals("2010", map.get(keys[0]));
+	assertEquals("01", map.get(keys[1]));
+	assertEquals("01", map.get(keys[2]));
+
+    }
+
+    @Test
+    public void testGetKeys() throws Exception {
+
+	PathPartitioner pathPartitioner = new PathPartitioner();
+	Set<String> keys = pathPartitioner.getPartitionKeys(
+		baseDir.getAbsolutePath(), conf);
+
+	assertNotNull(keys);
+	assertEquals(3, keys.size());
+
+	String[] keyArr = keys.toArray(new String[] {});
+
+	assertEquals("year", keyArr[0]);
+	assertEquals("month", keyArr[1]);
+	assertEquals("day", keyArr[2]);
+
+    }
+
+    private File createDir(File parent, String name) {
+	File file = new File(parent, name);
+	file.mkdirs();
+	return file;
+    }
+
+}