You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2013/08/16 01:10:46 UTC

svn commit: r1514537 - in /pig/trunk: CHANGES.txt src/org/apache/pig/builtin/AvroStorage.java src/org/apache/pig/builtin/TrevniStorage.java test/org/apache/pig/builtin/TestAvroStorage.java

Author: rohini
Date: Thu Aug 15 23:10:45 2013
New Revision: 1514537

URL: http://svn.apache.org/r1514537
Log:
PIG-3422: AvroStorage failed to read paths separated by commas (yuanlid via rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/builtin/AvroStorage.java
    pig/trunk/src/org/apache/pig/builtin/TrevniStorage.java
    pig/trunk/test/org/apache/pig/builtin/TestAvroStorage.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1514537&r1=1514536&r2=1514537&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Aug 15 23:10:45 2013
@@ -212,6 +212,8 @@ PIG-3013: BinInterSedes improve chararra
 
 BUG FIXES
 
+PIG-3422: AvroStorage failed to read paths separated by commas (yuanlid via rohini)
+
 PIG-3420: Failed to retrieve map values from data loaded by AvroStorage (yuanlid via rohini)
 
 PIG-3414: QueryParserDriver.parseSchema(String) silently returns a wrong result when a comma is missing in the schema definition (cheolsoo)

Modified: pig/trunk/src/org/apache/pig/builtin/AvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/AvroStorage.java?rev=1514537&r1=1514536&r2=1514537&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/AvroStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/AvroStorage.java Thu Aug 15 23:10:45 2013
@@ -21,6 +21,7 @@ package org.apache.pig.builtin;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
@@ -269,9 +270,15 @@ public class AvroStorage extends LoadFun
    * @throws IOException
    *
    */
-  protected final  Schema getAvroSchema(final String location,
-      final Job job) throws IOException {
-    return getAvroSchema(new Path(location), job);
+  protected final Schema getAvroSchema(final String location, final Job job)
+      throws IOException {
+    String[] locations = getPathStrings(location);
+    Path[] paths = new Path[locations.length];
+    for (int i = 0; i < paths.length; ++i) {
+      paths[i] = new Path(locations[i]);
+    }
+
+    return getAvroSchema(paths, job);
   }
 
   /**
@@ -285,18 +292,24 @@ public class AvroStorage extends LoadFun
   };
 
   /**
-   * Reads the avro schema at the specified location.
+   * Reads the avro schemas at the specified location.
    * @param p Location of file
    * @param job Hadoop job object
    * @return an Avro Schema object derived from the specified file
    * @throws IOException
    *
    */
-  public Schema getAvroSchema(final Path p, final Job job)
-      throws IOException {
+  public Schema getAvroSchema(final Path[] p, final Job job) throws IOException {
     GenericDatumReader<Object> avroReader = new GenericDatumReader<Object>();
-    FileSystem fs = FileSystem.get(p.toUri(), job.getConfiguration());
-    FileStatus[] statusArray = fs.globStatus(p);
+    ArrayList<FileStatus> statusList = new ArrayList<FileStatus>();
+    FileSystem fs = FileSystem.get(p[0].toUri(), job.getConfiguration());
+    for (Path temp : p) {
+      for (FileStatus tempf : fs.globStatus(temp)) {
+        statusList.add(tempf);
+      }
+    }
+    FileStatus[] statusArray = (FileStatus[]) statusList
+        .toArray(new FileStatus[statusList.size()]);
 
     if (statusArray == null) {
       throw new IOException("Path " + p.toString() + " does not exist.");
@@ -313,8 +326,8 @@ public class AvroStorage extends LoadFun
     }
 
     InputStream hdfsInputStream = fs.open(filePath);
-    DataFileStream<Object> avroDataStream =
-        new DataFileStream<Object>(hdfsInputStream, avroReader);
+    DataFileStream<Object> avroDataStream = new DataFileStream<Object>(
+        hdfsInputStream, avroReader);
     Schema s = avroDataStream.getSchema();
     avroDataStream.close();
     return s;

Modified: pig/trunk/src/org/apache/pig/builtin/TrevniStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/TrevniStorage.java?rev=1514537&r1=1514536&r2=1514537&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/TrevniStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/TrevniStorage.java Thu Aug 15 23:10:45 2013
@@ -20,6 +20,7 @@ package org.apache.pig.builtin;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map.Entry;
 
@@ -302,10 +303,17 @@ public class TrevniStorage extends AvroS
   }
 
   @Override
-  public  Schema getAvroSchema(Path p, final Job job) throws IOException {
+  public  Schema getAvroSchema(Path p[], final Job job) throws IOException {
 
-    FileSystem fs = FileSystem.get(p.toUri(), job.getConfiguration());
-    FileStatus[] statusArray = fs.globStatus(p, VISIBLE_FILES);
+    ArrayList<FileStatus> statusList = new ArrayList<FileStatus>();
+    FileSystem fs = FileSystem.get(p[0].toUri(), job.getConfiguration());
+    for (Path temp : p) {
+      for (FileStatus tempf : fs.globStatus(temp, VISIBLE_FILES)) {
+        statusList.add(tempf);
+      }
+    }
+    FileStatus[] statusArray = (FileStatus[]) statusList
+        .toArray(new FileStatus[statusList.size()]);
 
     if (statusArray == null) {
       throw new IOException("Path " + p.toString() + " does not exist.");

Modified: pig/trunk/test/org/apache/pig/builtin/TestAvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/TestAvroStorage.java?rev=1514537&r1=1514536&r2=1514537&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/TestAvroStorage.java (original)
+++ pig/trunk/test/org/apache/pig/builtin/TestAvroStorage.java Thu Aug 15 23:10:45 2013
@@ -647,6 +647,25 @@ public class TestAvroStorage {
       verifyResults(createOutputName(),check);
     }
 
+    @Test public void testSeparatedByComma() throws Exception {
+        final String temp = basedir
+                + "data/avro/uncompressed/testdirectory/part-m-0000";
+        StringBuffer sb = new StringBuffer();
+        sb.append(temp + "0.avro");
+        for (int i = 1; i <= 7; ++i) {
+            sb.append(",");
+            sb.append(temp + String.valueOf(i) + ".avro");
+        }
+        final String input = sb.toString();
+        final String check = basedir
+                + "data/avro/uncompressed/testDirectoryCounts.avro";
+        testAvroStorage(true, basedir + "code/pig/directory_test.pig",
+                ImmutableMap.of("INFILE", input, "AVROSTORAGE_OUT_1", "stats",
+                        "AVROSTORAGE_OUT_2", "-n org.apache.pig.test.builtin",
+                        "OUTFILE", createOutputName()));
+        verifyResults(createOutputName(), check);
+    }
+
     @Test public void testDoubleUnderscore() throws Exception {
       final String input = basedir + "data/avro/uncompressed/records.avro";
       final String check = basedir + "data/avro/uncompressed/recordsWithDoubleUnderscores.avro";