Posted to commits@pig.apache.org by da...@apache.org on 2011/07/12 19:11:37 UTC

svn commit: r1145681 - in /pig/branches/branch-0.9: ./ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/ contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/

Author: daijy
Date: Tue Jul 12 17:11:37 2011
New Revision: 1145681

URL: http://svn.apache.org/viewvc?rev=1145681&view=rev
Log:
PIG-1890: Fix piggybank unit test TestAvroStorage

Modified:
    pig/branches/branch-0.9/CHANGES.txt
    pig/branches/branch-0.9/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
    pig/branches/branch-0.9/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java
    pig/branches/branch-0.9/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigSchema2Avro.java
    pig/branches/branch-0.9/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java

Modified: pig/branches/branch-0.9/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/CHANGES.txt?rev=1145681&r1=1145680&r2=1145681&view=diff
==============================================================================
--- pig/branches/branch-0.9/CHANGES.txt (original)
+++ pig/branches/branch-0.9/CHANGES.txt Tue Jul 12 17:11:37 2011
@@ -192,6 +192,8 @@ PIG-1696: Performance: Use System.arrayc
 
 BUG FIXES
 
+PIG-1890: Fix piggybank unit test TestAvroStorage (kengoodhope via daijy)
+
 PIG-2144: ClassCastException when using IsEmpty(DIFF()) (thejas)
 
 PIG-2139: LogicalExpressionSimplifier optimizer rule should check if udf is

Modified: pig/branches/branch-0.9/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java?rev=1145681&r1=1145680&r2=1145681&view=diff
==============================================================================
--- pig/branches/branch-0.9/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java (original)
+++ pig/branches/branch-0.9/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java Tue Jul 12 17:11:37 2011
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
+import java.util.HashSet;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.file.DataFileStream;
@@ -42,6 +43,7 @@ import org.apache.hadoop.mapreduce.Recor
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.pig.Expression;
 import org.apache.pig.FileInputLoadFunc;
 import org.apache.pig.LoadFunc;
@@ -128,7 +130,9 @@ public class AvroStorage extends FileInp
      */
     @Override
     public void setLocation(String location, Job job) throws IOException {
-        if(AvroStorageUtils.addInputPaths(location, job) && inputAvroSchema == null) {
+        HashSet<Path> paths = new HashSet<Path>();
+    	if(AvroStorageUtils.getAllSubDirs(new Path(location), job, paths) && inputAvroSchema == null) {
+            FileInputFormat.setInputPaths(job, paths.toArray(new Path[0]));
             inputAvroSchema = getAvroSchema(location, job);
         }
     }
@@ -188,6 +192,8 @@ public class AvroStorage extends FileInp
         
         if (schema == null)
             System.err.println("Cannot get avro schema! Input path " + path + " might be empty.");
+        
+        System.err.println(schema.toString());
         return schema;
     }
 
@@ -211,6 +217,7 @@ public class AvroStorage extends FileInp
         DataFileStream<Object> avroDataStream = new DataFileStream<Object>(hdfsInputStream, avroReader);
         Schema ret = avroDataStream.getSchema();
         avroDataStream.close();
+
         return ret;
     }
 
@@ -267,6 +274,9 @@ public class AvroStorage extends FileInp
             /* convert to pig schema */
             ResourceSchema pigSchema = AvroSchema2Pig.convert(inputAvroSchema);
             AvroStorageLog.details("pig input schema:" + pigSchema);
+            if (pigSchema.getFields().length == 1){
+            	pigSchema = pigSchema.getFields()[0].getSchema();
+            }
             return pigSchema;
         } else
             return null;
@@ -567,7 +577,7 @@ public class AvroStorage extends FileInp
     @Override
     public void putNext(Tuple t) throws IOException {
         try {
-            this.writer.write(NullWritable.get(), t);
+            this.writer.write(NullWritable.get(), t.getAll().size() == 1 ? t.get(0) : t);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
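
For context: the AvroStorage changes above unwrap single-field Pig schemas and tuples on
both the load path (getSchema) and the store path (putNext). The snippet below is a
minimal standalone sketch, not part of this commit, of the ternary introduced in
putNext(): a one-field tuple is written as its lone value, while multi-field tuples are
written unchanged. The class name and values are made up for illustration.

    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    // Hypothetical demo class, not part of the patch.
    public class PutNextUnwrapSketch {
        public static void main(String[] args) throws Exception {
            TupleFactory tf = TupleFactory.getInstance();

            Tuple single = tf.newTuple(1);
            single.set(0, 3.14f);
            // Same check as the new putNext(): one field -> unwrap and write the value itself.
            Object out = single.getAll().size() == 1 ? single.get(0) : single;
            System.out.println(out);   // 3.14

            Tuple multi = tf.newTuple(2);
            multi.set(0, 42);
            multi.set(1, "abc");
            out = multi.getAll().size() == 1 ? multi.get(0) : multi;
            System.out.println(out);   // (42,abc)
        }
    }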

Modified: pig/branches/branch-0.9/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java?rev=1145681&r1=1145680&r2=1145681&view=diff
==============================================================================
--- pig/branches/branch-0.9/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java (original)
+++ pig/branches/branch-0.9/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java Tue Jul 12 17:11:37 2011
@@ -17,8 +17,11 @@
 
 package org.apache.pig.piggybank.storage.avro;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -35,6 +38,8 @@ import org.apache.hadoop.mapreduce.lib.i
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.data.DataType;
+import org.apache.pig.piggybank.storage.avro.AvroStorageLog;
+
 
 /**
  * This is utility class for this package
@@ -87,54 +92,51 @@ public class AvroStorageUtils {
     }
 
     /**
-     * get input paths to job config 
+     * get input paths to job config
      */
     public static boolean addInputPaths(String pathString, Job job)
-                                    throws IOException {
-        Configuration conf = job.getConfiguration();
-        FileSystem fs = FileSystem.get(conf);
-        
-        Path path = new Path(pathString);
-        FileStatus pathStatus = fs.getFileStatus(path);
-        
-        List<FileStatus> input = new LinkedList<FileStatus>();
-        if (PATH_FILTER.accept(path)) { // remove input path with leading "." or "_"
-            input.add(pathStatus);
-        }
-
-        boolean ret = false;
-        while (!input.isEmpty()) {
-
-            FileStatus status = input.remove(0);
-            Path p = status.getPath();
-            
-            if (!status.isDir() ) {
-                AvroStorageLog.details("Add input path:" + p);
-                FileInputFormat.addInputPath(job, p);
-                ret = true;
-            } 
-            else {
-                /*list all sub-dirs*/
-                FileStatus[] ss = fs.listStatus(p, PATH_FILTER); 
-            
-                if (ss.length > 0) {
-                    if (noDir(ss) ) {
-                        AvroStorageLog.details("Add input path:" + p);
-                        FileInputFormat.addInputPath(job, p);
-                        ret = true;
-                    }
-                    else {
-                        input.addAll(Arrays.asList(ss));
-                        ret = true;
-                    }
-
-                }
-            }
-        }
+        throws IOException
+    {
+      Configuration conf = job.getConfiguration();
+      FileSystem fs = FileSystem.get(conf);
+      HashSet<Path> paths = new  HashSet<Path>();
+      if (getAllSubDirs(new Path(pathString), job, paths))
+      {
+        paths.addAll(Arrays.asList(FileInputFormat.getInputPaths(job)));
+        FileInputFormat.setInputPaths(job, paths.toArray(new Path[0]));
+        return true;
+      }
+      return false;
 
-        return ret;
     }
 
+    /**
+     * Adds all non-hidden directories and subdirectories to set param
+     * 
+     * @throws IOException
+     */
+     static boolean getAllSubDirs(Path path, Job job, Set<Path> paths) throws IOException {
+  		FileSystem fs = FileSystem.get(job.getConfiguration());
+  		if (PATH_FILTER.accept(path)) {
+  			try {
+  				FileStatus file = fs.getFileStatus(path);
+  				if (file.isDir()) {
+  					for (FileStatus sub : fs.listStatus(path)) {
+  						getAllSubDirs(sub.getPath(), job, paths);
+  					}
+  				} else {
+  					AvroStorageLog.details("Add input file:" + file);
+  					paths.add(file.getPath());
+  				}
+  			} catch (FileNotFoundException e) {
+  				AvroStorageLog.details("Input path does not exist: " + path);
+  				return false;
+  			}
+  			return true;
+  		}
+  		return false;
+  	}
+     
     /** check whether there is NO directory in the input file (status) list*/
     public static boolean noDir(FileStatus [] ss) {
         for (FileStatus s : ss) {
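
The reworked addInputPaths() above now delegates to the recursive getAllSubDirs() and
registers the collected paths with a single FileInputFormat.setInputPaths() call instead
of adding them one by one. Below is a minimal usage sketch, not part of this commit; the
bare Job and the /data/avro/events location are assumptions made for illustration.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.pig.piggybank.storage.avro.AvroStorageUtils;

    // Hypothetical caller, not part of the patch.
    public class AddInputPathsSketch {
        public static void main(String[] args) throws Exception {
            Job job = new Job();                      // job whose input paths get populated
            String location = "/data/avro/events";   // assumed example directory
            if (AvroStorageUtils.addInputPaths(location, job)) {
                // Every non-hidden file under the location (recursively) is now an input;
                // entries starting with "." or "_" are skipped by the path filter.
                for (Path p : FileInputFormat.getInputPaths(job)) {
                    System.out.println("input: " + p);
                }
            }
        }
    }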

Modified: pig/branches/branch-0.9/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigSchema2Avro.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigSchema2Avro.java?rev=1145681&r1=1145680&r2=1145681&view=diff
==============================================================================
--- pig/branches/branch-0.9/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigSchema2Avro.java (original)
+++ pig/branches/branch-0.9/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigSchema2Avro.java Tue Jul 12 17:11:37 2011
@@ -54,15 +54,10 @@ public class PigSchema2Avro {
         ResourceFieldSchema[] pigFields = pigSchema.getFields();
 
         /* remove the pig tuple wrapper */
-        if (pigFields.length == 1 && AvroStorageUtils.isTupleWrapper(pigFields[0])) {
+        if (pigFields.length == 1) {
 
             AvroStorageLog.details("Ignore the pig tuple wrapper.");
-            ResourceFieldSchema[] listSchemas = pigFields[0].getSchema()
-                                            .getFields();
-            if (listSchemas.length != 1)
-                throw new IOException("Expect one subfield from " + pigFields);
-
-            return convert(listSchemas[0], nullable);
+            return convert(pigFields[0], nullable);
         } else
             return convertRecord(pigFields, nullable);
     }
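
With the isTupleWrapper() guard removed above, any single-field Pig schema is now
unwrapped before conversion, which is part of what lets the re-enabled array tests
round-trip. A rough sketch of the effect follows, not part of this commit; it assumes
the public convert(ResourceSchema, boolean) entry point and uses a made-up field name.

    import org.apache.avro.Schema;
    import org.apache.pig.ResourceSchema;
    import org.apache.pig.impl.util.Utils;
    import org.apache.pig.piggybank.storage.avro.PigSchema2Avro;

    // Hypothetical demo, not part of the patch.
    public class SingleFieldConvertSketch {
        public static void main(String[] args) throws Exception {
            // A one-field Pig schema; it no longer has to look like a tuple wrapper.
            ResourceSchema pigSchema =
                new ResourceSchema(Utils.getSchemaFromString("f: float"));
            Schema avroSchema = PigSchema2Avro.convert(pigSchema, false);
            // Expected: the Avro schema of the lone field rather than a wrapping record.
            System.out.println(avroSchema);
        }
    }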

Modified: pig/branches/branch-0.9/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java?rev=1145681&r1=1145680&r2=1145681&view=diff
==============================================================================
--- pig/branches/branch-0.9/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java (original)
+++ pig/branches/branch-0.9/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java Tue Jul 12 17:11:37 2011
@@ -76,121 +76,116 @@ public class TestAvroStorage {
     }
 
     @Test
-    public void testDummy() {
-        // Dummy test, will remove after PIG-1890 check in. Otherwise, Junit will complain "No runnable methods"
+    public void testArrayDefault() throws IOException {
+        String output= outbasedir + "testArrayDefault";
+        String expected = basedir + "expected_testArrayDefault.avro";
+        
+        deleteDirectory(new File(output));
+        
+        String [] queries = {
+           " in = LOAD '" + testArrayFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+           " STORE in INTO '" + output + "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();"
+            };
+        testAvroStorage( queries);
+        verifyResults(output, expected);
+    }
+
+
+    @Test
+    public void testArrayWithSchema() throws IOException {
+        String output= outbasedir + "testArrayWithSchema";
+        String expected = basedir + "expected_testArrayWithSchema.avro";
+        deleteDirectory(new File(output));
+        String [] queries = {
+           " in = LOAD '" + testArrayFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+           " STORE in INTO '" + output +
+               "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ( "  +
+               "   'schema', '{\"type\":\"array\",\"items\":\"float\"}'  );"
+            };
+        testAvroStorage( queries);
+        verifyResults(output, expected);
+    }
+    
+    @Test
+    public void testArrayWithNotNull() throws IOException {
+        String output= outbasedir + "testArrayWithNotNull";
+        String expected = basedir + "expected_testArrayWithSchema.avro";
+        deleteDirectory(new File(output));
+        String [] queries = {
+           " in = LOAD '" + testArrayFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+           " STORE in INTO '" + output +
+               "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ( "  +
+               "   '{\"nullable\": false }'  );"
+            };
+        testAvroStorage( queries);
+        verifyResults(output, expected);
+    }
+    
+    @Test
+    public void testArrayWithSame() throws IOException {
+        String output= outbasedir + "testArrayWithSame";
+        String expected = basedir + "expected_testArrayWithSchema.avro";
+        deleteDirectory(new File(output));
+        String [] queries = {
+           " in = LOAD '" + testArrayFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+           " STORE in INTO '" + output +
+               "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ( "  +
+               "   'same', '" + testArrayFile + "'  );"
+            };
+        testAvroStorage(queries);
+        verifyResults(output, expected);
+    }
+
+    @Test
+    public void testRecordWithSplit() throws IOException {
+        String output1= outbasedir + "testRecordSplit1";
+        String output2= outbasedir + "testRecordSplit2";
+        String expected1 = basedir + "expected_testRecordSplit1.avro";
+        String expected2 = basedir + "expected_testRecordSplit2.avro";
+        deleteDirectory(new File(output1));
+        deleteDirectory(new File(output2));
+        String [] queries = {
+           " avro = LOAD '" + testRecordFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+           " groups = GROUP avro BY member_id;",
+           " sc = FOREACH groups GENERATE group AS key, COUNT(avro) AS cnt;",
+           " STORE sc INTO '" + output1 + "' " +
+                 " USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+                 "'{\"index\": 1, " +
+                 "  \"schema\": {\"type\":\"record\", " +
+                                        " \"name\":\"result\", " +
+                                       "  \"fields\":[ {\"name\":\"member_id\",\"type\":\"int\"}, " +
+                                                             "{\"name\":\"count\", \"type\":\"long\"} " +
+                                                          "]" +
+                                         "}" +
+                " }');",
+            " STORE sc INTO '" + output2 +
+                    " 'USING org.apache.pig.piggybank.storage.avro.AvroStorage ('index', '2');"
+            };
+        testAvroStorage( queries);
+        verifyResults(output1, expected1);
+        verifyResults(output2, expected2);
+    }
+    
+    @Test
+    public void testRecordWithFieldSchema() throws IOException {
+        String output= outbasedir + "testRecordWithFieldSchema";
+        String expected = basedir + "expected_testRecordWithFieldSchema.avro";
+        deleteDirectory(new File(output));
+        String [] queries = {
+           " avro = LOAD '" + testRecordFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+           " avro1 = FILTER avro BY member_id > 1211;",
+           " avro2 = FOREACH avro1 GENERATE member_id, browser_id, tracking_time, act_content ;",
+           " STORE avro2 INTO '" + output + "' " +
+                 " USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+                 "'{\"data\":  \"" + testRecordFile + "\" ," +
+                 "  \"field0\": \"int\", " +
+                  " \"field1\":  \"def:browser_id\", " +
+                 "  \"field3\": \"def:act_content\" " +
+                " }');"
+            };
+        testAvroStorage( queries);
+        verifyResults(output, expected);
     }
-// Comment out all test cases until PIG-1890 fixed
-//    @Test
-//    public void testArrayDefault() throws IOException {
-//        String output= outbasedir + "testArrayDefault";
-//        String expected = basedir + "expected_testArrayDefault.avro";
-//        
-//        deleteDirectory(new File(output));
-//        
-//        String [] queries = {
-//           " in = LOAD '" + testArrayFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
-//           " STORE in INTO '" + output + "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();"
-//            };
-//        testAvroStorage( queries);
-//        verifyResults(output, expected);
-//    }
-//
-//
-//    @Test
-//    public void testArrayWithSchema() throws IOException {
-//        String output= outbasedir + "testArrayWithSchema";
-//        String expected = basedir + "expected_testArrayWithSchema.avro";
-//        deleteDirectory(new File(output));
-//        String [] queries = {
-//           " in = LOAD '" + testArrayFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
-//           " STORE in INTO '" + output +
-//               "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ( "  +
-//               "   'schema', '{\"type\":\"array\",\"items\":\"float\"}'  );"
-//            };
-//        testAvroStorage( queries);
-//        verifyResults(output, expected);
-//    }
-//    
-//    @Test
-//    public void testArrayWithNotNull() throws IOException {
-//        String output= outbasedir + "testArrayWithNotNull";
-//        String expected = basedir + "expected_testArrayWithSchema.avro";
-//        deleteDirectory(new File(output));
-//        String [] queries = {
-//           " in = LOAD '" + testArrayFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
-//           " STORE in INTO '" + output +
-//               "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ( "  +
-//               "   '{\"nullable\": false }'  );"
-//            };
-//        testAvroStorage( queries);
-//        verifyResults(output, expected);
-//    }
-//    
-//    @Test
-//    public void testArrayWithSame() throws IOException {
-//        String output= outbasedir + "testArrayWithSame";
-//        String expected = basedir + "expected_testArrayWithSchema.avro";
-//        deleteDirectory(new File(output));
-//        String [] queries = {
-//           " in = LOAD '" + testArrayFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
-//           " STORE in INTO '" + output +
-//               "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ( "  +
-//               "   'same', '" + testArrayFile + "'  );"
-//            };
-//        testAvroStorage(queries);
-//        verifyResults(output, expected);
-//    }
-//
-//    @Test
-//    public void testRecordWithSplit() throws IOException {
-//        String output1= outbasedir + "testRecordSplit1";
-//        String output2= outbasedir + "testRecordSplit2";
-//        String expected1 = basedir + "expected_testRecordSplit1.avro";
-//        String expected2 = basedir + "expected_testRecordSplit2.avro";
-//        deleteDirectory(new File(output1));
-//        deleteDirectory(new File(output2));
-//        String [] queries = {
-//           " avro = LOAD '" + testRecordFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
-//           " groups = GROUP avro BY member_id;",
-//           " sc = FOREACH groups GENERATE group AS key, COUNT(avro) AS cnt;",
-//           " STORE sc INTO '" + output1 + "' " +
-//                 " USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
-//                 "'{\"index\": 1, " +
-//                 "  \"schema\": {\"type\":\"record\", " +
-//                                        " \"name\":\"result\", " +
-//                                       "  \"fields\":[ {\"name\":\"member_id\",\"type\":\"int\"}, " +
-//                                                             "{\"name\":\"count\", \"type\":\"long\"} " +
-//                                                          "]" +
-//                                         "}" +
-//                " }');",
-//            " STORE sc INTO '" + output2 +
-//                    " 'USING org.apache.pig.piggybank.storage.avro.AvroStorage ('index', '2');"
-//            };
-//        testAvroStorage( queries);
-//        verifyResults(output1, expected1);
-//        verifyResults(output2, expected2);
-//    }
-//    
-//    @Test
-//    public void testRecordWithFieldSchema() throws IOException {
-//        String output= outbasedir + "testRecordWithFieldSchema";
-//        String expected = basedir + "expected_testRecordWithFieldSchema.avro";
-//        deleteDirectory(new File(output));
-//        String [] queries = {
-//           " avro = LOAD '" + testRecordFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
-//           " avro1 = FILTER avro BY member_id > 1211;",
-//           " avro2 = FOREACH avro1 GENERATE member_id, browser_id, tracking_time, act_content ;",
-//           " STORE avro2 INTO '" + output + "' " +
-//                 " USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
-//                 "'{\"data\":  \"" + testRecordFile + "\" ," +
-//                 "  \"field0\": \"int\", " +
-//                  " \"field1\":  \"def:browser_id\", " +
-//                 "  \"field3\": \"def:act_content\" " +
-//                " }');"
-//            };
-//        testAvroStorage( queries);
-//        verifyResults(output, expected);
-//    }
     
     private static void deleteDirectory (File path) {
         if ( path.exists()) {