You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@pig.apache.org by ro...@apache.org on 2013/08/16 01:10:46 UTC
svn commit: r1514537 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/builtin/AvroStorage.java
src/org/apache/pig/builtin/TrevniStorage.java
test/org/apache/pig/builtin/TestAvroStorage.java
Author: rohini
Date: Thu Aug 15 23:10:45 2013
New Revision: 1514537
URL: http://svn.apache.org/r1514537
Log:
PIG-3422: AvroStorage failed to read paths separated by commas (yuanlid via rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/builtin/AvroStorage.java
pig/trunk/src/org/apache/pig/builtin/TrevniStorage.java
pig/trunk/test/org/apache/pig/builtin/TestAvroStorage.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1514537&r1=1514536&r2=1514537&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Aug 15 23:10:45 2013
@@ -212,6 +212,8 @@ PIG-3013: BinInterSedes improve chararra
BUG FIXES
+PIG-3422: AvroStorage failed to read paths separated by commas (yuanlid via rohini)
+
PIG-3420: Failed to retrieve map values from data loaded by AvroStorage (yuanlid via rohini)
PIG-3414: QueryParserDriver.parseSchema(String) silently returns a wrong result when a comma is missing in the schema definition (cheolsoo)
Modified: pig/trunk/src/org/apache/pig/builtin/AvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/AvroStorage.java?rev=1514537&r1=1514536&r2=1514537&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/AvroStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/AvroStorage.java Thu Aug 15 23:10:45 2013
@@ -21,6 +21,7 @@ package org.apache.pig.builtin;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
@@ -269,9 +270,15 @@ public class AvroStorage extends LoadFun
* @throws IOException
*
*/
- protected final Schema getAvroSchema(final String location,
- final Job job) throws IOException {
- return getAvroSchema(new Path(location), job);
+  protected final Schema getAvroSchema(final String location, final Job job)
+      throws IOException {
+    // getPathStrings breaks the (possibly comma-separated) location into parts.
+    final String[] pathStrings = getPathStrings(location);
+    final Path[] paths = new Path[pathStrings.length];
+    for (int idx = 0; idx < pathStrings.length; idx++) {
+      paths[idx] = new Path(pathStrings[idx]);
+    }
+    return getAvroSchema(paths, job);
}
/**
@@ -285,18 +292,24 @@ public class AvroStorage extends LoadFun
};
/**
- * Reads the avro schema at the specified location.
+ * Reads the avro schemas at the specified location.
* @param p Location of file
* @param job Hadoop job object
* @return an Avro Schema object derived from the specified file
* @throws IOException
*
*/
- public Schema getAvroSchema(final Path p, final Job job)
- throws IOException {
+ public Schema getAvroSchema(final Path[] p, final Job job) throws IOException {
GenericDatumReader<Object> avroReader = new GenericDatumReader<Object>();
- FileSystem fs = FileSystem.get(p.toUri(), job.getConfiguration());
- FileStatus[] statusArray = fs.globStatus(p);
+ ArrayList<FileStatus> statusList = new ArrayList<FileStatus>();
+ FileSystem fs = FileSystem.get(p[0].toUri(), job.getConfiguration());
+ for (Path temp : p) {
+ for (FileStatus tempf : fs.globStatus(temp)) {
+ statusList.add(tempf);
+ }
+ }
+ FileStatus[] statusArray = (FileStatus[]) statusList
+ .toArray(new FileStatus[statusList.size()]);
if (statusArray == null) {
throw new IOException("Path " + p.toString() + " does not exist.");
@@ -313,8 +326,8 @@ public class AvroStorage extends LoadFun
}
InputStream hdfsInputStream = fs.open(filePath);
- DataFileStream<Object> avroDataStream =
- new DataFileStream<Object>(hdfsInputStream, avroReader);
+ DataFileStream<Object> avroDataStream = new DataFileStream<Object>(
+ hdfsInputStream, avroReader);
Schema s = avroDataStream.getSchema();
avroDataStream.close();
return s;
Modified: pig/trunk/src/org/apache/pig/builtin/TrevniStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/TrevniStorage.java?rev=1514537&r1=1514536&r2=1514537&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/TrevniStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/TrevniStorage.java Thu Aug 15 23:10:45 2013
@@ -20,6 +20,7 @@ package org.apache.pig.builtin;
import java.io.IOException;
import java.io.OutputStream;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
@@ -302,10 +303,17 @@ public class TrevniStorage extends AvroS
}
@Override
- public Schema getAvroSchema(Path p, final Job job) throws IOException {
+ public Schema getAvroSchema(Path p[], final Job job) throws IOException {
- FileSystem fs = FileSystem.get(p.toUri(), job.getConfiguration());
- FileStatus[] statusArray = fs.globStatus(p, VISIBLE_FILES);
+ ArrayList<FileStatus> statusList = new ArrayList<FileStatus>();
+ FileSystem fs = FileSystem.get(p[0].toUri(), job.getConfiguration());
+ for (Path temp : p) {
+ for (FileStatus tempf : fs.globStatus(temp, VISIBLE_FILES)) {
+ statusList.add(tempf);
+ }
+ }
+ FileStatus[] statusArray = (FileStatus[]) statusList
+ .toArray(new FileStatus[statusList.size()]);
if (statusArray == null) {
throw new IOException("Path " + p.toString() + " does not exist.");
Modified: pig/trunk/test/org/apache/pig/builtin/TestAvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/TestAvroStorage.java?rev=1514537&r1=1514536&r2=1514537&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/TestAvroStorage.java (original)
+++ pig/trunk/test/org/apache/pig/builtin/TestAvroStorage.java Thu Aug 15 23:10:45 2013
@@ -647,6 +647,25 @@ public class TestAvroStorage {
verifyResults(createOutputName(),check);
}
+  @Test public void testSeparatedByComma() throws Exception {
+    final String temp = basedir
+        + "data/avro/uncompressed/testdirectory/part-m-0000";
+    // Comma-join part-m-00000.avro .. part-m-00007.avro into a single
+    // location; AvroStorage must split it back into the individual paths.
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i <= 7; ++i) {
+      sb.append(i == 0 ? "" : ",").append(temp).append(i).append(".avro");
+    }
+    final String input = sb.toString();
+    final String check = basedir
+        + "data/avro/uncompressed/testDirectoryCounts.avro";
+    testAvroStorage(true, basedir + "code/pig/directory_test.pig",
+        ImmutableMap.of("INFILE", input, "AVROSTORAGE_OUT_1", "stats",
+            "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
+            "OUTFILE", createOutputName()));
+    verifyResults(createOutputName(), check);
+  }
+
@Test public void testDoubleUnderscore() throws Exception {
final String input = basedir + "data/avro/uncompressed/records.avro";
final String check = basedir + "data/avro/uncompressed/recordsWithDoubleUnderscores.avro";