You are viewing a plain text version of this content. The canonical link to the original message is not preserved in this plain-text rendering.
Posted to commits@pig.apache.org by da...@apache.org on 2010/08/09 07:27:40 UTC
svn commit: r983524 - in /hadoop/pig/trunk/contrib/piggybank/java: ./
src/test/java/org/apache/pig/piggybank/test/storage/
Author: daijy
Date: Mon Aug 9 05:27:39 2010
New Revision: 983524
URL: http://svn.apache.org/viewvc?rev=983524&view=rev
Log:
PIG-1526: HiveColumnarLoader Partitioning Support (fix piggybank unit test failure)
Added:
hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHiveColumnarLoader.java
hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitionHelper.java
hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitioner.java
Modified:
hadoop/pig/trunk/contrib/piggybank/java/build.xml
Modified: hadoop/pig/trunk/contrib/piggybank/java/build.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/build.xml?rev=983524&r1=983523&r2=983524&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/build.xml (original)
+++ hadoop/pig/trunk/contrib/piggybank/java/build.xml Mon Aug 9 05:27:39 2010
@@ -116,7 +116,7 @@
<echo> *** Running UDF tests ***</echo>
<delete dir="${test.logs}"/>
<mkdir dir="${test.logs}"/>
- <junit printsummary="yes" haltonfailure="no" fork="yes" maxmemory="256m" dir="${basedir}" timeout="${test.timeout}" errorProperty="tests.failed" failureProperty="tests.failed">
+ <junit printsummary="yes" haltonfailure="no" fork="yes" maxmemory="512m" dir="${basedir}" timeout="${test.timeout}" errorProperty="tests.failed" failureProperty="tests.failed">
<sysproperty key="hadoop.log.dir" value="${test.logs}"/>
<classpath refid="test.classpath"/>
<formatter type="${test.junit.output.format}" />
Added: hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHiveColumnarLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHiveColumnarLoader.java?rev=983524&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHiveColumnarLoader.java (added)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestHiveColumnarLoader.java Mon Aug 9 05:27:39 2010
@@ -0,0 +1,575 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.piggybank.test.storage;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.RCFile;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.pig.ExecType;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.Util;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ *
+ * Tests that the HiveColumnarLoader can:
+ * <ul>
+ * <li>Load files without partitioning</li>
+ * <li>Load files with partitioning and dates defined in constructor, or as a
+ * filter</li>
+ * <li>Load files using Pig's push down loader capabilities.</li>
+ * </ul>
+ *
+ */
+public class TestHiveColumnarLoader extends TestCase {
+
+ static Configuration conf = null;
+
+ // for single non partitioned file testing
+ static File simpleDataFile = null;
+ // for multiple non partitioned file testing
+ static File simpleDataDir = null;
+
+ // roots of the partitioned directory trees created in setUp()
+ static File datePartitionedDir = null;
+ static File yearMonthDayHourPartitionedDir = null;
+
+ // used for cleanup
+ static List<String> datePartitionedRCFiles;
+ static List<String> datePartitionedDirs;
+
+ static private FileSystem fs;
+
+ // NOTE(review): columnMaxSize is never read anywhere in this class — confirm before removing
+ static int columnMaxSize = 30;
+
+ // number of columns written to every generated RCFile row
+ static int columnCount = 3;
+
+ // file and per-file row counts for the simple (non-partitioned) data sets
+ static int simpleDirFileCount = 3;
+ static int simpleRowCount = 10;
+
+ // inclusive date range covered by the daydate=... partitions,
+ // set by produceDatePartitionedData()
+ static String endingDate = null;
+ static String startingDate = null;
+ static DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
+ static Calendar calendar = null;
+ static int datePartitionedRowCount;
+
+ private static Calendar yearMonthDayHourcalendar;
+
+ // Builds all three data sets (simple, date-partitioned and
+ // year/month/day/hour-partitioned) on the local file system before each test.
+ @Override
+ public synchronized void setUp() throws Exception {
+
+ conf = new Configuration();
+
+ fs = LocalFileSystem.getLocal(conf);
+
+ produceSimpleData();
+
+ produceDatePartitionedData();
+
+ produceYearMonthDayHourPartitionedData();
+
+ }
+
+ // Removes the directories and files created by setUp().
+ @Override
+ public void tearDown() {
+
+ Util.deleteDirectory(datePartitionedDir);
+
+ Util.deleteDirectory(yearMonthDayHourPartitionedDir);
+
+ Util.deleteDirectory(simpleDataDir);
+
+ // NOTE(review): by the end of produceSimpleData() simpleDataFile points at
+ // the last file inside simpleDataDir, so this never deletes the original
+ // temp file (its deleteOnExit() covers it) — confirm intended
+ simpleDataFile.delete();
+
+ }
+
+ // Loads the single non-partitioned RCFile with no projection and checks
+ // every row arrives with all 3 chararray columns.
+ @Test
+ public void testReadingSingleFileNoProjections() throws IOException {
+ String funcSpecString = "org.apache.pig.piggybank.storage.HiveColumnarLoader('f1 string,f2 string,f3 string')";
+
+ String singlePartitionedFile = simpleDataFile.getAbsolutePath();
+
+ PigServer server = new PigServer(ExecType.LOCAL);
+ server.setBatchOn();
+ server.registerFunction(
+ "org.apache.pig.piggybank.storage.HiveColumnarLoader",
+ new FuncSpec(funcSpecString));
+
+ server.registerQuery("a = LOAD '" + singlePartitionedFile + "' using "
+ + funcSpecString + ";");
+
+ Iterator<Tuple> result = server.openIterator("a");
+
+ int count = 0;
+ Tuple t = null;
+ while ((t = result.next()) != null) {
+ assertEquals(3, t.size());
+ assertEquals(DataType.CHARARRAY, t.getType(0));
+ count++;
+ }
+
+ Assert.assertEquals(simpleRowCount, count);
+ }
+
+ // Loads a directory of simpleDirFileCount non-partitioned RCFiles with a
+ // single-column projection; expects one row per row written.
+ @Test
+ public void testReadingMultipleNonPartitionedFiles() throws IOException {
+ String funcSpecString = "org.apache.pig.piggybank.storage.HiveColumnarLoader('f1 string,f2 string,f3 string')";
+
+ // NOTE(review): misnomer — this directory holds non-partitioned files
+ String singlePartitionedDir = simpleDataDir.getAbsolutePath();
+
+ PigServer server = new PigServer(ExecType.LOCAL);
+ server.setBatchOn();
+ server.registerFunction(
+ "org.apache.pig.piggybank.storage.HiveColumnarLoader",
+ new FuncSpec(funcSpecString));
+
+ server.registerQuery("a = LOAD '" + singlePartitionedDir + "' using "
+ + funcSpecString + ";");
+
+ server.registerQuery("b = foreach a generate f1;");
+
+ Iterator<Tuple> result = server.openIterator("b");
+
+ int count = 0;
+ Tuple t = null;
+ while ((t = result.next()) != null) {
+ assertEquals(1, t.size());
+ assertEquals(DataType.CHARARRAY, t.getType(0));
+ count++;
+ }
+
+ Assert.assertEquals(simpleDirFileCount * simpleRowCount, count);
+ }
+
+ // Same as testReadingSingleFileNoProjections but with a single-column
+ // projection applied (exercises the push-down projection path).
+ @Test
+ public void testReadingSingleFile() throws IOException {
+ String funcSpecString = "org.apache.pig.piggybank.storage.HiveColumnarLoader('f1 string,f2 string,f3 string')";
+
+ String singlePartitionedFile = simpleDataFile.getAbsolutePath();
+
+ PigServer server = new PigServer(ExecType.LOCAL);
+ server.setBatchOn();
+ server.registerFunction(
+ "org.apache.pig.piggybank.storage.HiveColumnarLoader",
+ new FuncSpec(funcSpecString));
+
+ server.registerQuery("a = LOAD '" + singlePartitionedFile + "' using "
+ + funcSpecString + ";");
+
+ server.registerQuery("b = foreach a generate f1;");
+
+ Iterator<Tuple> result = server.openIterator("b");
+
+ int count = 0;
+ Tuple t = null;
+ while ((t = result.next()) != null) {
+ assertEquals(1, t.size());
+ assertEquals(DataType.CHARARRAY, t.getType(0));
+ count++;
+ }
+
+ Assert.assertEquals(simpleRowCount, count);
+ }
+
+ // Filters on the year partition column with a projection; expects all
+ // 1 year x 2 months x 3 days x 4 hours x 10 rows = 240 rows.
+ @Test
+ public void testYearMonthDayHourPartitionedFilesWithProjection()
+ throws IOException {
+ int count = 0;
+
+ String funcSpecString = "org.apache.pig.piggybank.storage.HiveColumnarLoader('f1 string,f2 string,f3 string')";
+
+ PigServer server = new PigServer(ExecType.LOCAL);
+ server.setBatchOn();
+ server.registerFunction(
+ "org.apache.pig.piggybank.storage.HiveColumnarLoader",
+ new FuncSpec(funcSpecString));
+
+ server.registerQuery("a = LOAD '"
+ + yearMonthDayHourPartitionedDir.getAbsolutePath() + "' using "
+ + funcSpecString + ";");
+ server.registerQuery("f = FILTER a by year=='2010';");
+ server.registerQuery("b = foreach f generate f1;");
+
+ Iterator<Tuple> result = server.openIterator("b");
+
+ Tuple t = null;
+ while ((t = result.next()) != null) {
+ assertEquals(1, t.size());
+ assertEquals(DataType.CHARARRAY, t.getType(0));
+ count++;
+ }
+
+ // NOTE(review): 240 assumes the test runs in calendar year 2010 so the
+ // year=... partition matches the filter — confirm (same in the test below)
+ Assert.assertEquals(240, count);
+
+ }
+
+ // Filters on year and also projects the partition columns themselves
+ // (year, month, day, hour) alongside data columns.
+ @Test
+ public void testYearMonthDayHourPartitionedFilesWithProjectionAndPartitionColumns()
+ throws IOException {
+ int count = 0;
+
+ String funcSpecString = "org.apache.pig.piggybank.storage.HiveColumnarLoader('f1 string,f2 string,f3 string')";
+
+ PigServer server = new PigServer(ExecType.LOCAL);
+ server.setBatchOn();
+ server.registerFunction(
+ "org.apache.pig.piggybank.storage.HiveColumnarLoader",
+ new FuncSpec(funcSpecString));
+
+ server.registerQuery("a = LOAD '"
+ + yearMonthDayHourPartitionedDir.getAbsolutePath() + "' using "
+ + funcSpecString + ";");
+ server.registerQuery("f = FILTER a by year=='2010';");
+ server.registerQuery("r = foreach f generate year, f2, f3, month, day, hour;");
+ server.registerQuery("b = ORDER r BY year, month, day, hour;");
+ Iterator<Tuple> result = server.openIterator("b");
+
+ Tuple t = null;
+ while ((t = result.next()) != null) {
+ System.out.println("Tuple: " + t);
+ assertEquals(6, t.size());
+ count++;
+ }
+ System.out.println("Count: " + count);
+ Assert.assertEquals(240, count);
+ }
+
+ // Restricts the loader's date range to a single day via the constructor;
+ // expects 5 partitions x 10 rows = 50 rows for that day.
+ @Test
+ public void test1DayDatePartitionedFilesWithProjection() throws IOException {
+ int count = 0;
+
+ String funcSpecString = "org.apache.pig.piggybank.storage.HiveColumnarLoader('f1 string,f2 string,f3 string'"
+ + ", '" + startingDate + ":" + startingDate + "')";
+
+ System.out.println(funcSpecString);
+
+ PigServer server = new PigServer(ExecType.LOCAL);
+ server.setBatchOn();
+ server.registerFunction(
+ "org.apache.pig.piggybank.storage.HiveColumnarLoader",
+ new FuncSpec(funcSpecString));
+
+ server.registerQuery("a = LOAD '"
+ + datePartitionedDir.getAbsolutePath() + "' using "
+ + funcSpecString + ";");
+ server.registerQuery("b = FOREACH a GENERATE f2 as p;");
+ Iterator<Tuple> result = server.openIterator("b");
+
+ Tuple t = null;
+ while ((t = result.next()) != null) {
+ assertEquals(1, t.size());
+ assertEquals(DataType.CHARARRAY, t.getType(0));
+ count++;
+ }
+
+ Assert.assertEquals(50, count);
+ }
+
+ // Same single-day constructor date range as above, without a projection.
+ @Test
+ public void test1DayDatePartitionedFiles() throws IOException {
+ int count = 0;
+
+ String funcSpecString = "org.apache.pig.piggybank.storage.HiveColumnarLoader('f1 string,f2 string,f3 string'"
+ + ", '" + startingDate + ":" + startingDate + "')";
+
+ System.out.println(funcSpecString);
+
+ PigServer server = new PigServer(ExecType.LOCAL);
+ server.setBatchOn();
+ server.registerFunction(
+ "org.apache.pig.piggybank.storage.HiveColumnarLoader",
+ new FuncSpec(funcSpecString));
+
+ server.registerQuery("a = LOAD '"
+ + datePartitionedDir.getAbsolutePath() + "' using "
+ + funcSpecString + ";");
+ Iterator<Tuple> result = server.openIterator("a");
+
+ while ((result.next()) != null) {
+ count++;
+ }
+
+ Assert.assertEquals(50, count);
+ }
+
+ // Uses the full startingDate:endingDate range; expects every row written
+ // by produceDatePartitionedData() (4 days x 5 partitions x 10 rows).
+ @Test
+ public void testDatePartitionedFiles() throws IOException {
+ int count = 0;
+
+ String funcSpecString = "org.apache.pig.piggybank.storage.HiveColumnarLoader('f1 string,f2 string,f3 string'"
+ + ", '" + startingDate + ":" + endingDate + "')";
+
+ System.out.println(funcSpecString);
+
+ PigServer server = new PigServer(ExecType.LOCAL);
+ server.setBatchOn();
+ server.registerFunction(
+ "org.apache.pig.piggybank.storage.HiveColumnarLoader",
+ new FuncSpec(funcSpecString));
+
+ server.registerQuery("a = LOAD '"
+ + datePartitionedDir.getAbsolutePath() + "' using "
+ + funcSpecString + ";");
+ Iterator<Tuple> result = server.openIterator("a");
+
+ while ((result.next()) != null) {
+ count++;
+ }
+
+ Assert.assertEquals(datePartitionedRowCount, count);
+ }
+
+ // Creates datePartitionedDir with 4 consecutive daydate=yyyy-MM-dd
+ // directories, each holding 5 RCFiles of simpleRowCount rows, and sets
+ // startingDate/endingDate to bracket the generated range.
+ private static void produceDatePartitionedData() throws IOException {
+ datePartitionedRowCount = 0;
+ datePartitionedDir = new File("testhiveColumnarLoader-dateDir-"
+ + System.currentTimeMillis());
+ datePartitionedDir.mkdir();
+ datePartitionedDir.deleteOnExit();
+
+ int dates = 4;
+ calendar = Calendar.getInstance();
+
+ // NOTE(review): Calendar.MONDAY (== 2) is a day-of-week constant being
+ // passed as a DAY_OF_MONTH value, pinning the date to January 2nd of the
+ // current year — presumably intentional as a fixed date, but confirm
+ calendar.set(Calendar.DAY_OF_MONTH, Calendar.MONDAY);
+ calendar.set(Calendar.MONTH, Calendar.JANUARY);
+
+ startingDate = dateFormat.format(calendar.getTime());
+
+ datePartitionedRCFiles = new ArrayList<String>();
+ datePartitionedDirs = new ArrayList<String>();
+
+ for (int i = 0; i < dates; i++) {
+
+ File file = new File(datePartitionedDir, "daydate="
+ + dateFormat.format(calendar.getTime()));
+ calendar.add(Calendar.DAY_OF_MONTH, 1);
+
+ file.mkdir();
+ file.deleteOnExit();
+
+ // for each daydate write 5 partitions
+ for (int pi = 0; pi < 5; pi++) {
+ // NOTE(review): "parition" is a typo for "partition"; harmless
+ // since it is only a generated directory name
+ Path path = new Path(new Path(file.getAbsolutePath()),
+ "parition" + pi);
+
+ datePartitionedRowCount += writeRCFileTest(fs, simpleRowCount,
+ path, columnCount, new DefaultCodec(), columnCount);
+
+ new File(path.toString()).deleteOnExit();
+ datePartitionedRCFiles.add(path.toString());
+ datePartitionedDirs.add(file.toString());
+
+ }
+
+ }
+
+ // calendar has been advanced one day past the last partition, so the
+ // startingDate:endingDate range is inclusive of all generated days
+ endingDate = dateFormat.format(calendar.getTime());
+ }
+
+ // Creates yearMonthDayHourPartitionedDir with a nested
+ // year=/month=/day=/hour= tree (1x2x3x4 leaf partitions), writing one
+ // RCFile of simpleRowCount rows per hour directory.
+ private static void produceYearMonthDayHourPartitionedData()
+ throws IOException {
+
+ yearMonthDayHourPartitionedDir = new File(
+ "testhiveColumnarLoader-yearMonthDayHourDir-"
+ + System.currentTimeMillis());
+ yearMonthDayHourPartitionedDir.mkdir();
+ yearMonthDayHourPartitionedDir.deleteOnExit();
+
+ int years = 1;
+ int months = 2;
+ int days = 3;
+ int hours = 4;
+
+ yearMonthDayHourcalendar = Calendar.getInstance();
+
+ // NOTE(review): same Calendar.MONDAY-as-day-of-month pattern as in
+ // produceDatePartitionedData() — confirm intentional
+ yearMonthDayHourcalendar.set(Calendar.DAY_OF_MONTH, Calendar.MONDAY);
+ yearMonthDayHourcalendar.set(Calendar.MONTH, Calendar.JANUARY);
+
+ for (int i = 0; i < years; i++) {
+
+ File file = new File(yearMonthDayHourPartitionedDir, "year="
+ + yearMonthDayHourcalendar.get(Calendar.YEAR));
+
+ file.mkdir();
+ file.deleteOnExit();
+
+ for (int monthIndex = 0; monthIndex < months; monthIndex++) {
+
+ File monthFile = new File(file, "month="
+ + yearMonthDayHourcalendar.get(Calendar.MONTH));
+ monthFile.mkdir();
+ monthFile.deleteOnExit();
+
+ for (int dayIndex = 0; dayIndex < days; dayIndex++) {
+ File dayFile = new File(monthFile,
+ "day="
+ + yearMonthDayHourcalendar
+ .get(Calendar.DAY_OF_MONTH));
+ dayFile.mkdir();
+ dayFile.deleteOnExit();
+
+ for (int hourIndex = 0; hourIndex < hours; hourIndex++) {
+ File hourFile = new File(dayFile,
+ "hour="
+ + yearMonthDayHourcalendar
+ .get(Calendar.HOUR_OF_DAY));
+ hourFile.mkdir();
+ hourFile.deleteOnExit();
+
+ File rcFile = new File(hourFile.getAbsolutePath()
+ + "/attempt-00000");
+ Path hourFilePath = new Path(rcFile.getAbsolutePath());
+ rcFile.deleteOnExit();
+
+ writeRCFileTest(fs, simpleRowCount, hourFilePath,
+ columnCount, new DefaultCodec(), columnCount);
+
+ yearMonthDayHourcalendar.add(Calendar.HOUR_OF_DAY, 1);
+ }
+
+ yearMonthDayHourcalendar.add(Calendar.DAY_OF_MONTH, 1);
+ }
+ yearMonthDayHourcalendar.add(Calendar.MONTH, 1);
+ }
+
+ }
+
+ // NOTE(review): this redundantly re-assigns endingDate from the (unchanged)
+ // date-partition calendar, not from yearMonthDayHourcalendar — looks like a
+ // copy/paste leftover from produceDatePartitionedData(); confirm
+ endingDate = dateFormat.format(calendar.getTime());
+ }
+
+ /**
+ * Writes out a simple temporary RCFile with columnCount (3) columns and
+ * simpleRowCount (10) rows, plus a directory of simpleDirFileCount (3)
+ * such files.<br/>
+ * Data is random characters.
+ *
+ * @throws SerDeException
+ * @throws IOException
+ */
+ private static final void produceSimpleData() throws SerDeException,
+ IOException {
+ // produce on single file
+ simpleDataFile = File.createTempFile("testhiveColumnarLoader", ".txt");
+ simpleDataFile.deleteOnExit();
+
+ Path path = new Path(simpleDataFile.getPath());
+
+ writeRCFileTest(fs, simpleRowCount, path, columnCount,
+ new DefaultCodec(), columnCount);
+
+ // produce a folder of simple data
+ simpleDataDir = new File("simpleDataDir" + System.currentTimeMillis());
+ simpleDataDir.mkdir();
+
+ for (int i = 0; i < simpleDirFileCount; i++) {
+
+ // NOTE(review): simpleDataFile is re-pointed at each file in turn, so
+ // after this loop it references the last file in simpleDataDir
+ simpleDataFile = new File(simpleDataDir, "testhiveColumnarLoader-"
+ + i + ".txt");
+
+ Path filePath = new Path(simpleDataFile.getPath());
+
+ writeRCFileTest(fs, simpleRowCount, filePath, columnCount,
+ new DefaultCodec(), columnCount);
+
+ }
+
+ }
+
+ // fixed seeds keep the generated data deterministic across runs
+ static Random randomCharGenerator = new Random(3);
+
+ static Random randColLenGenerator = new Random(20);
+
+ private static void resetRandomGenerators() {
+ randomCharGenerator = new Random(3);
+ randColLenGenerator = new Random(20);
+ }
+
+ /**
+ * Writes rowCount rows of random character data to an RCFile at the given
+ * path (any existing file is deleted first) and returns the number of rows
+ * written.
+ *
+ * NOTE(review): columnNum and columnCount are always passed the same value
+ * by every caller in this class; columnNum sizes the row while columnCount
+ * sizes each cell's payload — confirm the distinction is intentional.
+ */
+ private static int writeRCFileTest(FileSystem fs, int rowCount, Path file,
+ int columnNum, CompressionCodec codec, int columnCount)
+ throws IOException {
+ fs.delete(file, true);
+ int rowsWritten = 0;
+
+ resetRandomGenerators();
+
+ RCFileOutputFormat.setColumnNumber(conf, columnNum);
+ RCFile.Writer writer = new RCFile.Writer(fs, conf, file, null, codec);
+
+ byte[][] columnRandom;
+
+ BytesRefArrayWritable bytes = new BytesRefArrayWritable(columnNum);
+ columnRandom = new byte[columnNum][];
+ for (int i = 0; i < columnNum; i++) {
+ BytesRefWritable cu = new BytesRefWritable();
+ bytes.set(i, cu);
+ }
+
+ for (int i = 0; i < rowCount; i++) {
+ nextRandomRow(columnRandom, bytes, columnCount);
+ rowsWritten++;
+ writer.append(bytes);
+ }
+ writer.close();
+
+ return rowsWritten;
+ }
+
+ // Fills every column of the row with columnCount random characters and
+ // points the corresponding BytesRefWritable entries at the new buffers.
+ private static void nextRandomRow(byte[][] row,
+ BytesRefArrayWritable bytes, int columnCount) {
+ bytes.resetValid(row.length);
+ for (int i = 0; i < row.length; i++) {
+
+ row[i] = new byte[columnCount];
+ for (int j = 0; j < columnCount; j++)
+ row[i][j] = getRandomChar(randomCharGenerator);
+ bytes.get(i).set(row[i], 0, columnCount);
+ }
+ }
+
+ // upper bound (exclusive) for random bytes: 115 == 'z' - 7
+ private static int CHAR_END = 122 - 7;
+
+ // Returns a random byte in [65, 114]; values above 90 (past 'Z') are mapped
+ // to 7, so the result is an uppercase ASCII letter or the byte 0x07.
+ private static byte getRandomChar(Random random) {
+ byte b = 0;
+ do {
+ b = (byte) random.nextInt(CHAR_END);
+ } while ((b < 65));
+ if (b > 90) {
+ b = 7;
+ }
+ return b;
+ }
+}
Added: hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitionHelper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitionHelper.java?rev=983524&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitionHelper.java (added)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitionHelper.java Mon Aug 9 05:27:39 2010
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.piggybank.test.storage;
+
+import java.io.File;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.piggybank.storage.partition.PathPartitionHelper;
+import org.apache.pig.test.Util;
+import org.junit.Test;
+
+/**
+ *
+ * Tests the PathPartitionHelper can:<br/>
+ * <ul>
+ * <li>Filter path partitioned files based on an expression being true</li>
+ * <li>Filter path partitioned files based on an expression being false</li>
+ * <li>Filter path partitioned files with no expression</li>
+ * </ul>
+ *
+ */
+public class TestPathPartitionHelper extends TestCase {
+
+ private static Configuration conf = null;
+
+ // baseDir/year=2010/month=01/day=01 chain created in setUp(); partition3
+ // holds the single test file
+ File baseDir;
+ File partition1;
+ File partition2;
+ File partition3;
+
+ // A filter that matches no partition (year < '2010') must return no files.
+ @Test
+ public void testListStatusPartitionFilterNotFound() throws Exception {
+
+ PathPartitionHelper partitionHelper = new PathPartitionHelper();
+
+ Job job = new Job(conf);
+ job.setJobName("TestJob");
+ job.setInputFormatClass(FileInputFormat.class);
+
+ // NOTE(review): this local shadows the static conf field (same pattern in
+ // the other two tests); the job's own configuration is what is used below
+ Configuration conf = job.getConfiguration();
+ FileInputFormat.setInputPaths(job, new Path(baseDir.getAbsolutePath()));
+
+ JobContext jobContext = new JobContext(conf, job.getJobID());
+
+ partitionHelper.setPartitionFilterExpression("year < '2010'",
+ PigStorage.class, "1");
+ partitionHelper.setPartitionKeys(baseDir.getAbsolutePath(), conf,
+ PigStorage.class, "1");
+
+ List<FileStatus> files = partitionHelper.listStatus(jobContext,
+ PigStorage.class, "1");
+
+ assertEquals(0, files.size());
+
+ }
+
+ // A filter that matches the year=2010/month=01/day=01 partition must return
+ // exactly the one file created in setUp().
+ @Test
+ public void testListStatusPartitionFilterFound() throws Exception {
+
+ PathPartitionHelper partitionHelper = new PathPartitionHelper();
+
+ Job job = new Job(conf);
+ job.setJobName("TestJob");
+ job.setInputFormatClass(FileInputFormat.class);
+
+ Configuration conf = job.getConfiguration();
+ FileInputFormat.setInputPaths(job, new Path(baseDir.getAbsolutePath()));
+
+ JobContext jobContext = new JobContext(conf, job.getJobID());
+
+ partitionHelper.setPartitionFilterExpression(
+ "year<='2010' and month=='01' and day>='01'", PigStorage.class, "2");
+ partitionHelper.setPartitionKeys(baseDir.getAbsolutePath(), conf,
+ PigStorage.class, "2");
+
+ List<FileStatus> files = partitionHelper.listStatus(jobContext,
+ PigStorage.class, "2");
+
+ assertNotNull(files);
+ assertEquals(1, files.size());
+
+ }
+
+ // With no filter expression set, listStatus must return every file under
+ // the partition tree (here, the single file from setUp()).
+ @Test
+ public void testListStatus() throws Exception {
+
+ PathPartitionHelper partitionHelper = new PathPartitionHelper();
+
+ Job job = new Job(conf);
+ job.setJobName("TestJob");
+ job.setInputFormatClass(FileInputFormat.class);
+
+ Configuration conf = job.getConfiguration();
+ FileInputFormat.setInputPaths(job, new Path(baseDir.getAbsolutePath()));
+
+ JobContext jobContext = new JobContext(conf, job.getJobID());
+
+ partitionHelper.setPartitionKeys(baseDir.getAbsolutePath(), conf,
+ PigStorage.class, "3");
+
+ List<FileStatus> files = partitionHelper.listStatus(jobContext,
+ PigStorage.class, "3");
+
+ assertNotNull(files);
+ assertEquals(1, files.size());
+
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+
+ Util.deleteDirectory(baseDir);
+
+ }
+
+ // Builds a year=2010/month=01/day=01 directory chain with one empty file at
+ // the leaf. Deleting ~/pigtest/conf/hadoop-site.xml first clears cluster
+ // config left behind by earlier Pig test runs; Configuration(false) skips
+ // loading default resources.
+ @Override
+ protected void setUp() throws Exception {
+ File oldConf = new File(System.getProperty("user.home")+"/pigtest/conf/hadoop-site.xml");
+ oldConf.delete();
+ conf = new Configuration(false);
+
+ baseDir = createDir(null,
+ "testPathPartitioner-testGetKeys-" + System.currentTimeMillis());
+
+ partition1 = createDir(baseDir, "year=2010");
+ partition2 = createDir(partition1, "month=01");
+ partition3 = createDir(partition2, "day=01");
+
+ File file = new File(partition3, "testfile-"
+ + System.currentTimeMillis());
+ file.createNewFile();
+
+ }
+
+ // Creates (and returns) the directory parent/name, including any missing
+ // parent directories.
+ private File createDir(File parent, String name) {
+ File file = new File(parent, name);
+ file.mkdirs();
+ return file;
+ }
+
+}
Added: hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitioner.java?rev=983524&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitioner.java (added)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestPathPartitioner.java Mon Aug 9 05:27:39 2010
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.pig.piggybank.test.storage;
+
+import java.io.File;
+import java.util.Map;
+import java.util.Set;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.piggybank.storage.partition.PathPartitioner;
+import org.apache.pig.test.Util;
+import org.junit.Test;
+
<span class="ansi1">+/**</span>
+ *
+ * Tests that the PathPartitioner can:<br/>
+ * <ul>
+ * <li>Read keys from a partitioned file path</li>
+ * <li>Read keys and values from a partitioned file path</li>
+ * </ul>
+ *
+ */
+public class TestPathPartitioner extends TestCase {
+
+ private static Configuration conf = null;
+
+ // baseDir/year=2010/month=01/day=01 chain created in setUp()
+ File baseDir;
+ File partition1;
+ File partition2;
+ File partition3;
+
+ @Override
+ protected void tearDown() throws Exception {
+
+ Util.deleteDirectory(baseDir);
+
+ }
+
+ // Builds a year=2010/month=01/day=01 directory chain with one empty file at
+ // the leaf; removes a stale ~/pigtest/conf/hadoop-site.xml left by earlier
+ // Pig test runs so it cannot leak cluster settings into this test.
+ @Override
+ protected void setUp() throws Exception {
+ File oldConf = new File(System.getProperty("user.home")+"/pigtest/conf/hadoop-site.xml");
+ oldConf.delete();
+ conf = new Configuration();
+
+ baseDir = createDir(null,
+ "testPathPartitioner-testGetKeys-" + System.currentTimeMillis());
+
+ partition1 = createDir(baseDir, "year=2010");
+ partition2 = createDir(partition1, "month=01");
+ partition3 = createDir(partition2, "day=01");
+
+ File file = new File(partition3, "testfile-"
+ + System.currentTimeMillis());
+ file.createNewFile();
+
+ }
+
+ // Verifies the key/value pairs extracted from a fully partitioned path.
+ @Test
+ public void testGetKeyValues() throws Exception {
+ PathPartitioner partitioner = new PathPartitioner();
+
+ Map<String, String> map = partitioner
+ .getPathPartitionKeyValues(partition3.getAbsolutePath());
+
+ String[] keys = map.keySet().toArray(new String[] {});
+
+ // NOTE(review): these assertions assume the map iterates in
+ // year, month, day order — only valid if getPathPartitionKeyValues
+ // returns an order-preserving map (e.g. LinkedHashMap); confirm
+ assertEquals("2010", map.get(keys[0]));
+ assertEquals("01", map.get(keys[1]));
+ assertEquals("01", map.get(keys[2]));
+
+ }
+
+ // Verifies the partition key names discovered under baseDir.
+ @Test
+ public void testGetKeys() throws Exception {
+
+ PathPartitioner pathPartitioner = new PathPartitioner();
+ Set<String> keys = pathPartitioner.getPartitionKeys(
+ baseDir.getAbsolutePath(), conf);
+
+ assertNotNull(keys);
+ assertEquals(3, keys.size());
+
+ String[] keyArr = keys.toArray(new String[] {});
+
+ // NOTE(review): assumes the returned Set preserves year/month/day
+ // insertion order (e.g. LinkedHashSet) — confirm against implementation
+ assertEquals("year", keyArr[0]);
+ assertEquals("month", keyArr[1]);
+ assertEquals("day", keyArr[2]);
+
+ }
+
+ // Creates (and returns) the directory parent/name, including any missing
+ // parent directories.
+ private File createDir(File parent, String name) {
+ File file = new File(parent, name);
+ file.mkdirs();
+ return file;
+ }
+
+}