You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2012/09/13 23:16:55 UTC

svn commit: r1384536 - in /pig/trunk/contrib: ./ piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/ piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/ piggybank/java/src/test/java/org/apache/pig/piggybank/test/sto...

Author: gates
Date: Thu Sep 13 21:16:54 2012
New Revision: 1384536

URL: http://svn.apache.org/viewvc?rev=1384536&view=rev
Log:
PIG-2909 Add a new option for ignoring corrupted files to AvroStorage load func

Added:
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testCorruptedFile.avro   (with props)
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_corrupted_file.avro   (with props)
Modified:
    pig/trunk/contrib/CHANGES.txt
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java

Modified: pig/trunk/contrib/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/CHANGES.txt?rev=1384536&r1=1384535&r2=1384536&view=diff
==============================================================================
--- pig/trunk/contrib/CHANGES.txt (original)
+++ pig/trunk/contrib/CHANGES.txt Thu Sep 13 21:16:54 2012
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-2909 Add a new option for ignoring corrupted files to AvroStorage load func (cheolsoo via gates)
+
 PIG-2202 AvroStorage doesn't work with Avro 1.5.1 (billgraham via gates)
 
 PIG-1959 Penny: a framework for workflow instrumentation (olston, breed via gates)

Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java?rev=1384536&r1=1384535&r2=1384536&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java Thu Sep 13 21:16:54 2012
@@ -88,6 +88,7 @@ public class AvroStorage extends FileInp
     private Schema inputAvroSchema = null;    /* input avro schema */
 
     private boolean checkSchema = true; /*whether check schema of input directories*/
+    private boolean ignoreBadFiles = false; /* whether ignore corrupted files during load */
 
     /**
      * Empty constructor. Output schema is derived from pig schema.
@@ -112,9 +113,11 @@ public class AvroStorage extends FileInp
         nullable = true;
         checkSchema = true;
 
-        if (parts.length == 1 && !parts[0].equalsIgnoreCase("no_schema_check")) {
-            /* If one parameter is given, and that is not 'no_schema_check',
-             * then it must be a json string.
+        if (parts.length == 1
+                && !parts[0].equalsIgnoreCase("no_schema_check")
+                && !parts[0].equalsIgnoreCase("ignore_bad_files")) {
+            /* If one parameter is given, and that is neither 'no_schema_check'
+             * nor 'ignore_bad_files', then it must be a json string.
              */
             init(parseJsonString(parts[0]));
         } else {
@@ -259,7 +262,7 @@ public class AvroStorage extends FileInp
     public InputFormat getInputFormat() throws IOException {
         AvroStorageLog.funcCall("getInputFormat");
         if(inputAvroSchema != null)
-            return new PigAvroInputFormat(inputAvroSchema);
+            return new PigAvroInputFormat(inputAvroSchema, ignoreBadFiles);
         else
             return new TextInputFormat();
     }
@@ -381,6 +384,10 @@ public class AvroStorage extends FileInp
                 checkSchema = false;
                 /* parameter only, so increase iteration counter by 1 */
                 i += 1;
+            } else if (name.equalsIgnoreCase("ignore_bad_files")) {
+                ignoreBadFiles = true;
+                /* parameter only, so increase iteration counter by 1 */
+                i += 1;
             } else {
                 String value = parts[i+1].trim();
                 if (name.equalsIgnoreCase("debug")
@@ -627,6 +634,11 @@ public class AvroStorage extends FileInp
     }
 
     @Override
+    public void cleanupOnSuccess(String location, Job job) throws IOException {
+        // Nothing to do
+    }
+
+    @Override
     public void putNext(Tuple t) throws IOException {
         try {
             this.writer.write(NullWritable.get(), t.getAll().size() == 1 ? t.get(0) : t);
@@ -634,9 +646,4 @@ public class AvroStorage extends FileInp
             e.printStackTrace();
         }
     }
-
-    @Override
-    public void cleanupOnSuccess(String location, Job job) throws IOException{
-
-    }
 }

Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java?rev=1384536&r1=1384535&r2=1384536&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java Thu Sep 13 21:16:54 2012
@@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.lib.i
 public class PigAvroInputFormat extends FileInputFormat<NullWritable, Writable> {
 
     private Schema schema = null;  /* avro schema */
+    private boolean ignoreBadFiles = false; /* whether ignore corrupted files during load */
 
     /**
      * empty constructor
@@ -46,11 +47,13 @@ public class PigAvroInputFormat extends 
     }
 
     /**
-     * constructor called by AvroStorage to pass in schema
-     * @param s input data schema
+     * Constructor called by AvroStorage to pass in the input schema and the ignore_bad_files flag.
+     * @param schema input data schema
+     * @param ignoreBadFiles whether to ignore corrupted files during load
      */
-    public PigAvroInputFormat(Schema s) {
-        schema = s;
+    public PigAvroInputFormat(Schema schema, boolean ignoreBadFiles) {
+        this.schema = schema;
+        this.ignoreBadFiles = ignoreBadFiles;
     }
 
     /**
@@ -63,7 +66,7 @@ public class PigAvroInputFormat extends 
     createRecordReader(InputSplit split, TaskAttemptContext context)
     throws IOException,  InterruptedException {
         context.setStatus(split.toString());
-        return new PigAvroRecordReader(context, (FileSplit) split, schema);
+        return new PigAvroRecordReader(context, (FileSplit) split, schema, ignoreBadFiles);
     }
 
 }

Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java?rev=1384536&r1=1384535&r2=1384536&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java Thu Sep 13 21:16:54 2012
@@ -18,8 +18,12 @@
 package org.apache.pig.piggybank.storage.avro;
 
 import java.io.IOException;
+import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileReader;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -36,17 +40,22 @@ import org.apache.pig.data.TupleFactory;
  */
 public class PigAvroRecordReader extends RecordReader<NullWritable, Writable> {
 
+    private static final Log LOG = LogFactory.getLog(PigAvroRecordReader.class);
+
     private AvroStorageInputStream in;
     private DataFileReader<Object> reader;   /*reader of input avro data*/
     private long start;
     private long end;
+    private Path path;
+    private boolean ignoreBadFiles;
 
     /**
      * constructor to initialize input and avro data reader
      */
     public PigAvroRecordReader(TaskAttemptContext context, FileSplit split,
-                                    Schema schema) throws IOException {
-        this.in = new AvroStorageInputStream(split.getPath(), context);
+                               Schema schema, boolean ignoreBadFiles) throws IOException {
+        this.path = split.getPath();
+        this.in = new AvroStorageInputStream(path, context);
         if(schema == null)
             throw new IOException("Need to provide input avro schema");
 
@@ -59,6 +68,7 @@ public class PigAvroRecordReader extends
         this.reader.sync(split.getStart()); // sync to start
         this.start = in.tell();
         this.end = split.getStart() + split.getLength();
+        this.ignoreBadFiles = ignoreBadFiles;
     }
 
     @Override
@@ -108,10 +118,21 @@ public class PigAvroRecordReader extends
 
     @Override
     public boolean nextKeyValue() throws IOException, InterruptedException {
-        if (!reader.hasNext() || reader.pastSync(end))
-            return false;
-
-        return true;
+        try {
+            if (!reader.hasNext() || reader.pastSync(end)) {
+                return false;
+            }
+            return true;
+        } catch (AvroRuntimeException e) {
+            if (ignoreBadFiles) {
+                // Corrupted files can make the Avro reader throw AvroRuntimeException.
+                // With 'ignore_bad_files' enabled, warn and report that no records remain.
+                LOG.warn("Ignoring bad file '" + path + "'.");
+                return false;
+            } else {
+                throw e;
+            }
+        }
     }
 
 }

Modified: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java?rev=1384536&r1=1384535&r2=1384536&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java (original)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java Thu Sep 13 21:16:54 2012
@@ -158,6 +158,7 @@ public class TestAvroStorage {
         "     } ]" +
         "   } ]" +
         " }";
+    final private String testCorruptedFile = getInputFile("test_corrupted_file.avro");
 
     @BeforeClass
     public static void setup() throws ExecException {
@@ -841,6 +842,35 @@ public class TestAvroStorage {
         verifyResults(output, expected);
     }
 
+    @Test
+    public void testCorruptedFile1() throws IOException {
+        // Verify that load fails when bad files are found if ignore_bad_files is disabled.
+        String output = outbasedir + "testCorruptedFile1";
+        deleteDirectory(new File(output));
+        String [] queries = {
+           " in = LOAD '" + testCorruptedFile + "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+           " STORE in INTO '" + output + "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();"
+            };
+        // Job is expected to fail for bad files.
+        testAvroStorage(true, queries);
+    }
+
+    @Test
+    public void testCorruptedFile2() throws IOException {
+        // Verify that corrupted files are skipped if ignore_bad_files is enabled.
+        // Output is expected to be empty.
+        String output = outbasedir + "testCorruptedFile2";
+        String expected = basedir + "expected_testCorruptedFile.avro";
+        deleteDirectory(new File(output));
+        String [] queries = {
+           " in = LOAD '" + testCorruptedFile + "'" +
+                  " USING org.apache.pig.piggybank.storage.avro.AvroStorage ('ignore_bad_files');",
+           " STORE in INTO '" + output + "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();"
+            };
+        testAvroStorage(queries);
+        verifyResults(output, expected);
+    }
+
     private static void deleteDirectory (File path) {
         if ( path.exists()) {
             File [] files = path.listFiles();

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testCorruptedFile.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testCorruptedFile.avro?rev=1384536&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testCorruptedFile.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_corrupted_file.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_corrupted_file.avro?rev=1384536&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_corrupted_file.avro
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream