You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/08/05 23:07:38 UTC
svn commit: r982786 - in /hadoop/pig/trunk/contrib/piggybank/java/src:
main/java/org/apache/pig/piggybank/storage/partition/
test/java/org/apache/pig/piggybank/test/storage/
Author: daijy
Date: Thu Aug 5 21:07:37 2010
New Revision: 982786
URL: http://svn.apache.org/viewvc?rev=982786&view=rev
Log:
PIG-1526: HiveColumnarLoader Partitioning Support (missing some code in last check in)
Added:
hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/partition/
hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/partition/PathPartitionHelper.java
hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/partition/PathPartitioner.java
hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitionHelper.java
hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitioner.java
Added: hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/partition/PathPartitionHelper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/partition/PathPartitionHelper.java?rev=982786&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/partition/PathPartitionHelper.java (added)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/partition/PathPartitionHelper.java Thu Aug 5 21:07:37 2010
@@ -0,0 +1,419 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.piggybank.storage.partition;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+
+import javax.el.ELContext;
+import javax.el.ELResolver;
+import javax.el.ExpressionFactory;
+import javax.el.FunctionMapper;
+import javax.el.ValueExpression;
+import javax.el.VariableMapper;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.log4j.Logger;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.impl.util.UDFContext;
+
+/**
+ * Implements the logic for:<br/>
+ * <ul>
+ * <li>Listing partition keys and values used in an hdfs path</li>
+ * <li>Filtering of partitions from a pig filter operator expression</li>
+ * </ul>
+ * <p/>
+ * <b>Restrictions</b> <br/>
+ * Function calls are not supported by this partition helper and it can only
+ * handle String values.<br/>
+ * This is normally not a problem given that partition values are part of the
+ * hdfs folder path and is given a<br/>
+ * determined value that would not need parsing by any external processes.<br/>
+ *
+ *
+ */
+public class PathPartitionHelper {
+
+ public static final String PARTITION_COLUMNS = PathPartitionHelper.class
+ + ".partition-columns";
+ public static final String PARITITION_FILTER_EXPRESSION = PathPartitionHelper.class
+ .getName() + ".partition-filter";
+
+ private static final Logger LOG = Logger
+ .getLogger(PathPartitionHelper.class);
+
+ transient PathPartitioner pathPartitioner = new PathPartitioner();
+
+ /**
+ * Returns the Partition keys and each key's value for a single location.<br/>
+ * That is the location must be something like
+ * mytable/partition1=a/partition2=b/myfile.<br/>
+ * This method will return a map with [partition1='a', partition2='b']<br/>
+ * The work is delegated to the PathPartitioner class
+ *
+ * @param location
+ * @return Map of String, String
+ * @throws IOException
+ */
+ public Map<String, String> getPathPartitionKeyValues(String location)
+ throws IOException {
+ return pathPartitioner.getPathPartitionKeyValues(location);
+ }
+
+ /**
+ * Returns the partition keys for a location.<br/>
+ * The work is delegated to the PathPartitioner class
+ *
+ * @param location
+ * String must be the base directory for the partitions
+ * @param conf
+ * @return
+ * @throws IOException
+ */
+ public Set<String> getPartitionKeys(String location, Configuration conf)
+ throws IOException {
+ return pathPartitioner.getPartitionKeys(location, conf);
+ }
+
+ /**
+ * Sets the PARITITION_FILTER_EXPRESSION property in the UDFContext
+ * identified by the loaderClass.
+ *
+ * @param partitionFilterExpression
+ * @param loaderClass
+ * @throws IOException
+ */
+ public void setPartitionFilterExpression(String partitionFilterExpression,
+ Class<? extends LoadFunc> loaderClass, String signature)
+ throws IOException {
+
+ UDFContext
+ .getUDFContext()
+ .getUDFProperties(loaderClass, new String[] { signature })
+ .setProperty(PARITITION_FILTER_EXPRESSION,
+ partitionFilterExpression);
+
+ }
+
+ /**
+ * Reads the partition keys from the location i.e the base directory
+ *
+ * @param location
+ * String must be the base directory for the partitions
+ * @param conf
+ * @param loaderClass
+ * @throws IOException
+ */
+ public void setPartitionKeys(String location, Configuration conf,
+ Class<? extends LoadFunc> loaderClass, String signature)
+ throws IOException {
+
+ Set<String> partitionKeys = getPartitionKeys(location, conf);
+
+ if (partitionKeys != null) {
+ StringBuilder buff = new StringBuilder();
+ int i = 0;
+ for (String key : partitionKeys) {
+ if (i++ != 0) {
+ buff.append(",");
+ }
+
+ buff.append(key);
+ }
+
+ UDFContext.getUDFContext()
+ .getUDFProperties(loaderClass, new String[] { signature })
+ .setProperty(PARTITION_COLUMNS, buff.toString());
+ }
+
+ }
+
+ /**
+ * This method is called by the FileInputFormat to find the input paths for
+ * which splits should be calculated.<br/>
+ * If partition columns were stored for the loader, the partition tree under
+ * each input path is walked and the partition filter (if any) is applied.<br/>
+ * Otherwise null is returned and no listing is performed by this method.
+ *
+ * @param ctx
+ * JobContext
+ * @param loaderClass
+ * this is chosen to be a subclass of LoadFunc to maintain some
+ * consistency.
+ */
+ public List<FileStatus> listStatus(JobContext ctx,
+ Class<? extends LoadFunc> loaderClass, String signature)
+ throws IOException {
+
+ Properties properties = UDFContext.getUDFContext().getUDFProperties(
+ loaderClass, new String[] { signature });
+
+ String partitionExpression = properties
+ .getProperty(PARITITION_FILTER_EXPRESSION);
+
+ ExpressionFactory expressionFactory = null;
+
+ if (partitionExpression != null) {
+ expressionFactory = ExpressionFactory.newInstance();
+ }
+
+ String partitionColumnStr = properties
+ .getProperty(PathPartitionHelper.PARTITION_COLUMNS);
+ String[] partitionKeys = (partitionColumnStr == null) ? null
+ : partitionColumnStr.split(",");
+
+ Path[] inputPaths = FileInputFormat.getInputPaths(ctx);
+
+ List<FileStatus> splitPaths = null;
+
+ if (partitionKeys != null) {
+
+ splitPaths = new ArrayList<FileStatus>();
+
+ for (Path inputPath : inputPaths) {
+ // for each input path work recursively through each partition
+ // level to find the rc files
+
+ FileSystem fs = inputPath.getFileSystem(ctx.getConfiguration());
+
+ if (fs.getFileStatus(inputPath).isDir()) {
+ // assure that we are at the root of the partition tree.
+ FileStatus fileStatusArr[] = fs.listStatus(inputPath);
+
+ if (fileStatusArr != null) {
+ for (FileStatus childFileStatus : fileStatusArr) {
+ getPartitionedFiles(expressionFactory,
+ partitionExpression, fs, childFileStatus,
+ 0, partitionKeys, splitPaths);
+ }
+ }
+
+ } else {
+ splitPaths.add(fs.getFileStatus(inputPath));
+ }
+
+ }
+
+ if (splitPaths.size() < 1) {
+ LOG.error("Not split paths where found, please check that the filter logic for the partition keys does not filter out everything ");
+ }
+
+ }
+
+ return splitPaths;
+ }
+
+    /**
+     * Recursively walks the tree below fileStatus, pruning directories that do
+     * not match the expected partition key or fail the partition filter, and
+     * appends every remaining file to splitPaths.
+     * <p/>
+     * Files are only accepted once every partition level has been consumed.
+     *
+     * @param expressionFactory EL factory; null when no partition filter is set
+     * @param partitionExpression partition filter expression; may be null
+     * @param fs file system used to list directory children
+     * @param fileStatus file or directory currently being visited
+     * @param partitionLevel index into partitionKeys expected at this depth
+     * @param partitionKeys partition key names, outermost directory first
+     * @param splitPaths output list that accepted files are appended to
+     * @throws IOException if listing a directory or evaluating the filter fails
+     */
+    private void getPartitionedFiles(ExpressionFactory expressionFactory,
+            String partitionExpression, FileSystem fs, FileStatus fileStatus,
+            int partitionLevel, String[] partitionKeys,
+            List<FileStatus> splitPaths) throws IOException {
+
+        String partition = (partitionLevel < partitionKeys.length) ? partitionKeys[partitionLevel]
+                : null;
+
+        Path path = fileStatus.getPath();
+
+        // filter out hidden files
+        if (path.getName().startsWith("_")) {
+            return;
+        }
+
+        // pre filter logic: return as soon as this entry cannot belong to the
+        // partition expected at this level
+        if (partition != null) {
+            if (fileStatus.isDir()) {
+                // the directory must be named "<key>=<value>"; a bare prefix test
+                // would also accept e.g. "daydate=..." for the key "day"
+                if (!path.getName().startsWith(partition + "=")) {
+                    return;
+                }
+            } else {
+                // a file that is not at the end of the partition tree is ignored
+                return;
+            }
+
+            // this means we are inside the partition so that the path will
+            // contain all partitions plus its values
+            // we can apply the partition filter expression here that was passed
+            // to the HiveColumnarLoader.setPartitionExpression
+            if (partitionLevel == (partitionKeys.length - 1)
+                    && !evaluatePartitionExpression(expressionFactory,
+                            partitionExpression, path)) {
+                LOG.debug("Pruning partition: " + path);
+                return;
+            }
+
+        }
+
+        // after this point we know that the partition is either null
+        // which means we are at the end of the partition tree and all files
+        // sub directories should be included.
+        // or that we are still navigating the partition tree.
+        int nextPartitionLevel = partitionLevel + 1;
+
+        // iterate over directories if fileStatus is a dir.
+        if (fileStatus.isDir()) {
+            FileStatus[] childStatusArr = fs.listStatus(path);
+            if (childStatusArr != null) {
+                for (FileStatus childFileStatus : childStatusArr) {
+                    getPartitionedFiles(expressionFactory, partitionExpression,
+                            fs, childFileStatus, nextPartitionLevel,
+                            partitionKeys, splitPaths);
+                }
+            }
+        } else {
+            // add file to splitPaths
+            splitPaths.add(fileStatus);
+        }
+    }
+
+ /**
+ * Evaluates the partitionExpression set in the
+ * HiveColumnarLoader.setPartitionExpression against the partition values of path.
+ *
+ * @param partitionExpression
+ * String
+ * @param path
+ * Path
+ * @return boolean
+ * @throws IOException
+ */
+ private boolean evaluatePartitionExpression(
+ ExpressionFactory expressionFactory, String partitionExpression,
+ Path path) throws IOException {
+
+ boolean ret = true;
+
+ if (expressionFactory != null) {
+ if (!partitionExpression.startsWith("${")) {
+ partitionExpression = "${" + partitionExpression + "}";
+ }
+
+ Map<String, String> context = pathPartitioner
+ .getPathPartitionKeyValues(path.toString());
+
+ MapVariableMapper mapper = new MapVariableMapper(expressionFactory,
+ context);
+ VariableContext varContext = new VariableContext(mapper);
+
+ ValueExpression evalExpression = expressionFactory
+ .createValueExpression(varContext, partitionExpression,
+ Boolean.class);
+
+ ret = (Boolean) evalExpression.getValue(varContext);
+
+ LOG.debug("Evaluated: " + partitionExpression + " returned: " + ret);
+
+ }
+
+ return ret;
+ }
+
+ /**
+ *
+ * ELContext implementation containing the VariableMapper MapVariableMapper
+ *
+ */
+ class VariableContext extends ELContext {
+
+ VariableMapper variableMapper;
+
+ VariableContext(VariableMapper variableMapper) {
+ this.variableMapper = variableMapper;
+ }
+
+ @Override
+ public ELResolver getELResolver() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public FunctionMapper getFunctionMapper() {
+ return null;
+ }
+
+ @Override
+ public VariableMapper getVariableMapper() {
+ return variableMapper;
+ }
+
+ }
+
+ /**
+ * Implementation for the VariableMapper that takes the values in a Map and
+ * creates ValueExpression objects for each.
+ *
+ */
+ class MapVariableMapper extends VariableMapper {
+ private Map<String, ValueExpression> valueExpressionMap;
+
+ public MapVariableMapper(ExpressionFactory expressionFactory,
+ Map<String, String> variableMap) {
+
+ valueExpressionMap = new HashMap<String, ValueExpression>();
+
+ for (Entry<String, String> entry : variableMap.entrySet()) {
+ ValueExpression valExpr = expressionFactory
+ .createValueExpression(entry.getValue(), String.class);
+ valueExpressionMap.put(entry.getKey(), valExpr);
+ }
+
+ }
+
+ @Override
+ public ValueExpression resolveVariable(String variableName) {
+ return valueExpressionMap.get(variableName);
+ }
+
+ @Override
+ public ValueExpression setVariable(String variableName,
+ ValueExpression valueExpression) {
+ return valueExpressionMap.put(variableName, valueExpression);
+ }
+
+ }
+
+}
Added: hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/partition/PathPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/partition/PathPartitioner.java?rev=982786&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/partition/PathPartitioner.java (added)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/partition/PathPartitioner.java Thu Aug 5 21:07:37 2010
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.piggybank.storage.partition;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ *
+ * Its convenient sometimes to partition logs by date values or other e.g.
+ * country, city etc.<br/>
+ * A daydate partitioned hdfs directory might look something like:<br/>
+ *
+ * <pre>
+ * /logs/repo/mylog/
+ * daydate=2010-01-01
+ * daydate=2010-01-02
+ * </pre>
+ *
+ * This class accepts a path like /logs/repo/mylog and return a map of the
+ * partition keys
+ */
+public class PathPartitioner {
+
+ /**
+ * Searches for the key=value pairs in the path pointed to by the location
+ * parameter. Only the path string itself is parsed.
+ *
+ * @param location
+ * String path in hdfs that contains the partition directories, e.g.
+ * /logs/repo/year=2010/month=07
+ * @return Map of String, String with one entry per key=value path segment.
+ * The order is maintained as per the directory tree, i.e. for
+ * /logs/repo/year=2010/month=07 the first entry is year (2010) and
+ * the second is month (07). Segments without a key=value form are
+ * skipped.
+ * @throws IOException
+ */
+ public Map<String, String> getPathPartitionKeyValues(String location)
+ throws IOException {
+
+ // use LinkedHashMap because order is important here.
+ Map<String, String> partitionKeys = new LinkedHashMap<String, String>();
+
+ String[] pathSplit = location.split("/");
+
+ for (String pathSplitItem : pathSplit) {
+ parseAndPutKeyValue(pathSplitItem, partitionKeys);
+ }
+
+ return partitionKeys;
+ }
+
+    /**
+     * Searches for the key=value pairs in the path pointed to by the location
+     * parameter, descending through the first visible sub directory of each
+     * level until no further sub directory is found.
+     *
+     * @param location
+     *            String root path in hdfs e.g. /user/hive/warehouse or
+     *            /logs/repo
+     * @param conf
+     *            Configuration
+     * @return Set of String. The order is maintained as per the directory tree.
+     *         i.e. if /logs/repo/year=2010/month=2010 exists the first item in
+     *         the set will be year and the second month.
+     * @throws IOException
+     *             if the file system cannot be read
+     */
+    public Set<String> getPartitionKeys(String location, Configuration conf)
+            throws IOException {
+
+        Path path = new Path(location);
+        FileSystem fs = path.getFileSystem(conf);
+
+        // use LinkedHashSet because order is important here.
+        Set<String> partitionKeys = new LinkedHashSet<String>();
+
+        // the location itself may already end in a key=value directory.
+        parseAndPutKeyValue(location, partitionKeys);
+
+        while (path != null && !fs.isFile(path)) {
+            FileStatus[] fileStatusArr = fs.listStatus(path);
+            Path nextDir = null;
+            if (fileStatusArr != null) {
+                for (FileStatus fileStatus : fileStatusArr) {
+                    // ignore files and hidden directories
+                    if (fileStatus.isDir()
+                            && !fileStatus.getPath().getName().startsWith("_")) {
+                        // only the first directory of each level is followed
+                        nextDir = fileStatus.getPath();
+                        parseAndPutKeyValue(nextDir.getName(), partitionKeys);
+                        break;
+                    }
+                }
+            }
+            // stop when the level has no visible sub directory; without this
+            // an empty directory would be listed over and over again
+            path = nextDir;
+        }
+
+        return partitionKeys;
+    }
+
+ private final void parseAndPutKeyValue(String pathName,
+ Map<String, String> partitionKeys) {
+ String[] keyValue = parsePathKeyValue(pathName);
+ if (keyValue != null) {
+ partitionKeys.put(keyValue[0], keyValue[1]);
+ }
+
+ }
+
+ private final void parseAndPutKeyValue(String pathName,
+ Set<String> partitionKeys) {
+ String[] keyValue = parsePathKeyValue(pathName);
+ if (keyValue != null) {
+ partitionKeys.add(keyValue[0]);
+ }
+
+ }
+
+    /**
+     * Will look for a key=value pair in the last segment of the path, for
+     * example /user/hive/warehouse/mylogs/year=2010/month=07 yields month, 07.
+     *
+     * @param path
+     *            String path or single directory name
+     * @return String[] [0]= key [1] = value, or null when the last segment is
+     *         not of the form key=value
+     */
+    public String[] parsePathKeyValue(String path) {
+        String[] keyValue = null;
+
+        // keep only the text after the last slash; the slash itself must not
+        // end up in the key, and a slash at index 0 counts as well.
+        String parsedPath = path.substring(path.lastIndexOf('/') + 1);
+
+        if (parsedPath.contains("=")) {
+            String split[] = parsedPath.split("=");
+            if (split.length == 2) {
+                keyValue = split;
+            }
+        }
+
+        return keyValue;
+    }
+
+}
Added: hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitionHelper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitionHelper.java?rev=982786&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitionHelper.java (added)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitionHelper.java Thu Aug 5 21:07:37 2010
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.piggybank.test.storage;
+
+import java.io.File;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.piggybank.storage.partition.PathPartitionHelper;
+import org.apache.pig.test.Util;
+import org.junit.Test;
+
+/**
+ *
+ * Tests the PathPartitionHelper can:<br/>
+ * <ul>
+ * <li>Filter path partitioned files based on an expression being true</li>
+ * <li>Filter path partitioned files based on an expression being false</li>
+ * <li>Filter path partitioned files with no expression</li>
+ * </ul>
+ *
+ */
+public class TestPathPartitionHelper extends TestCase {
+
+ private static Configuration conf = null;
+
+ File baseDir;
+ File partition1;
+ File partition2;
+ File partition3;
+
+ @Test
+ public void testListStatusPartitionFilterNotFound() throws Exception {
+
+ PathPartitionHelper partitionHelper = new PathPartitionHelper();
+
+ Job job = new Job(conf);
+ job.setJobName("TestJob");
+ job.setInputFormatClass(FileInputFormat.class);
+
+ Configuration conf = job.getConfiguration();
+ FileInputFormat.setInputPaths(job, new Path(baseDir.getAbsolutePath()));
+
+ JobContext jobContext = new JobContext(conf, job.getJobID());
+
+ partitionHelper.setPartitionFilterExpression("year < '2010'",
+ PigStorage.class, "1");
+ partitionHelper.setPartitionKeys(baseDir.getAbsolutePath(), conf,
+ PigStorage.class, "1");
+
+ List<FileStatus> files = partitionHelper.listStatus(jobContext,
+ PigStorage.class, "1");
+
+ assertEquals(0, files.size());
+
+ }
+
+ @Test
+ public void testListStatusPartitionFilterFound() throws Exception {
+
+ PathPartitionHelper partitionHelper = new PathPartitionHelper();
+
+ Job job = new Job(conf);
+ job.setJobName("TestJob");
+ job.setInputFormatClass(FileInputFormat.class);
+
+ Configuration conf = job.getConfiguration();
+ FileInputFormat.setInputPaths(job, new Path(baseDir.getAbsolutePath()));
+
+ JobContext jobContext = new JobContext(conf, job.getJobID());
+
+ partitionHelper.setPartitionFilterExpression(
+ "year<='2010' and month=='01' and day>='01'", PigStorage.class, "2");
+ partitionHelper.setPartitionKeys(baseDir.getAbsolutePath(), conf,
+ PigStorage.class, "2");
+
+ List<FileStatus> files = partitionHelper.listStatus(jobContext,
+ PigStorage.class, "2");
+
+ assertNotNull(files);
+ assertEquals(1, files.size());
+
+ }
+
+ @Test
+ public void testListStatus() throws Exception {
+
+ PathPartitionHelper partitionHelper = new PathPartitionHelper();
+
+ Job job = new Job(conf);
+ job.setJobName("TestJob");
+ job.setInputFormatClass(FileInputFormat.class);
+
+ Configuration conf = job.getConfiguration();
+ FileInputFormat.setInputPaths(job, new Path(baseDir.getAbsolutePath()));
+
+ JobContext jobContext = new JobContext(conf, job.getJobID());
+
+ partitionHelper.setPartitionKeys(baseDir.getAbsolutePath(), conf,
+ PigStorage.class, "3");
+
+ List<FileStatus> files = partitionHelper.listStatus(jobContext,
+ PigStorage.class, "3");
+
+ assertNotNull(files);
+ assertEquals(1, files.size());
+
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+
+ Util.deleteDirectory(baseDir);
+
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ conf = new Configuration(false);
+
+ baseDir = createDir(null,
+ "testPathPartitioner-testGetKeys-" + System.currentTimeMillis());
+
+ partition1 = createDir(baseDir, "year=2010");
+ partition2 = createDir(partition1, "month=01");
+ partition3 = createDir(partition2, "day=01");
+
+ File file = new File(partition3, "testfile-"
+ + System.currentTimeMillis());
+ file.createNewFile();
+
+ }
+
+ private File createDir(File parent, String name) {
+ File file = new File(parent, name);
+ file.mkdirs();
+ return file;
+ }
+
+}
Added: hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitioner.java?rev=982786&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitioner.java (added)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitioner.java Thu Aug 5 21:07:37 2010
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.piggybank.test.storage;
+
+import java.io.File;
+import java.util.Map;
+import java.util.Set;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.piggybank.storage.partition.PathPartitioner;
+import org.apache.pig.test.Util;
+import org.junit.Test;
+
+/**
+ *
+ * Tests that the PathPartitioner can:<br/>
+ * <ul>
+ * <li>Read keys from a partitioned file path</li>
+ * <li>Read keys and values from a partitioned file path</li>
+ * </ul>
+ *
+ */
+public class TestPathPartitioner extends TestCase {
+
+ private static Configuration conf = null;
+
+ File baseDir;
+ File partition1;
+ File partition2;
+ File partition3;
+
+ @Override
+ protected void tearDown() throws Exception {
+
+ Util.deleteDirectory(baseDir);
+
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ conf = new Configuration();
+
+ baseDir = createDir(null,
+ "testPathPartitioner-testGetKeys-" + System.currentTimeMillis());
+
+ partition1 = createDir(baseDir, "year=2010");
+ partition2 = createDir(partition1, "month=01");
+ partition3 = createDir(partition2, "day=01");
+
+ File file = new File(partition3, "testfile-"
+ + System.currentTimeMillis());
+ file.createNewFile();
+
+ }
+
+ @Test
+ public void testGetKeyValues() throws Exception {
+ PathPartitioner partitioner = new PathPartitioner();
+
+ Map<String, String> map = partitioner
+ .getPathPartitionKeyValues(partition3.getAbsolutePath());
+
+ String[] keys = map.keySet().toArray(new String[] {});
+
+ assertEquals("2010", map.get(keys[0]));
+ assertEquals("01", map.get(keys[1]));
+ assertEquals("01", map.get(keys[2]));
+
+ }
+
+ @Test
+ public void testGetKeys() throws Exception {
+
+ PathPartitioner pathPartitioner = new PathPartitioner();
+ Set<String> keys = pathPartitioner.getPartitionKeys(
+ baseDir.getAbsolutePath(), conf);
+
+ assertNotNull(keys);
+ assertEquals(3, keys.size());
+
+ String[] keyArr = keys.toArray(new String[] {});
+
+ assertEquals("year", keyArr[0]);
+ assertEquals("month", keyArr[1]);
+ assertEquals("day", keyArr[2]);
+
+ }
+
+ private File createDir(File parent, String name) {
+ File file = new File(parent, name);
+ file.mkdirs();
+ return file;
+ }
+
+}