Posted to commits@pig.apache.org by da...@apache.org on 2010/08/09 07:27:40 UTC

svn commit: r983524 - in /hadoop/pig/trunk/contrib/piggybank/java: ./ src/test/java/org/apache/pig/piggybank/test/storage/

Author: daijy
Date: Mon Aug  9 05:27:39 2010
New Revision: 983524

URL: http://svn.apache.org/viewvc?rev=983524&view=rev
Log:
PIG-1526: HiveColumnarLoader Partitioning Support (fix piggybank unit test failure)

Added:
    hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHiveColumnarLoader.java
    hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitionHelper.java
    hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitioner.java
Modified:
    hadoop/pig/trunk/contrib/piggybank/java/build.xml

Modified: hadoop/pig/trunk/contrib/piggybank/java/build.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/build.xml?rev=983524&r1=983523&r2=983524&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/build.xml (original)
+++ hadoop/pig/trunk/contrib/piggybank/java/build.xml Mon Aug  9 05:27:39 2010
@@ -116,7 +116,7 @@
         <echo> *** Running UDF tests ***</echo>
         <delete dir="${test.logs}"/>
         <mkdir dir="${test.logs}"/>
-        <junit printsummary="yes" haltonfailure="no" fork="yes" maxmemory="256m" dir="${basedir}" timeout="${test.timeout}" errorProperty="tests.failed" failureProperty="tests.failed">        
+        <junit printsummary="yes" haltonfailure="no" fork="yes" maxmemory="512m" dir="${basedir}" timeout="${test.timeout}" errorProperty="tests.failed" failureProperty="tests.failed">        
             <sysproperty key="hadoop.log.dir" value="${test.logs}"/>
             <classpath refid="test.classpath"/>
             <formatter type="${test.junit.output.format}" />

Added: hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHiveColumnarLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHiveColumnarLoader.java?rev=983524&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHiveColumnarLoader.java (added)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHiveColumnarLoader.java Mon Aug  9 05:27:39 2010
@@ -0,0 +1,575 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.piggybank.test.storage;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.RCFile;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.pig.ExecType;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.Util;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * 
+ * Tests that the HiveColumnarLoader can:
+ * <ul>
+ * <li>Load files without partitioning</li>
+ * <li>Load files with partitioning, with dates defined in the constructor or
+ * as a filter</li>
+ * <li>Load files using Pig's push-down projection capabilities.</li>
+ * </ul>
+ * 
+ */
+public class TestHiveColumnarLoader extends TestCase {
+
+    static Configuration conf = null;
+
+    // for testing a single non-partitioned file
+    static File simpleDataFile = null;
+    // for testing multiple non-partitioned files
+    static File simpleDataDir = null;
+
+    static File datePartitionedDir = null;
+    static File yearMonthDayHourPartitionedDir = null;
+
+    // used for cleanup
+    static List<String> datePartitionedRCFiles;
+    static List<String> datePartitionedDirs;
+
+    static private FileSystem fs;
+
+    static int columnMaxSize = 30;
+
+    static int columnCount = 3;
+
+    static int simpleDirFileCount = 3;
+    static int simpleRowCount = 10;
+
+    static String endingDate = null;
+    static String startingDate = null;
+    static DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
+    static Calendar calendar = null;
+    static int datePartitionedRowCount;
+
+    private static Calendar yearMonthDayHourcalendar;
+
+    @Override
+    public synchronized void setUp() throws Exception {
+
+	conf = new Configuration();
+
+	fs = LocalFileSystem.getLocal(conf);
+
+	produceSimpleData();
+
+	produceDatePartitionedData();
+
+	produceYearMonthDayHourPartitionedData();
+
+    }
+
+    @Override
+    public void tearDown() {
+
+	Util.deleteDirectory(datePartitionedDir);
+
+	Util.deleteDirectory(yearMonthDayHourPartitionedDir);
+
+	Util.deleteDirectory(simpleDataDir);
+
+	simpleDataFile.delete();
+
+    }
+
+    @Test
+    public void testReadingSingleFileNoProjections() throws IOException {
+	String funcSpecString = "org.apache.pig.piggybank.storage.HiveColumnarLoader('f1 string,f2 string,f3 string')";
+
+	String singlePartitionedFile = simpleDataFile.getAbsolutePath();
+
+	PigServer server = new PigServer(ExecType.LOCAL);
+	server.setBatchOn();
+	server.registerFunction(
+		"org.apache.pig.piggybank.storage.HiveColumnarLoader",
+		new FuncSpec(funcSpecString));
+
+	server.registerQuery("a = LOAD '" + singlePartitionedFile + "' using "
+		+ funcSpecString + ";");
+
+	Iterator<Tuple> result = server.openIterator("a");
+
+	int count = 0;
+	Tuple t = null;
+	while ((t = result.next()) != null) {
+	    assertEquals(3, t.size());
+	    assertEquals(DataType.CHARARRAY, t.getType(0));
+	    count++;
+	}
+
+	Assert.assertEquals(simpleRowCount, count);
+    }
+    
+    @Test
+    public void testReadingMultipleNonPartitionedFiles() throws IOException {
+	String funcSpecString = "org.apache.pig.piggybank.storage.HiveColumnarLoader('f1 string,f2 string,f3 string')";
+
+	String singlePartitionedDir = simpleDataDir.getAbsolutePath();
+
+	PigServer server = new PigServer(ExecType.LOCAL);
+	server.setBatchOn();
+	server.registerFunction(
+		"org.apache.pig.piggybank.storage.HiveColumnarLoader",
+		new FuncSpec(funcSpecString));
+
+	server.registerQuery("a = LOAD '" + singlePartitionedDir + "' using "
+		+ funcSpecString + ";");
+
+	server.registerQuery("b = foreach a generate f1;");
+
+	Iterator<Tuple> result = server.openIterator("b");
+
+	int count = 0;
+	Tuple t = null;
+	while ((t = result.next()) != null) {
+	    assertEquals(1, t.size());
+	    assertEquals(DataType.CHARARRAY, t.getType(0));
+	    count++;
+	}
+
+	Assert.assertEquals(simpleDirFileCount * simpleRowCount, count);
+    }
+
+    @Test
+    public void testReadingSingleFile() throws IOException {
+	String funcSpecString = "org.apache.pig.piggybank.storage.HiveColumnarLoader('f1 string,f2 string,f3 string')";
+
+	String singlePartitionedFile = simpleDataFile.getAbsolutePath();
+
+	PigServer server = new PigServer(ExecType.LOCAL);
+	server.setBatchOn();
+	server.registerFunction(
+		"org.apache.pig.piggybank.storage.HiveColumnarLoader",
+		new FuncSpec(funcSpecString));
+
+	server.registerQuery("a = LOAD '" + singlePartitionedFile + "' using "
+		+ funcSpecString + ";");
+
+	server.registerQuery("b = foreach a generate f1;");
+
+	Iterator<Tuple> result = server.openIterator("b");
+
+	int count = 0;
+	Tuple t = null;
+	while ((t = result.next()) != null) {
+	    assertEquals(1, t.size());
+	    assertEquals(DataType.CHARARRAY, t.getType(0));
+	    count++;
+	}
+
+	Assert.assertEquals(simpleRowCount, count);
+    }
+
+    @Test
+    public void testYearMonthDayHourPartitionedFilesWithProjection()
+	    throws IOException {
+	int count = 0;
+
+	String funcSpecString = "org.apache.pig.piggybank.storage.HiveColumnarLoader('f1 string,f2 string,f3 string')";
+
+	PigServer server = new PigServer(ExecType.LOCAL);
+	server.setBatchOn();
+	server.registerFunction(
+		"org.apache.pig.piggybank.storage.HiveColumnarLoader",
+		new FuncSpec(funcSpecString));
+
+	server.registerQuery("a = LOAD '"
+		+ yearMonthDayHourPartitionedDir.getAbsolutePath() + "' using "
+		+ funcSpecString + ";");
+	server.registerQuery("f = FILTER a by year=='2010';");
+	server.registerQuery("b = foreach f generate f1;");
+
+	Iterator<Tuple> result = server.openIterator("b");
+
+	Tuple t = null;
+	while ((t = result.next()) != null) {
+	    assertEquals(1, t.size());
+	    assertEquals(DataType.CHARARRAY, t.getType(0));
+	    count++;
+	}
+
+	Assert.assertEquals(240, count);
+
+    }
+
+    @Test
+    public void testYearMonthDayHourPartitionedFilesWithProjectionAndPartitionColumns()
+	    throws IOException {
+	int count = 0;
+
+	String funcSpecString = "org.apache.pig.piggybank.storage.HiveColumnarLoader('f1 string,f2 string,f3 string')";
+
+	PigServer server = new PigServer(ExecType.LOCAL);
+	server.setBatchOn();
+	server.registerFunction(
+		"org.apache.pig.piggybank.storage.HiveColumnarLoader",
+		new FuncSpec(funcSpecString));
+
+	server.registerQuery("a = LOAD '"
+		+ yearMonthDayHourPartitionedDir.getAbsolutePath() + "' using "
+		+ funcSpecString + ";");
+	server.registerQuery("f = FILTER a by year=='2010';");
+	server.registerQuery("r = foreach f generate year, f2, f3, month, day, hour;");
+	server.registerQuery("b = ORDER r BY year, month, day, hour;");
+	Iterator<Tuple> result = server.openIterator("b");
+
+	Tuple t = null;
+	while ((t = result.next()) != null) {
+	    System.out.println("Tuple: " + t);
+	    assertEquals(6, t.size());
+	    count++;
+	}
+	System.out.println("Count: " + count);
+	Assert.assertEquals(240, count);
+    }
+
+    @Test
+    public void test1DayDatePartitionedFilesWithProjection() throws IOException {
+	int count = 0;
+
+	String funcSpecString = "org.apache.pig.piggybank.storage.HiveColumnarLoader('f1 string,f2 string,f3 string'"
+		+ ", '" + startingDate + ":" + startingDate + "')";
+
+	System.out.println(funcSpecString);
+
+	PigServer server = new PigServer(ExecType.LOCAL);
+	server.setBatchOn();
+	server.registerFunction(
+		"org.apache.pig.piggybank.storage.HiveColumnarLoader",
+		new FuncSpec(funcSpecString));
+
+	server.registerQuery("a = LOAD '"
+		+ datePartitionedDir.getAbsolutePath() + "' using "
+		+ funcSpecString + ";");
+	server.registerQuery("b = FOREACH a GENERATE f2 as p;");
+	Iterator<Tuple> result = server.openIterator("b");
+
+	Tuple t = null;
+	while ((t = result.next()) != null) {
+	    assertEquals(1, t.size());
+	    assertEquals(DataType.CHARARRAY, t.getType(0));
+	    count++;
+	}
+
+	Assert.assertEquals(50, count);
+    }
+
+    @Test
+    public void test1DayDatePartitionedFiles() throws IOException {
+	int count = 0;
+
+	String funcSpecString = "org.apache.pig.piggybank.storage.HiveColumnarLoader('f1 string,f2 string,f3 string'"
+		+ ", '" + startingDate + ":" + startingDate + "')";
+
+	System.out.println(funcSpecString);
+
+	PigServer server = new PigServer(ExecType.LOCAL);
+	server.setBatchOn();
+	server.registerFunction(
+		"org.apache.pig.piggybank.storage.HiveColumnarLoader",
+		new FuncSpec(funcSpecString));
+
+	server.registerQuery("a = LOAD '"
+		+ datePartitionedDir.getAbsolutePath() + "' using "
+		+ funcSpecString + ";");
+	Iterator<Tuple> result = server.openIterator("a");
+
+	while ((result.next()) != null) {
+	    count++;
+	}
+
+	Assert.assertEquals(50, count);
+    }
+
+    @Test
+    public void testDatePartitionedFiles() throws IOException {
+	int count = 0;
+
+	String funcSpecString = "org.apache.pig.piggybank.storage.HiveColumnarLoader('f1 string,f2 string,f3 string'"
+		+ ", '" + startingDate + ":" + endingDate + "')";
+
+	System.out.println(funcSpecString);
+
+	PigServer server = new PigServer(ExecType.LOCAL);
+	server.setBatchOn();
+	server.registerFunction(
+		"org.apache.pig.piggybank.storage.HiveColumnarLoader",
+		new FuncSpec(funcSpecString));
+
+	server.registerQuery("a = LOAD '"
+		+ datePartitionedDir.getAbsolutePath() + "' using "
+		+ funcSpecString + ";");
+	Iterator<Tuple> result = server.openIterator("a");
+
+	while ((result.next()) != null) {
+	    count++;
+	}
+
+	Assert.assertEquals(datePartitionedRowCount, count);
+    }
+
+    private static void produceDatePartitionedData() throws IOException {
+	datePartitionedRowCount = 0;
+	datePartitionedDir = new File("testhiveColumnarLoader-dateDir-"
+		+ System.currentTimeMillis());
+	datePartitionedDir.mkdir();
+	datePartitionedDir.deleteOnExit();
+
+	int dates = 4;
+	calendar = Calendar.getInstance();
+
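+	// note: Calendar.MONDAY (== 2) is used here as a literal day-of-month value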
+	calendar.set(Calendar.DAY_OF_MONTH, Calendar.MONDAY);
+	calendar.set(Calendar.MONTH, Calendar.JANUARY);
+
+	startingDate = dateFormat.format(calendar.getTime());
+
+	datePartitionedRCFiles = new ArrayList<String>();
+	datePartitionedDirs = new ArrayList<String>();
+
+	for (int i = 0; i < dates; i++) {
+
+	    File file = new File(datePartitionedDir, "daydate="
+		    + dateFormat.format(calendar.getTime()));
+	    calendar.add(Calendar.DAY_OF_MONTH, 1);
+
+	    file.mkdir();
+	    file.deleteOnExit();
+
+	    // for each daydate write 5 partitions
+	    for (int pi = 0; pi < 5; pi++) {
+		Path path = new Path(new Path(file.getAbsolutePath()),
+			"partition" + pi);
+
+		datePartitionedRowCount += writeRCFileTest(fs, simpleRowCount,
+			path, columnCount, new DefaultCodec(), columnCount);
+
+		new File(path.toString()).deleteOnExit();
+		datePartitionedRCFiles.add(path.toString());
+		datePartitionedDirs.add(file.toString());
+
+	    }
+
+	}
+
+	endingDate = dateFormat.format(calendar.getTime());
+    }
+
+    private static void produceYearMonthDayHourPartitionedData()
+	    throws IOException {
+
+	yearMonthDayHourPartitionedDir = new File(
+		"testhiveColumnarLoader-yearMonthDayHourDir-"
+			+ System.currentTimeMillis());
+	yearMonthDayHourPartitionedDir.mkdir();
+	yearMonthDayHourPartitionedDir.deleteOnExit();
+
+	int years = 1;
+	int months = 2;
+	int days = 3;
+	int hours = 4;
+
+	yearMonthDayHourcalendar = Calendar.getInstance();
+
+	yearMonthDayHourcalendar.set(Calendar.DAY_OF_MONTH, Calendar.MONDAY);
+	yearMonthDayHourcalendar.set(Calendar.MONTH, Calendar.JANUARY);
+
+	for (int i = 0; i < years; i++) {
+
+	    File file = new File(yearMonthDayHourPartitionedDir, "year="
+		    + yearMonthDayHourcalendar.get(Calendar.YEAR));
+
+	    file.mkdir();
+	    file.deleteOnExit();
+
+	    for (int monthIndex = 0; monthIndex < months; monthIndex++) {
+
+		File monthFile = new File(file, "month="
+			+ yearMonthDayHourcalendar.get(Calendar.MONTH));
+		monthFile.mkdir();
+		monthFile.deleteOnExit();
+
+		for (int dayIndex = 0; dayIndex < days; dayIndex++) {
+		    File dayFile = new File(monthFile,
+			    "day="
+				    + yearMonthDayHourcalendar
+					    .get(Calendar.DAY_OF_MONTH));
+		    dayFile.mkdir();
+		    dayFile.deleteOnExit();
+
+		    for (int hourIndex = 0; hourIndex < hours; hourIndex++) {
+			File hourFile = new File(dayFile,
+				"hour="
+					+ yearMonthDayHourcalendar
+						.get(Calendar.HOUR_OF_DAY));
+			hourFile.mkdir();
+			hourFile.deleteOnExit();
+
+			File rcFile = new File(hourFile.getAbsolutePath()
+				+ "/attempt-00000");
+			Path hourFilePath = new Path(rcFile.getAbsolutePath());
+			rcFile.deleteOnExit();
+
+			writeRCFileTest(fs, simpleRowCount, hourFilePath,
+				columnCount, new DefaultCodec(), columnCount);
+
+			yearMonthDayHourcalendar.add(Calendar.HOUR_OF_DAY, 1);
+		    }
+
+		    yearMonthDayHourcalendar.add(Calendar.DAY_OF_MONTH, 1);
+		}
+		yearMonthDayHourcalendar.add(Calendar.MONTH, 1);
+	    }
+
+	}
+
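+	// note: this formats 'calendar' (left over from produceDatePartitionedData),
+	// not yearMonthDayHourcalendar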
+	endingDate = dateFormat.format(calendar.getTime());
+    }
+
+    /**
+     * Writes out a single temporary RCFile, and a directory of RCFiles, each
+     * with {@code columnCount} columns and {@code simpleRowCount} rows.<br/>
+     * Data is random characters (see {@link #getRandomChar(Random)}).
+     * 
+     * @throws SerDeException
+     * @throws IOException
+     */
+    private static final void produceSimpleData() throws SerDeException,
+	    IOException {
+	// produce a single file
+	simpleDataFile = File.createTempFile("testhiveColumnarLoader", ".txt");
+	simpleDataFile.deleteOnExit();
+
+	Path path = new Path(simpleDataFile.getPath());
+
+	writeRCFileTest(fs, simpleRowCount, path, columnCount,
+		new DefaultCodec(), columnCount);
+
+	// produce a folder of simple data
+	simpleDataDir = new File("simpleDataDir" + System.currentTimeMillis());
+	simpleDataDir.mkdir();
+
+	for (int i = 0; i < simpleDirFileCount; i++) {
+
+	    simpleDataFile = new File(simpleDataDir, "testhiveColumnarLoader-"
+		    + i + ".txt");
+
+	    Path filePath = new Path(simpleDataFile.getPath());
+
+	    writeRCFileTest(fs, simpleRowCount, filePath, columnCount,
+		    new DefaultCodec(), columnCount);
+
+	}
+
+    }
+
+    static Random randomCharGenerator = new Random(3);
+
+    static Random randColLenGenerator = new Random(20);
+
+    private static void resetRandomGenerators() {
+	randomCharGenerator = new Random(3);
+	randColLenGenerator = new Random(20);
+    }
+
+    private static int writeRCFileTest(FileSystem fs, int rowCount, Path file,
+	    int columnNum, CompressionCodec codec, int columnCount)
+	    throws IOException {
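+	// columnNum is the number of RCFile columns; columnCount is the
+	// number of bytes written per cell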
+	fs.delete(file, true);
+	int rowsWritten = 0;
+
+	resetRandomGenerators();
+
+	RCFileOutputFormat.setColumnNumber(conf, columnNum);
+	RCFile.Writer writer = new RCFile.Writer(fs, conf, file, null, codec);
+
+	byte[][] columnRandom;
+
+	BytesRefArrayWritable bytes = new BytesRefArrayWritable(columnNum);
+	columnRandom = new byte[columnNum][];
+	for (int i = 0; i < columnNum; i++) {
+	    BytesRefWritable cu = new BytesRefWritable();
+	    bytes.set(i, cu);
+	}
+
+	for (int i = 0; i < rowCount; i++) {
+	    nextRandomRow(columnRandom, bytes, columnCount);
+	    rowsWritten++;
+	    writer.append(bytes);
+	}
+	writer.close();
+
+	return rowsWritten;
+    }
+
+    private static void nextRandomRow(byte[][] row,
+	    BytesRefArrayWritable bytes, int columnCount) {
+	bytes.resetValid(row.length);
+	for (int i = 0; i < row.length; i++) {
+
+	    row[i] = new byte[columnCount];
+	    for (int j = 0; j < columnCount; j++)
+		row[i][j] = getRandomChar(randomCharGenerator);
+	    bytes.get(i).set(row[i], 0, columnCount);
+	}
+    }
+
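+    // exclusive upper bound for getRandomChar: 122 ('z') - 7 = 115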
+    private static int CHAR_END = 122 - 7;
+
+    private static byte getRandomChar(Random random) {
+	byte b = 0;
+	do {
+	    b = (byte) random.nextInt(CHAR_END);
+	} while ((b < 65));
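+	// b is now in [65, 114]; values past 'Z' (90) collapse to 7 (BEL) below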
+	if (b > 90) {
+	    b = 7;
+	}
+	return b;
+    }
+}
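
For reference: the RCFile fixtures produced by writeRCFileTest above can be
read back with Hive's RCFile.Reader to check column and row counts by hand.
A minimal standalone sketch, assuming the same Hive RCFile API the test
already imports (the class name and argument handling are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.RCFile;
    import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
    import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
    import org.apache.hadoop.io.LongWritable;

    public class RCFileDump {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.getLocal(conf);
            // path to one of the generated RCFiles
            Path file = new Path(args[0]);

            RCFile.Reader reader = new RCFile.Reader(fs, file, conf);
            LongWritable rowID = new LongWritable();
            BytesRefArrayWritable cols = new BytesRefArrayWritable();
            int rows = 0;
            while (reader.next(rowID)) {
                reader.getCurrentRow(cols);
                StringBuilder line = new StringBuilder();
                for (int i = 0; i < cols.size(); i++) {
                    BytesRefWritable col = cols.get(i);
                    if (i > 0)
                        line.append('\t');
                    line.append(new String(col.getData(), col.getStart(),
                            col.getLength()));
                }
                System.out.println(line);
                rows++;
            }
            reader.close();
            // should match the row count writeRCFileTest returned for this file
            System.out.println(rows + " rows");
        }
    }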

Added: hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitionHelper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitionHelper.java?rev=983524&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitionHelper.java (added)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitionHelper.java Mon Aug  9 05:27:39 2010
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.piggybank.test.storage;
+
+import java.io.File;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.piggybank.storage.partition.PathPartitionHelper;
+import org.apache.pig.test.Util;
+import org.junit.Test;
+
+/**
+ * 
+ * Tests that the PathPartitionHelper can:<br/>
+ * <ul>
+ * <li>Filter path-partitioned files based on an expression evaluating to true</li>
+ * <li>Filter path-partitioned files based on an expression evaluating to false</li>
+ * <li>List path-partitioned files when no filter expression is set</li>
+ * </ul>
+ * 
+ */
+public class TestPathPartitionHelper extends TestCase {
+
+    private static Configuration conf = null;
+
+    File baseDir;
+    File partition1;
+    File partition2;
+    File partition3;
+
+    @Test
+    public void testListStatusPartitionFilterNotFound() throws Exception {
+
+	PathPartitionHelper partitionHelper = new PathPartitionHelper();
+
+	Job job = new Job(conf);
+	job.setJobName("TestJob");
+	job.setInputFormatClass(FileInputFormat.class);
+
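+	// note: this local 'conf' shadows the static field of the same name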
+	Configuration conf = job.getConfiguration();
+	FileInputFormat.setInputPaths(job, new Path(baseDir.getAbsolutePath()));
+
+	JobContext jobContext = new JobContext(conf, job.getJobID());
+
+	partitionHelper.setPartitionFilterExpression("year < '2010'",
+		PigStorage.class, "1");
+	partitionHelper.setPartitionKeys(baseDir.getAbsolutePath(), conf,
+		PigStorage.class, "1");
+
+	List<FileStatus> files = partitionHelper.listStatus(jobContext,
+		PigStorage.class, "1");
+
+	assertEquals(0, files.size());
+
+    }
+
+    @Test
+    public void testListStatusPartitionFilterFound() throws Exception {
+
+	PathPartitionHelper partitionHelper = new PathPartitionHelper();
+
+	Job job = new Job(conf);
+	job.setJobName("TestJob");
+	job.setInputFormatClass(FileInputFormat.class);
+
+	Configuration conf = job.getConfiguration();
+	FileInputFormat.setInputPaths(job, new Path(baseDir.getAbsolutePath()));
+
+	JobContext jobContext = new JobContext(conf, job.getJobID());
+
+	partitionHelper.setPartitionFilterExpression(
+		"year<='2010' and month=='01' and day>='01'", PigStorage.class, "2");
+	partitionHelper.setPartitionKeys(baseDir.getAbsolutePath(), conf,
+		PigStorage.class, "2");
+
+	List<FileStatus> files = partitionHelper.listStatus(jobContext,
+		PigStorage.class, "2");
+
+	assertNotNull(files);
+	assertEquals(1, files.size());
+
+    }
+
+    @Test
+    public void testListStatus() throws Exception {
+
+	PathPartitionHelper partitionHelper = new PathPartitionHelper();
+
+	Job job = new Job(conf);
+	job.setJobName("TestJob");
+	job.setInputFormatClass(FileInputFormat.class);
+
+	Configuration conf = job.getConfiguration();
+	FileInputFormat.setInputPaths(job, new Path(baseDir.getAbsolutePath()));
+
+	JobContext jobContext = new JobContext(conf, job.getJobID());
+
+	partitionHelper.setPartitionKeys(baseDir.getAbsolutePath(), conf,
+		PigStorage.class, "3");
+
+	List<FileStatus> files = partitionHelper.listStatus(jobContext,
+		PigStorage.class, "3");
+
+	assertNotNull(files);
+	assertEquals(1, files.size());
+
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+
+	Util.deleteDirectory(baseDir);
+
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+	File oldConf = new File(System.getProperty("user.home")
+		+ "/pigtest/conf/hadoop-site.xml");
+	oldConf.delete();
+	conf = new Configuration(false);
+
+	baseDir = createDir(null,
+		"testPathPartitioner-testGetKeys-" + System.currentTimeMillis());
+
+	partition1 = createDir(baseDir, "year=2010");
+	partition2 = createDir(partition1, "month=01");
+	partition3 = createDir(partition2, "day=01");
+
+	File file = new File(partition3, "testfile-"
+		+ System.currentTimeMillis());
+	file.createNewFile();
+
+    }
+
+    private File createDir(File parent, String name) {
+	File file = new File(parent, name);
+	file.mkdirs();
+	return file;
+    }
+
+}
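
Outside of JUnit, the call sequence these tests exercise condenses to the
sketch below. It uses only the PathPartitionHelper methods shown above; the
filter expression and the "sig" signature string are illustrative, and the
JobContext construction matches what the tests themselves do:

    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.pig.builtin.PigStorage;
    import org.apache.pig.piggybank.storage.partition.PathPartitionHelper;

    public class PartitionFilterSketch {
        public static List<FileStatus> filteredFiles(String inputDir)
                throws Exception {
            PathPartitionHelper helper = new PathPartitionHelper();

            Job job = new Job(new Configuration());
            FileInputFormat.setInputPaths(job, new Path(inputDir));
            Configuration conf = job.getConfiguration();

            // the trailing argument is a signature that scopes the helper's
            // state to one loader instance
            helper.setPartitionFilterExpression(
                    "year=='2010' and month=='01'", PigStorage.class, "sig");
            helper.setPartitionKeys(inputDir, conf, PigStorage.class, "sig");

            JobContext jobContext = new JobContext(conf, job.getJobID());
            // returns only files whose year=.../month=.../ path components
            // satisfy the expression
            return helper.listStatus(jobContext, PigStorage.class, "sig");
        }
    }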

Added: hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitioner.java?rev=983524&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitioner.java (added)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitioner.java Mon Aug  9 05:27:39 2010
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.piggybank.test.storage;
+
+import java.io.File;
+import java.util.Map;
+import java.util.Set;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.piggybank.storage.partition.PathPartitioner;
+import org.apache.pig.test.Util;
+import org.junit.Test;
+
+/**
+ * 
+ * Tests that the PathPartitioner can:<br/>
+ * <ul>
+ *   <li>Read keys from a partitioned file path</li>
+ *   <li>Read keys and values from a partitioned file path</li>
+ * </ul>
+ *
+ */
+public class TestPathPartitioner extends TestCase {
+
+    private static Configuration conf = null;
+
+    File baseDir;
+    File partition1;
+    File partition2;
+    File partition3;
+
+    @Override
+    protected void tearDown() throws Exception {
+
+	Util.deleteDirectory(baseDir);
+
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+	File oldConf = new File(System.getProperty("user.home")
+		+ "/pigtest/conf/hadoop-site.xml");
+	oldConf.delete();
+	conf = new Configuration();
+
+	baseDir = createDir(null,
+		"testPathPartitioner-testGetKeys-" + System.currentTimeMillis());
+
+	partition1 = createDir(baseDir, "year=2010");
+	partition2 = createDir(partition1, "month=01");
+	partition3 = createDir(partition2, "day=01");
+
+	File file = new File(partition3, "testfile-"
+		+ System.currentTimeMillis());
+	file.createNewFile();
+
+    }
+
+    @Test
+    public void testGetKeyValues() throws Exception {
+	PathPartitioner partitioner = new PathPartitioner();
+
+	Map<String, String> map = partitioner
+		.getPathPartitionKeyValues(partition3.getAbsolutePath());
+
+	String[] keys = map.keySet().toArray(new String[] {});
+
+	assertEquals("2010", map.get(keys[0]));
+	assertEquals("01", map.get(keys[1]));
+	assertEquals("01", map.get(keys[2]));
+
+    }
+
+    @Test
+    public void testGetKeys() throws Exception {
+
+	PathPartitioner pathPartitioner = new PathPartitioner();
+	Set<String> keys = pathPartitioner.getPartitionKeys(
+		baseDir.getAbsolutePath(), conf);
+
+	assertNotNull(keys);
+	assertEquals(3, keys.size());
+
+	String[] keyArr = keys.toArray(new String[] {});
+
+	assertEquals("year", keyArr[0]);
+	assertEquals("month", keyArr[1]);
+	assertEquals("day", keyArr[2]);
+
+    }
+
+    private File createDir(File parent, String name) {
+	File file = new File(parent, name);
+	file.mkdirs();
+	return file;
+    }
+
+}
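
Both helpers depend on the Hive-style convention of encoding partition keys
as key=value directory names, e.g. the year=2010/month=01/day=01 tree built
in setUp above. A rough sketch of that convention only (this is not
PathPartitioner's implementation, which lives in
org.apache.pig.piggybank.storage.partition.PathPartitioner):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class PathPartitionSketch {

        // collect key=value path components, left to right
        static Map<String, String> keyValues(String path) {
            Map<String, String> map = new LinkedHashMap<String, String>();
            for (String part : path.split("/")) {
                int eq = part.indexOf('=');
                if (eq > 0) {
                    map.put(part.substring(0, eq), part.substring(eq + 1));
                }
            }
            return map;
        }

        public static void main(String[] args) {
            // prints {year=2010, month=01, day=01}
            System.out.println(keyValues(
                    "/base/year=2010/month=01/day=01/testfile-1"));
        }
    }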