You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2012/09/13 23:16:55 UTC
svn commit: r1384536 - in /pig/trunk/contrib: ./
piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/
piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/
piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/
Author: gates
Date: Thu Sep 13 21:16:54 2012
New Revision: 1384536
URL: http://svn.apache.org/viewvc?rev=1384536&view=rev
Log:
PIG-2909 Add a new option for ignoring corrupted files to AvroStorage load func
Added:
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testCorruptedFile.avro (with props)
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_corrupted_file.avro (with props)
Modified:
pig/trunk/contrib/CHANGES.txt
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java
Modified: pig/trunk/contrib/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/CHANGES.txt?rev=1384536&r1=1384535&r2=1384536&view=diff
==============================================================================
--- pig/trunk/contrib/CHANGES.txt (original)
+++ pig/trunk/contrib/CHANGES.txt Thu Sep 13 21:16:54 2012
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-2909 Add a new option for ignoring corrupted files to AvroStorage load func (cheolsoo via gates)
+
PIG-2202 AvroStorage doesn't work with Avro 1.5.1 (billgraham via gates)
PIG-1959 Penny: a framework for workflow instrumentation (olston, breed via gates)
Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java?rev=1384536&r1=1384535&r2=1384536&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java Thu Sep 13 21:16:54 2012
@@ -88,6 +88,7 @@ public class AvroStorage extends FileInp
private Schema inputAvroSchema = null; /* input avro schema */
private boolean checkSchema = true; /*whether check schema of input directories*/
+ private boolean ignoreBadFiles = false; /* whether ignore corrupted files during load */
/**
* Empty constructor. Output schema is derived from pig schema.
@@ -112,9 +113,11 @@ public class AvroStorage extends FileInp
nullable = true;
checkSchema = true;
- if (parts.length == 1 && !parts[0].equalsIgnoreCase("no_schema_check")) {
- /* If one parameter is given, and that is not 'no_schema_check',
- * then it must be a json string.
+ if (parts.length == 1
+ && !parts[0].equalsIgnoreCase("no_schema_check")
+ && !parts[0].equalsIgnoreCase("ignore_bad_files")) {
+ /* If one parameter is given, and that is neither 'no_schema_check'
+ * nor 'ignore_bad_files', then it must be a json string.
*/
init(parseJsonString(parts[0]));
} else {
@@ -259,7 +262,7 @@ public class AvroStorage extends FileInp
public InputFormat getInputFormat() throws IOException {
AvroStorageLog.funcCall("getInputFormat");
if(inputAvroSchema != null)
- return new PigAvroInputFormat(inputAvroSchema);
+ return new PigAvroInputFormat(inputAvroSchema, ignoreBadFiles);
else
return new TextInputFormat();
}
@@ -381,6 +384,10 @@ public class AvroStorage extends FileInp
checkSchema = false;
/* parameter only, so increase iteration counter by 1 */
i += 1;
+ } else if (name.equalsIgnoreCase("ignore_bad_files")) {
+ ignoreBadFiles = true;
+ /* parameter only, so increase iteration counter by 1 */
+ i += 1;
} else {
String value = parts[i+1].trim();
if (name.equalsIgnoreCase("debug")
@@ -627,6 +634,11 @@ public class AvroStorage extends FileInp
}
@Override
+ public void cleanupOnSuccess(String location, Job job) throws IOException {
+ // Nothing to do
+ }
+
+ @Override
public void putNext(Tuple t) throws IOException {
try {
this.writer.write(NullWritable.get(), t.getAll().size() == 1 ? t.get(0) : t);
@@ -634,9 +646,4 @@ public class AvroStorage extends FileInp
e.printStackTrace();
}
}
-
- @Override
- public void cleanupOnSuccess(String location, Job job) throws IOException{
-
- }
}
Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java?rev=1384536&r1=1384535&r2=1384536&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java Thu Sep 13 21:16:54 2012
@@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.lib.i
public class PigAvroInputFormat extends FileInputFormat<NullWritable, Writable> {
private Schema schema = null; /* avro schema */
+ private boolean ignoreBadFiles = false; /* whether ignore corrupted files during load */
/**
* empty constructor
@@ -46,11 +47,13 @@ public class PigAvroInputFormat extends
}
/**
- * constructor called by AvroStorage to pass in schema
- * @param s input data schema
+ * constructor called by AvroStorage to pass in schema and ignoreBadFiles.
+ * @param schema input data schema
+ * @param ignoreBadFiles whether ignore corrupted files during load
*/
- public PigAvroInputFormat(Schema s) {
- schema = s;
+ public PigAvroInputFormat(Schema schema, boolean ignoreBadFiles) {
+ this.schema = schema;
+ this.ignoreBadFiles = ignoreBadFiles;
}
/**
@@ -63,7 +66,7 @@ public class PigAvroInputFormat extends
createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
context.setStatus(split.toString());
- return new PigAvroRecordReader(context, (FileSplit) split, schema);
+ return new PigAvroRecordReader(context, (FileSplit) split, schema, ignoreBadFiles);
}
}
Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java?rev=1384536&r1=1384535&r2=1384536&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java Thu Sep 13 21:16:54 2012
@@ -18,8 +18,12 @@
package org.apache.pig.piggybank.storage.avro;
import java.io.IOException;
+import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -36,17 +40,22 @@ import org.apache.pig.data.TupleFactory;
*/
public class PigAvroRecordReader extends RecordReader<NullWritable, Writable> {
+ private static final Log LOG = LogFactory.getLog(PigAvroRecordReader.class);
+
private AvroStorageInputStream in;
private DataFileReader<Object> reader; /*reader of input avro data*/
private long start;
private long end;
+ private Path path;
+ private boolean ignoreBadFiles;
/**
* constructor to initialize input and avro data reader
*/
public PigAvroRecordReader(TaskAttemptContext context, FileSplit split,
- Schema schema) throws IOException {
- this.in = new AvroStorageInputStream(split.getPath(), context);
+ Schema schema, boolean ignoreBadFiles) throws IOException {
+ this.path = split.getPath();
+ this.in = new AvroStorageInputStream(path, context);
if(schema == null)
throw new IOException("Need to provide input avro schema");
@@ -59,6 +68,7 @@ public class PigAvroRecordReader extends
this.reader.sync(split.getStart()); // sync to start
this.start = in.tell();
this.end = split.getStart() + split.getLength();
+ this.ignoreBadFiles = ignoreBadFiles;
}
@Override
@@ -108,10 +118,21 @@ public class PigAvroRecordReader extends
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
- if (!reader.hasNext() || reader.pastSync(end))
- return false;
-
- return true;
+ try {
+ if (!reader.hasNext() || reader.pastSync(end)) {
+ return false;
+ }
+ return true;
+ } catch (AvroRuntimeException e) {
+ if (ignoreBadFiles) {
+ // For corrupted files, AvroRuntimeException can be thrown.
+ // We ignore them if the option 'ignore_bad_files' is enabled.
+ LOG.warn("Ignoring bad file '" + path + "'.");
+ return false;
+ } else {
+ throw e;
+ }
+ }
}
}
Modified: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java?rev=1384536&r1=1384535&r2=1384536&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java (original)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java Thu Sep 13 21:16:54 2012
@@ -158,6 +158,7 @@ public class TestAvroStorage {
" } ]" +
" } ]" +
" }";
+ final private String testCorruptedFile = getInputFile("test_corrupted_file.avro");
@BeforeClass
public static void setup() throws ExecException {
@@ -841,6 +842,35 @@ public class TestAvroStorage {
verifyResults(output, expected);
}
+ @Test
+ public void testCorruptedFile1() throws IOException {
+ // Verify that load fails when bad files are found if ignore_bad_files is disabled.
+ String output = outbasedir + "testCorruptedFile1";
+ deleteDirectory(new File(output));
+ String [] queries = {
+ " in = LOAD '" + testCorruptedFile + "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+ " STORE in INTO '" + output + "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();"
+ };
+ // Job is expected to fail for bad files.
+ testAvroStorage(true, queries);
+ }
+
+ @Test
+ public void testCorruptedFile2() throws IOException {
+ // Verify that corrupted files are skipped if ignore_bad_files is enabled.
+ // Output is expected to be empty.
+ String output = outbasedir + "testCorruptedFile2";
+ String expected = basedir + "expected_testCorruptedFile.avro";
+ deleteDirectory(new File(output));
+ String [] queries = {
+ " in = LOAD '" + testCorruptedFile + "'" +
+ " USING org.apache.pig.piggybank.storage.avro.AvroStorage ('ignore_bad_files');",
+ " STORE in INTO '" + output + "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();"
+ };
+ testAvroStorage(queries);
+ verifyResults(output, expected);
+ }
+
private static void deleteDirectory (File path) {
if ( path.exists()) {
File [] files = path.listFiles();
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testCorruptedFile.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testCorruptedFile.avro?rev=1384536&view=auto
==============================================================================
Binary file - no diff available.
Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testCorruptedFile.avro
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_corrupted_file.avro
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_corrupted_file.avro?rev=1384536&view=auto
==============================================================================
Binary file - no diff available.
Propchange: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_corrupted_file.avro
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream