You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2011/07/12 19:11:37 UTC
svn commit: r1145681 - in /pig/branches/branch-0.9: ./
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/
contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/
Author: daijy
Date: Tue Jul 12 17:11:37 2011
New Revision: 1145681
URL: http://svn.apache.org/viewvc?rev=1145681&view=rev
Log:
PIG-1890: Fix piggybank unit test TestAvroStorage
Modified:
pig/branches/branch-0.9/CHANGES.txt
pig/branches/branch-0.9/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
pig/branches/branch-0.9/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java
pig/branches/branch-0.9/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigSchema2Avro.java
pig/branches/branch-0.9/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java
Modified: pig/branches/branch-0.9/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/CHANGES.txt?rev=1145681&r1=1145680&r2=1145681&view=diff
==============================================================================
--- pig/branches/branch-0.9/CHANGES.txt (original)
+++ pig/branches/branch-0.9/CHANGES.txt Tue Jul 12 17:11:37 2011
@@ -192,6 +192,8 @@ PIG-1696: Performance: Use System.arrayc
BUG FIXES
+PIG-1890: Fix piggybank unit test TestAvroStorage (kengoodhope via daijy)
+
PIG-2144: ClassCastException when using IsEmpty(DIFF()) (thejas)
PIG-2139: LogicalExpressionSimplifier optimizer rule should check if udf is
Modified: pig/branches/branch-0.9/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java?rev=1145681&r1=1145680&r2=1145681&view=diff
==============================================================================
--- pig/branches/branch-0.9/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java (original)
+++ pig/branches/branch-0.9/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java Tue Jul 12 17:11:37 2011
@@ -26,6 +26,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
+import java.util.HashSet;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.file.DataFileStream;
@@ -42,6 +43,7 @@ import org.apache.hadoop.mapreduce.Recor
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.pig.Expression;
import org.apache.pig.FileInputLoadFunc;
import org.apache.pig.LoadFunc;
@@ -128,7 +130,9 @@ public class AvroStorage extends FileInp
*/
@Override
public void setLocation(String location, Job job) throws IOException {
- if(AvroStorageUtils.addInputPaths(location, job) && inputAvroSchema == null) {
+ HashSet<Path> paths = new HashSet<Path>();
+ if(AvroStorageUtils.getAllSubDirs(new Path(location), job, paths) && inputAvroSchema == null) {
+ FileInputFormat.setInputPaths(job, paths.toArray(new Path[0]));
inputAvroSchema = getAvroSchema(location, job);
}
}
@@ -188,6 +192,8 @@ public class AvroStorage extends FileInp
if (schema == null)
System.err.println("Cannot get avro schema! Input path " + path + " might be empty.");
+
+ System.err.println(schema.toString());
return schema;
}
@@ -211,6 +217,7 @@ public class AvroStorage extends FileInp
DataFileStream<Object> avroDataStream = new DataFileStream<Object>(hdfsInputStream, avroReader);
Schema ret = avroDataStream.getSchema();
avroDataStream.close();
+
return ret;
}
@@ -267,6 +274,9 @@ public class AvroStorage extends FileInp
/* convert to pig schema */
ResourceSchema pigSchema = AvroSchema2Pig.convert(inputAvroSchema);
AvroStorageLog.details("pig input schema:" + pigSchema);
+ if (pigSchema.getFields().length == 1){
+ pigSchema = pigSchema.getFields()[0].getSchema();
+ }
return pigSchema;
} else
return null;
@@ -567,7 +577,7 @@ public class AvroStorage extends FileInp
@Override
public void putNext(Tuple t) throws IOException {
try {
- this.writer.write(NullWritable.get(), t);
+ this.writer.write(NullWritable.get(), t.getAll().size() == 1 ? t.get(0) : t);
} catch (InterruptedException e) {
e.printStackTrace();
}
Modified: pig/branches/branch-0.9/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java?rev=1145681&r1=1145680&r2=1145681&view=diff
==============================================================================
--- pig/branches/branch-0.9/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java (original)
+++ pig/branches/branch-0.9/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java Tue Jul 12 17:11:37 2011
@@ -17,8 +17,11 @@
package org.apache.pig.piggybank.storage.avro;
+import java.io.FileNotFoundException;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
@@ -35,6 +38,8 @@ import org.apache.hadoop.mapreduce.lib.i
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.data.DataType;
+import org.apache.pig.piggybank.storage.avro.AvroStorageLog;
+
/**
* This is utility class for this package
@@ -87,54 +92,51 @@ public class AvroStorageUtils {
}
/**
- * get input paths to job config
+ * get input paths to job config
*/
public static boolean addInputPaths(String pathString, Job job)
- throws IOException {
- Configuration conf = job.getConfiguration();
- FileSystem fs = FileSystem.get(conf);
-
- Path path = new Path(pathString);
- FileStatus pathStatus = fs.getFileStatus(path);
-
- List<FileStatus> input = new LinkedList<FileStatus>();
- if (PATH_FILTER.accept(path)) { // remove input path with leading "." or "_"
- input.add(pathStatus);
- }
-
- boolean ret = false;
- while (!input.isEmpty()) {
-
- FileStatus status = input.remove(0);
- Path p = status.getPath();
-
- if (!status.isDir() ) {
- AvroStorageLog.details("Add input path:" + p);
- FileInputFormat.addInputPath(job, p);
- ret = true;
- }
- else {
- /*list all sub-dirs*/
- FileStatus[] ss = fs.listStatus(p, PATH_FILTER);
-
- if (ss.length > 0) {
- if (noDir(ss) ) {
- AvroStorageLog.details("Add input path:" + p);
- FileInputFormat.addInputPath(job, p);
- ret = true;
- }
- else {
- input.addAll(Arrays.asList(ss));
- ret = true;
- }
-
- }
- }
- }
+ throws IOException
+ {
+ Configuration conf = job.getConfiguration();
+ FileSystem fs = FileSystem.get(conf);
+ HashSet<Path> paths = new HashSet<Path>();
+ if (getAllSubDirs(new Path(pathString), job, paths))
+ {
+ paths.addAll(Arrays.asList(FileInputFormat.getInputPaths(job)));
+ FileInputFormat.setInputPaths(job, paths.toArray(new Path[0]));
+ return true;
+ }
+ return false;
- return ret;
}
+ /**
+ * Adds all non-hidden files under the given path (recursing into subdirectories) to the paths set
+ *
+ * @throws IOException
+ */
+ static boolean getAllSubDirs(Path path, Job job, Set<Path> paths) throws IOException {
+ FileSystem fs = FileSystem.get(job.getConfiguration());
+ if (PATH_FILTER.accept(path)) {
+ try {
+ FileStatus file = fs.getFileStatus(path);
+ if (file.isDir()) {
+ for (FileStatus sub : fs.listStatus(path)) {
+ getAllSubDirs(sub.getPath(), job, paths);
+ }
+ } else {
+ AvroStorageLog.details("Add input file:" + file);
+ paths.add(file.getPath());
+ }
+ } catch (FileNotFoundException e) {
+ AvroStorageLog.details("Input path does not exist: " + path);
+ return false;
+ }
+ return true;
+ }
+ return false;
+ }
+
/** check whether there is NO directory in the input file (status) list*/
public static boolean noDir(FileStatus [] ss) {
for (FileStatus s : ss) {
Modified: pig/branches/branch-0.9/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigSchema2Avro.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigSchema2Avro.java?rev=1145681&r1=1145680&r2=1145681&view=diff
==============================================================================
--- pig/branches/branch-0.9/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigSchema2Avro.java (original)
+++ pig/branches/branch-0.9/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigSchema2Avro.java Tue Jul 12 17:11:37 2011
@@ -54,15 +54,10 @@ public class PigSchema2Avro {
ResourceFieldSchema[] pigFields = pigSchema.getFields();
/* remove the pig tuple wrapper */
- if (pigFields.length == 1 && AvroStorageUtils.isTupleWrapper(pigFields[0])) {
+ if (pigFields.length == 1) {
AvroStorageLog.details("Ignore the pig tuple wrapper.");
- ResourceFieldSchema[] listSchemas = pigFields[0].getSchema()
- .getFields();
- if (listSchemas.length != 1)
- throw new IOException("Expect one subfield from " + pigFields);
-
- return convert(listSchemas[0], nullable);
+ return convert(pigFields[0], nullable);
} else
return convertRecord(pigFields, nullable);
}
Modified: pig/branches/branch-0.9/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java?rev=1145681&r1=1145680&r2=1145681&view=diff
==============================================================================
--- pig/branches/branch-0.9/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java (original)
+++ pig/branches/branch-0.9/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java Tue Jul 12 17:11:37 2011
@@ -76,121 +76,116 @@ public class TestAvroStorage {
}
@Test
- public void testDummy() {
- // Dummy test, will remove after PIG-1890 check in. Otherwise, Junit will complain "No runnable methods"
+ public void testArrayDefault() throws IOException {
+ String output= outbasedir + "testArrayDefault";
+ String expected = basedir + "expected_testArrayDefault.avro";
+
+ deleteDirectory(new File(output));
+
+ String [] queries = {
+ " in = LOAD '" + testArrayFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+ " STORE in INTO '" + output + "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();"
+ };
+ testAvroStorage( queries);
+ verifyResults(output, expected);
+ }
+
+
+ @Test
+ public void testArrayWithSchema() throws IOException {
+ String output= outbasedir + "testArrayWithSchema";
+ String expected = basedir + "expected_testArrayWithSchema.avro";
+ deleteDirectory(new File(output));
+ String [] queries = {
+ " in = LOAD '" + testArrayFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+ " STORE in INTO '" + output +
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ( " +
+ " 'schema', '{\"type\":\"array\",\"items\":\"float\"}' );"
+ };
+ testAvroStorage( queries);
+ verifyResults(output, expected);
+ }
+
+ @Test
+ public void testArrayWithNotNull() throws IOException {
+ String output= outbasedir + "testArrayWithNotNull";
+ String expected = basedir + "expected_testArrayWithSchema.avro";
+ deleteDirectory(new File(output));
+ String [] queries = {
+ " in = LOAD '" + testArrayFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+ " STORE in INTO '" + output +
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ( " +
+ " '{\"nullable\": false }' );"
+ };
+ testAvroStorage( queries);
+ verifyResults(output, expected);
+ }
+
+ @Test
+ public void testArrayWithSame() throws IOException {
+ String output= outbasedir + "testArrayWithSame";
+ String expected = basedir + "expected_testArrayWithSchema.avro";
+ deleteDirectory(new File(output));
+ String [] queries = {
+ " in = LOAD '" + testArrayFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+ " STORE in INTO '" + output +
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ( " +
+ " 'same', '" + testArrayFile + "' );"
+ };
+ testAvroStorage(queries);
+ verifyResults(output, expected);
+ }
+
+ @Test
+ public void testRecordWithSplit() throws IOException {
+ String output1= outbasedir + "testRecordSplit1";
+ String output2= outbasedir + "testRecordSplit2";
+ String expected1 = basedir + "expected_testRecordSplit1.avro";
+ String expected2 = basedir + "expected_testRecordSplit2.avro";
+ deleteDirectory(new File(output1));
+ deleteDirectory(new File(output2));
+ String [] queries = {
+ " avro = LOAD '" + testRecordFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+ " groups = GROUP avro BY member_id;",
+ " sc = FOREACH groups GENERATE group AS key, COUNT(avro) AS cnt;",
+ " STORE sc INTO '" + output1 + "' " +
+ " USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+ "'{\"index\": 1, " +
+ " \"schema\": {\"type\":\"record\", " +
+ " \"name\":\"result\", " +
+ " \"fields\":[ {\"name\":\"member_id\",\"type\":\"int\"}, " +
+ "{\"name\":\"count\", \"type\":\"long\"} " +
+ "]" +
+ "}" +
+ " }');",
+ " STORE sc INTO '" + output2 +
+ " 'USING org.apache.pig.piggybank.storage.avro.AvroStorage ('index', '2');"
+ };
+ testAvroStorage( queries);
+ verifyResults(output1, expected1);
+ verifyResults(output2, expected2);
+ }
+
+ @Test
+ public void testRecordWithFieldSchema() throws IOException {
+ String output= outbasedir + "testRecordWithFieldSchema";
+ String expected = basedir + "expected_testRecordWithFieldSchema.avro";
+ deleteDirectory(new File(output));
+ String [] queries = {
+ " avro = LOAD '" + testRecordFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+ " avro1 = FILTER avro BY member_id > 1211;",
+ " avro2 = FOREACH avro1 GENERATE member_id, browser_id, tracking_time, act_content ;",
+ " STORE avro2 INTO '" + output + "' " +
+ " USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+ "'{\"data\": \"" + testRecordFile + "\" ," +
+ " \"field0\": \"int\", " +
+ " \"field1\": \"def:browser_id\", " +
+ " \"field3\": \"def:act_content\" " +
+ " }');"
+ };
+ testAvroStorage( queries);
+ verifyResults(output, expected);
}
-// Comment out all test cases until PIG-1890 fixed
-// @Test
-// public void testArrayDefault() throws IOException {
-// String output= outbasedir + "testArrayDefault";
-// String expected = basedir + "expected_testArrayDefault.avro";
-//
-// deleteDirectory(new File(output));
-//
-// String [] queries = {
-// " in = LOAD '" + testArrayFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
-// " STORE in INTO '" + output + "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();"
-// };
-// testAvroStorage( queries);
-// verifyResults(output, expected);
-// }
-//
-//
-// @Test
-// public void testArrayWithSchema() throws IOException {
-// String output= outbasedir + "testArrayWithSchema";
-// String expected = basedir + "expected_testArrayWithSchema.avro";
-// deleteDirectory(new File(output));
-// String [] queries = {
-// " in = LOAD '" + testArrayFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
-// " STORE in INTO '" + output +
-// "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ( " +
-// " 'schema', '{\"type\":\"array\",\"items\":\"float\"}' );"
-// };
-// testAvroStorage( queries);
-// verifyResults(output, expected);
-// }
-//
-// @Test
-// public void testArrayWithNotNull() throws IOException {
-// String output= outbasedir + "testArrayWithNotNull";
-// String expected = basedir + "expected_testArrayWithSchema.avro";
-// deleteDirectory(new File(output));
-// String [] queries = {
-// " in = LOAD '" + testArrayFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
-// " STORE in INTO '" + output +
-// "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ( " +
-// " '{\"nullable\": false }' );"
-// };
-// testAvroStorage( queries);
-// verifyResults(output, expected);
-// }
-//
-// @Test
-// public void testArrayWithSame() throws IOException {
-// String output= outbasedir + "testArrayWithSame";
-// String expected = basedir + "expected_testArrayWithSchema.avro";
-// deleteDirectory(new File(output));
-// String [] queries = {
-// " in = LOAD '" + testArrayFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
-// " STORE in INTO '" + output +
-// "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ( " +
-// " 'same', '" + testArrayFile + "' );"
-// };
-// testAvroStorage(queries);
-// verifyResults(output, expected);
-// }
-//
-// @Test
-// public void testRecordWithSplit() throws IOException {
-// String output1= outbasedir + "testRecordSplit1";
-// String output2= outbasedir + "testRecordSplit2";
-// String expected1 = basedir + "expected_testRecordSplit1.avro";
-// String expected2 = basedir + "expected_testRecordSplit2.avro";
-// deleteDirectory(new File(output1));
-// deleteDirectory(new File(output2));
-// String [] queries = {
-// " avro = LOAD '" + testRecordFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
-// " groups = GROUP avro BY member_id;",
-// " sc = FOREACH groups GENERATE group AS key, COUNT(avro) AS cnt;",
-// " STORE sc INTO '" + output1 + "' " +
-// " USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
-// "'{\"index\": 1, " +
-// " \"schema\": {\"type\":\"record\", " +
-// " \"name\":\"result\", " +
-// " \"fields\":[ {\"name\":\"member_id\",\"type\":\"int\"}, " +
-// "{\"name\":\"count\", \"type\":\"long\"} " +
-// "]" +
-// "}" +
-// " }');",
-// " STORE sc INTO '" + output2 +
-// " 'USING org.apache.pig.piggybank.storage.avro.AvroStorage ('index', '2');"
-// };
-// testAvroStorage( queries);
-// verifyResults(output1, expected1);
-// verifyResults(output2, expected2);
-// }
-//
-// @Test
-// public void testRecordWithFieldSchema() throws IOException {
-// String output= outbasedir + "testRecordWithFieldSchema";
-// String expected = basedir + "expected_testRecordWithFieldSchema.avro";
-// deleteDirectory(new File(output));
-// String [] queries = {
-// " avro = LOAD '" + testRecordFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
-// " avro1 = FILTER avro BY member_id > 1211;",
-// " avro2 = FOREACH avro1 GENERATE member_id, browser_id, tracking_time, act_content ;",
-// " STORE avro2 INTO '" + output + "' " +
-// " USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
-// "'{\"data\": \"" + testRecordFile + "\" ," +
-// " \"field0\": \"int\", " +
-// " \"field1\": \"def:browser_id\", " +
-// " \"field3\": \"def:act_content\" " +
-// " }');"
-// };
-// testAvroStorage( queries);
-// verifyResults(output, expected);
-// }
private static void deleteDirectory (File path) {
if ( path.exists()) {