You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@pig.apache.org by ch...@apache.org on 2014/04/04 23:20:54 UTC

svn commit: r1584916 - in /pig/branches/branch-0.12: ./ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/ contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/

Author: cheolsoo
Date: Fri Apr  4 21:20:54 2014
New Revision: 1584916

URL: http://svn.apache.org/r1584916
Log:
PIG-3661: Piggybank AvroStorage fails if used in more than one load or store statement (rohini via cheolsoo)

Modified:
    pig/branches/branch-0.12/CHANGES.txt
    pig/branches/branch-0.12/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
    pig/branches/branch-0.12/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java
    pig/branches/branch-0.12/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java
    pig/branches/branch-0.12/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java
    pig/branches/branch-0.12/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java
    pig/branches/branch-0.12/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java

Modified: pig/branches/branch-0.12/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/CHANGES.txt?rev=1584916&r1=1584915&r2=1584916&view=diff
==============================================================================
--- pig/branches/branch-0.12/CHANGES.txt (original)
+++ pig/branches/branch-0.12/CHANGES.txt Fri Apr  4 21:20:54 2014
@@ -34,6 +34,8 @@ PIG-3480: TFile-based tmpfile compressio
 
 BUG FIXES
 
+PIG-3661: Piggybank AvroStorage fails if used in more than one load or store statement (rohini via cheolsoo)
+
 PIG-3819: e2e tests containing "perl -e "print $_;" fails on Hadoop 2 (daijy)
 
 PIG-3813: Rank column is assigned different uids everytime when schema is reset (cheolsoo)

Modified: pig/branches/branch-0.12/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java?rev=1584916&r1=1584915&r2=1584916&view=diff
==============================================================================
--- pig/branches/branch-0.12/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java (original)
+++ pig/branches/branch-0.12/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java Fri Apr  4 21:20:54 2014
@@ -21,16 +21,14 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
-import java.util.HashSet;
-import java.net.URI;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
-import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -105,6 +103,7 @@ public class AvroStorage extends FileInp
 
     private boolean checkSchema = true; /*whether check schema of input directories*/
     private boolean ignoreBadFiles = false; /* whether ignore corrupted files during load */
+    private String contextSignature = null;
 
     /**
      * Empty constructor. Output schema is derived from pig schema.
@@ -151,11 +150,15 @@ public class AvroStorage extends FileInp
         if (inputAvroSchema != null) {
             return;
         }
-        Set<Path> paths = new HashSet<Path>();
+
         Configuration conf = job.getConfiguration();
-        if (AvroStorageUtils.getAllSubDirs(new Path(location), conf, paths)) {
+        Set<Path> paths = AvroStorageUtils.getPaths(location, conf, true);
+        if (!paths.isEmpty()) {
+            // Set top level directories in input format. Adding all files will
+            // bloat configuration size
+            FileInputFormat.setInputPaths(job, paths.toArray(new Path[paths.size()]));
+            // Scan all directories including sub directories for schema
             setInputAvroSchema(paths, conf);
-            FileInputFormat.setInputPaths(job, paths.toArray(new Path[0]));
         } else {
             throw new IOException("Input path \'" + location + "\' is not found");
         }
@@ -193,9 +196,17 @@ public class AvroStorage extends FileInp
         if (paths == null || paths.isEmpty()) {
             return null;
         }
-        Path path = paths.iterator().next();
-        FileSystem fs = FileSystem.get(path.toUri(), conf);
-        return getAvroSchema(path, fs);
+        Iterator<Path> iterator = paths.iterator();
+        Schema schema = null;
+        while (iterator.hasNext()) {
+            Path path = iterator.next();
+            FileSystem fs = FileSystem.get(path.toUri(), conf);
+            schema = getAvroSchema(path, fs);
+            if (schema != null) {
+                break;
+            }
+        }
+        return schema;
     }
 
     /**
@@ -237,7 +248,7 @@ public class AvroStorage extends FileInp
                         System.out.println("Do not check schema; use schema of " + s.getPath());
                         return schema;
                     }
-                } else if (!schema.equals(newSchema)) {
+                } else if (newSchema != null && !schema.equals(newSchema)) {
                     throw new IOException( "Input path is " + path + ". Sub-direcotry " + s.getPath()
                                          + " contains different schema " + newSchema + " than " + schema);
                 }
@@ -254,19 +265,23 @@ public class AvroStorage extends FileInp
      * Merge multiple input avro schemas into one. Note that we can't merge arbitrary schemas.
      * Please see AvroStorageUtils.mergeSchema() for what's allowed and what's not allowed.
      *
-     * @param paths  set of input files
+     * @param basePaths  set of input dir or files
      * @param conf  configuration
      * @return avro schema
      * @throws IOException
      */
-    protected Schema getMergedSchema(Set<Path> paths, Configuration conf) throws IOException {
+    protected Schema getMergedSchema(Set<Path> basePaths, Configuration conf) throws IOException {
         Schema result = null;
         Map<Path, Schema> mergedFiles = new HashMap<Path, Schema>();
+
+        Set<Path> paths = AvroStorageUtils.getAllFilesRecursively(basePaths, conf);
         for (Path path : paths) {
             FileSystem fs = FileSystem.get(path.toUri(), conf);
             Schema schema = getSchema(path, fs);
-            result = AvroStorageUtils.mergeSchema(result, schema);
-            mergedFiles.put(path, schema);
+            if (schema != null) {
+                result = AvroStorageUtils.mergeSchema(result, schema);
+                mergedFiles.put(path, schema);
+            }
         }
         // schemaToMergedSchemaMap is only needed when merging multiple records.
         if (mergedFiles.size() > 1 && result.getType().equals(Schema.Type.RECORD)) {
@@ -302,6 +317,9 @@ public class AvroStorage extends FileInp
     protected Schema getSchemaFromFile(Path path, FileSystem fs) throws IOException {
         /* get path of the last file */
         Path lastFile = AvroStorageUtils.getLast(path, fs);
+        if (lastFile == null) {
+            return null;
+        }
 
         /* read in file and obtain schema */
         GenericDatumReader<Object> avroReader = new GenericDatumReader<Object>();
@@ -360,9 +378,14 @@ public class AvroStorage extends FileInp
         /* get avro schema */
         AvroStorageLog.funcCall("getSchema");
         if (inputAvroSchema == null) {
-            Set<Path> paths = new HashSet<Path>();
             Configuration conf = job.getConfiguration();
-            if (AvroStorageUtils.getAllSubDirs(new Path(location), conf, paths)) {
+            // If within a script, you store to one location and read from same
+            // location using AvroStorage getPaths will be empty. Since
+            // getSchema is called during script parsing we don't want to fail
+            // here if path not found
+
+            Set<Path> paths = AvroStorageUtils.getPaths(location, conf, false);
+            if (!paths.isEmpty()) {
                 setInputAvroSchema(paths, conf);
             }
         }
@@ -617,8 +640,7 @@ public class AvroStorage extends FileInp
     @Override
     public void checkSchema(ResourceSchema s) throws IOException {
         AvroStorageLog.funcCall("Check schema");
-        UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(ResourceSchema.class);
+        Properties property = getUDFProperties();
         String prevSchemaStr = property.getProperty(AVRO_OUTPUT_SCHEMA_PROPERTY);
         AvroStorageLog.details("Previously defined schemas=" + prevSchemaStr);
 
@@ -651,6 +673,14 @@ public class AvroStorage extends FileInp
         AvroStorageLog.details("New schemas=" + newSchemaStr);
     }
 
+    /**
+     * Returns UDFProperties based on <code>contextSignature</code>.
+     */
+    private Properties getUDFProperties() {
+        return UDFContext.getUDFContext()
+            .getUDFProperties(this.getClass(), new String[] {contextSignature});
+    }
+
     private String getSchemaKey() {
         return Integer.toString(storeFuncIndex);
     }
@@ -678,8 +708,7 @@ public class AvroStorage extends FileInp
     public OutputFormat getOutputFormat() throws IOException {
         AvroStorageLog.funcCall("getOutputFormat");
 
-        UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(ResourceSchema.class);
+        Properties property = getUDFProperties();
         String allSchemaStr = property.getProperty(AVRO_OUTPUT_SCHEMA_PROPERTY);
         Map<String, String> map = (allSchemaStr != null)  ? parseSchemaMap(allSchemaStr) : null;
 
@@ -701,7 +730,7 @@ public class AvroStorage extends FileInp
 
     @Override
     public void setStoreFuncUDFContextSignature(String signature) {
-        // Nothing to do
+        this.contextSignature = signature;
     }
 
     @Override

Modified: pig/branches/branch-0.12/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java?rev=1584916&r1=1584915&r2=1584916&view=diff
==============================================================================
--- pig/branches/branch-0.12/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java (original)
+++ pig/branches/branch-0.12/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java Fri Apr  4 21:20:54 2014
@@ -17,12 +17,10 @@
 
 package org.apache.pig.piggybank.storage.avro;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -30,7 +28,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.net.URI;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.generic.GenericDatumReader;
@@ -40,14 +37,10 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.data.DataType;
-import org.apache.pig.piggybank.storage.avro.AvroStorageLog;
-
 import org.codehaus.jackson.JsonNode;
 /**
  * This is utility class for this package
@@ -100,51 +93,59 @@ public class AvroStorageUtils {
     }
 
     /**
-     * get input paths to job config
-     */
-    public static boolean addInputPaths(String pathString, Job job) throws IOException {
-      Configuration conf = job.getConfiguration();
-      FileSystem fs = FileSystem.get(conf);
-      HashSet<Path> paths = new  HashSet<Path>();
-      if (getAllSubDirs(new Path(pathString), conf, paths)) {
-        paths.addAll(Arrays.asList(FileInputFormat.getInputPaths(job)));
-        FileInputFormat.setInputPaths(job, paths.toArray(new Path[0]));
-        return true;
-      }
-      return false;
-    }
-
-    /**
-     * Adds all non-hidden directories and subdirectories to set param
-     * it supports comma-separated input paths and glob style path
+     * Gets the list of paths from the pathString specified which may contain
+     * comma-separated paths and glob style path
      *
      * @throws IOException
      */
-    public static boolean getAllSubDirs(Path path, Configuration conf,
-            Set<Path> paths) throws IOException {
-        String[] pathStrs = LoadFunc.getPathStrings(path.toString());
+    public static Set<Path> getPaths(String pathString, Configuration conf, boolean failIfNotFound)
+            throws IOException {
+        Set<Path> paths = new HashSet<Path>();
+        String[] pathStrs = LoadFunc.getPathStrings(pathString);
         for (String pathStr : pathStrs) {
             FileSystem fs = FileSystem.get(new Path(pathStr).toUri(), conf);
             FileStatus[] matchedFiles = fs.globStatus(new Path(pathStr), PATH_FILTER);
             if (matchedFiles == null || matchedFiles.length == 0) {
-                return false;
+                if (failIfNotFound) {
+                    throw new IOException("Input Pattern " + pathStr + " matches 0 files");
+                } else {
+                    continue;
+                }
             }
             for (FileStatus file : matchedFiles) {
-                getAllSubDirsInternal(file, conf, paths, fs);
+                paths.add(file.getPath());
             }
         }
-        return true;
+        return paths;
+    }
+
+    /**
+     * Returns all non-hidden files recursively inside the base paths given
+     *
+     * @throws IOException
+     */
+    public static Set<Path> getAllFilesRecursively(Set<Path> basePaths, Configuration conf) throws IOException {
+        Set<Path> paths = new HashSet<Path>();
+        for (Path path : basePaths) {
+            FileSystem fs = FileSystem.get(path.toUri(), conf);
+            FileStatus f = fs.getFileStatus(path);
+            if (f.isDir()) {
+                getAllFilesInternal(f, conf, paths, fs);
+            } else {
+                paths.add(path);
+            }
+        }
+        return paths;
     }
 
-    private static void getAllSubDirsInternal(FileStatus file, Configuration conf,
+    private static void getAllFilesInternal(FileStatus file, Configuration conf,
             Set<Path> paths, FileSystem fs) throws IOException {
-        if (file.isDir()) {
-            for (FileStatus sub : fs.listStatus(file.getPath())) {
-                getAllSubDirsInternal(sub, conf, paths, fs);
+        for (FileStatus f : fs.listStatus(file.getPath(), PATH_FILTER)) {
+            if (f.isDir()) {
+                getAllFilesInternal(f, conf, paths, fs);
+            } else {
+                paths.add(f.getPath());
             }
-        } else {
-            AvroStorageLog.details("Add input file:" + file);
-            paths.add(file.getPath());
         }
     }
 
@@ -160,22 +161,24 @@ public class AvroStorageUtils {
     /** get last file of a hdfs path if it is  a directory;
      *   or return the file itself if path is a file
      */
-    public static Path getLast(String path, FileSystem fs) throws IOException {
-        return getLast(new Path(path), fs);
-    }
-
-    /** get last file of a hdfs path if it is  a directory;
-     *   or return the file itself if path is a file
-     */
     public static Path getLast(Path path, FileSystem fs) throws IOException {
 
+        FileStatus status = fs.getFileStatus(path);
+        if (!status.isDir()) {
+            return path;
+        }
         FileStatus[] statuses = fs.listStatus(path, PATH_FILTER);
 
         if (statuses.length == 0) {
-            return path;
+            return null;
         } else {
             Arrays.sort(statuses);
-            return statuses[statuses.length - 1].getPath();
+            for (int i = statuses.length - 1; i >= 0; i--) {
+                if (!statuses[i].isDir()) {
+                    return statuses[i].getPath();
+                }
+            }
+            return null;
         }
     }
 
@@ -705,6 +708,9 @@ public class AvroStorageUtils {
     public static Schema getSchema(Path path, FileSystem fs) throws IOException {
         /* get path of the last file */
         Path lastFile = AvroStorageUtils.getLast(path, fs);
+        if (lastFile == null) {
+            return null;
+        }
 
         /* read in file and obtain schema */
         GenericDatumReader<Object> avroReader = new GenericDatumReader<Object>();

Modified: pig/branches/branch-0.12/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java?rev=1584916&r1=1584915&r2=1584916&view=diff
==============================================================================
--- pig/branches/branch-0.12/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java (original)
+++ pig/branches/branch-0.12/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java Fri Apr  4 21:20:54 2014
@@ -5,9 +5,9 @@
  * licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -18,9 +18,9 @@
 package org.apache.pig.piggybank.storage.avro;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
@@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.Recor
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 
 /**
  * The InputFormat for avro data.
@@ -86,4 +87,14 @@ public class PigAvroInputFormat extends 
                 ignoreBadFiles, schemaToMergedSchemaMap, useMultipleSchemas);
     }
 
+    /*
+     * This is to support multi-level/recursive directory listing until
+     * MAPREDUCE-1577 is fixed.
+     */
+    @Override
+    protected List<FileStatus> listStatus(JobContext job) throws IOException {
+        return MapRedUtil.getAllFileRecursively(super.listStatus(job),
+                job.getConfiguration());
+    }
+
 }

Modified: pig/branches/branch-0.12/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java?rev=1584916&r1=1584915&r2=1584916&view=diff
==============================================================================
--- pig/branches/branch-0.12/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java (original)
+++ pig/branches/branch-0.12/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java Fri Apr  4 21:20:54 2014
@@ -5,9 +5,9 @@
  * licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -127,6 +127,15 @@ public class PigAvroRecordReader extends
                 JsonNode defValue = subFields.get(i).defaultValue();
                 if (defValue != null) {
                     Schema.Type type = subFields.get(i).schema().getType();
+                    if (type.equals(Schema.Type.UNION)) {
+                        List<Schema> schemas = subFields.get(i).schema().getTypes();
+                        for (Schema schema : schemas) {
+                            if (!schema.getType().equals(Schema.Type.NULL)) {
+                                type = schema.getType();
+                                break;
+                            }
+                        }
+                    }
                     switch (type) {
                         case BOOLEAN:
                             mProtoTuple.add(i, defValue.getBooleanValue());

Modified: pig/branches/branch-0.12/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java?rev=1584916&r1=1584915&r2=1584916&view=diff
==============================================================================
--- pig/branches/branch-0.12/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java (original)
+++ pig/branches/branch-0.12/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java Fri Apr  4 21:20:54 2014
@@ -19,6 +19,16 @@ package org.apache.pig.piggybank.test.st
 import static org.apache.pig.builtin.mock.Storage.resetData;
 import static org.apache.pig.builtin.mock.Storage.schema;
 import static org.apache.pig.builtin.mock.Storage.tuple;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
 
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
@@ -29,7 +39,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.ExecType;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.PigServer;
@@ -41,26 +50,13 @@ import org.apache.pig.builtin.mock.Stora
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.piggybank.storage.avro.AvroStorage;
 import org.apache.pig.piggybank.storage.avro.PigSchema2Avro;
-import org.apache.pig.test.MiniCluster;
 import org.apache.pig.test.Util;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-
-import static junit.framework.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
 public class TestAvroStorage {
 
     protected static final Log LOG = LogFactory.getLog(TestAvroStorage.class);
@@ -594,6 +590,7 @@ public class TestAvroStorage {
 
     @Test
     public void testUserDefinedLoadSchema() throws IOException {
+        PigSchema2Avro.setTupleIndex(2);
         // Verify that user specified schema correctly maps to input schemas
         // Input Avro files have the following schemas:
         //   name:"string", address:[customField1:"int", addressLine:"string"]
@@ -604,7 +601,7 @@ public class TestAvroStorage {
         // dropping, adding, and reordering fields where needed.
         String output= outbasedir + "testUserDefinedLoadSchema";
         String expected = basedir + "expected_testUserDefinedLoadSchema.avro";
-        String customSchema = 
+        String customSchema =
                     "{\"type\": \"record\", \"name\": \"employee\", \"fields\": [ "
                         +"{ \"default\": \"***\", \"type\": \"string\", \"name\": \"name\" }, "
                         +"{ \"name\": \"address\", \"type\": { "
@@ -621,7 +618,7 @@ public class TestAvroStorage {
             " in = LOAD '" + testUserDefinedLoadSchemaFile
                 + "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ('schema', '" + customSchema + "');",
             " o = ORDER in BY name;",
-            " STORE o INTO '" + output + "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();" 
+            " STORE o INTO '" + output + "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();"
            };
         testAvroStorage(queries);
         verifyResults(output, expected);
@@ -1212,30 +1209,65 @@ public class TestAvroStorage {
     // {"name":"age","type":["null","int"],"doc":"autogenerated from Pig Field Schema"},
     // {"name":"gpa","type":["null","double"],"doc":"autogenerated from Pig Field Schema"}]}]
     public void testLoadwithNullValues() throws IOException {
-    //Input is supposed to have empty tuples
-    PigSchema2Avro.setTupleIndex(0);
-    Data data = resetData(pigServerLocal);
-    String output = outbasedir + "testLoadwithNulls";
-    deleteDirectory(new File(output));
-    String [] queries = {
-       " A = load '" +  testLoadwithNullValues + "' USING " +
-          " org.apache.pig.piggybank.storage.avro.AvroStorage(); ",
-       " B = order A by name;",
-       " store B into '" +  output +"' USING mock.Storage();"
-       };
-    testAvroStorage(queries);
-    List<Tuple> out = data.get(output);
-    assertEquals(out + " size", 4, out.size());
-
-    assertEquals(schema("name:chararray,age:int,gpa:double"), data.getSchema(output));
-
-    // sorted data ordered by name
-    assertEquals(tuple((String)null),out.get(0));
-    assertEquals(tuple((String)null),out.get(1));
-    assertEquals(tuple("calvin ellison", 24, 0.71), out.get(2));
-    assertEquals(tuple("wendy johnson", 60, 0.07), out.get(3));
+        //Input is supposed to have empty tuples
+        PigSchema2Avro.setTupleIndex(0);
+        Data data = resetData(pigServerLocal);
+        String output = outbasedir + "testLoadwithNulls";
+        deleteDirectory(new File(output));
+        String [] queries = {
+           " A = load '" +  testLoadwithNullValues + "' USING " +
+              " org.apache.pig.piggybank.storage.avro.AvroStorage(); ",
+           " B = order A by name;",
+           " store B into '" +  output +"' USING mock.Storage();"
+           };
+        testAvroStorage(queries);
+        List<Tuple> out = data.get(output);
+        assertEquals(out + " size", 4, out.size());
 
-   }
+        assertEquals(schema("name:chararray,age:int,gpa:double"), data.getSchema(output));
+
+        // sorted data ordered by name
+        assertEquals(tuple((String)null),out.get(0));
+        assertEquals(tuple((String)null),out.get(1));
+        assertEquals(tuple("calvin ellison", 24, 0.71), out.get(2));
+        assertEquals(tuple("wendy johnson", 60, 0.07), out.get(3));
+
+    }
+
+    @Test
+    public void testMultipleLoadStore() throws Exception {
+        PigSchema2Avro.setTupleIndex(0);
+        Data data = resetData(pigServerLocal);
+        data.set("foo",
+                tuple(1, 2, 3),
+                tuple(4, 5, 6),
+                tuple(7, 8, 9));
+        data.set("bar",
+                tuple("a", "b", "c"),
+                tuple("d", "e", "f"),
+                tuple("g", "h", "i"));
+        String output = outbasedir + "testMultipleLoadStore";
+        deleteDirectory(new File(output));
+        String[] storeQuery = {
+                "A = LOAD 'foo' USING " + "mock.Storage() as (a1:int, a2:int, a3:int);",
+                "B = LOAD 'bar' USING " + "mock.Storage() as (b1:chararray, b2:chararray, b3:chararray);",
+                "STORE A into '"+ output +"/A' USING " + "org.apache.pig.piggybank.storage.avro.AvroStorage();",
+                "STORE B into '"+ output +"/B' USING " + "org.apache.pig.piggybank.storage.avro.AvroStorage();"
+                };
+        testAvroStorage(storeQuery);
+        String[] loadQuery = {
+                "C = LOAD '"+ output +"/A' USING " + "org.apache.pig.piggybank.storage.avro.AvroStorage();",
+                "D = LOAD '"+ output +"/B' USING " + "org.apache.pig.piggybank.storage.avro.AvroStorage();",
+                "STORE C into 'foo-actual' USING mock.Storage();",
+                "STORE D into 'bar-actual' USING mock.Storage();"
+                };
+        testAvroStorage(loadQuery);
+
+        assertEquals(data.get("foo"), data.get("foo-actual"));
+        assertEquals(data.get("bar"), data.get("bar-actual"));
+        assertEquals("{a1: int,a2: int,a3: int}", data.getSchema("foo-actual").toString());
+        assertEquals("{b1: chararray,b2: chararray,b3: chararray}", data.getSchema("bar-actual").toString());
+    }
 
     private static void deleteDirectory (File path) {
         if ( path.exists()) {

Modified: pig/branches/branch-0.12/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java?rev=1584916&r1=1584915&r2=1584916&view=diff
==============================================================================
--- pig/branches/branch-0.12/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java (original)
+++ pig/branches/branch-0.12/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java Fri Apr  4 21:20:54 2014
@@ -19,15 +19,12 @@ package org.apache.pig.piggybank.test.st
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.piggybank.storage.avro.AvroStorageUtils;
-
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -93,37 +90,45 @@ public class TestAvroStorageUtils {
     }
 
     @Test
-    public void testGetAllSubDirs() throws IOException {
-        final String basedir = System.getProperty("user.dir");
+    public void testGetPaths() throws IOException {
+        final String basedir = "file://" + System.getProperty("user.dir");
         final String tempdir = Long.toString(System.currentTimeMillis());
         final String nonexistentpath = basedir + "/" + tempdir + "/this_path_does_not_exist";
 
         String locationStr = null;
-        Set<Path> paths = new HashSet<Path>();
+        Set<Path> paths;
         Configuration conf = new Configuration();
 
         // existent path
         locationStr = basedir;
-        assertTrue(AvroStorageUtils.getAllSubDirs(new Path(locationStr), conf, paths));
+        paths = AvroStorageUtils.getPaths(locationStr, conf, true);
         assertFalse(paths.isEmpty());
-        paths.clear();
 
         // non-existent path
         locationStr = nonexistentpath;
-        assertFalse(AvroStorageUtils.getAllSubDirs(new Path(locationStr), conf, paths));
-        assertTrue(paths.isEmpty());
-        paths.clear();
+        try {
+            paths = AvroStorageUtils.getPaths(locationStr, conf, true);
+            fail();
+        } catch (IOException e) {
+            assertTrue(e.getMessage().contains("matches 0 files"));
+        }
 
         // empty glob pattern
         locationStr = basedir + "/{}";
-        assertFalse(AvroStorageUtils.getAllSubDirs(new Path(locationStr), conf, paths));
+        try {
+            paths = AvroStorageUtils.getPaths(locationStr, conf, true);
+            fail();
+        } catch (IOException e) {
+            assertTrue(e.getMessage().contains("matches 0 files"));
+        }
+
+        paths = AvroStorageUtils.getPaths(locationStr, conf, false);
         assertTrue(paths.isEmpty());
-        paths.clear();
 
         // bad glob pattern
         locationStr = basedir + "/{1,";
         try {
-            AvroStorageUtils.getAllSubDirs(new Path(locationStr), conf, paths);
+            AvroStorageUtils.getPaths(locationStr, conf, true);
             Assert.fail("Negative test to test illegal file pattern. Should not be succeeding!");
         } catch (IOException e) {
             // The message of the exception for illegal file pattern is rather long,