You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2011/12/08 23:04:36 UTC
svn commit: r1212163 - in /pig/trunk: CHANGES.txt src/org/apache/pig/builtin/PigStorage.java test/org/apache/pig/test/TestBZip.java
Author: daijy
Date: Thu Dec 8 22:04:36 2011
New Revision: 1212163
URL: http://svn.apache.org/viewvc?rev=1212163&view=rev
Log:
PIG-2391: Bzip_2 test is broken
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/builtin/PigStorage.java
pig/trunk/test/org/apache/pig/test/TestBZip.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1212163&r1=1212162&r2=1212163&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Dec 8 22:04:36 2011
@@ -190,6 +190,8 @@ PIG-2228: support partial aggregation in
BUG FIXES
+PIG-2391: Bzip_2 test is broken (xutingz via daijy)
+
PIG-2358: JobStats.getHadoopCounters() is never set and always returns null (xutingz via daijy)
PIG-2184: Not able to provide positional reference to macro invocations (xutingz via daijy)
Modified: pig/trunk/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=1212163&r1=1212162&r2=1212163&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/PigStorage.java Thu Dec 8 22:04:36 2011
@@ -33,8 +33,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
@@ -380,13 +382,15 @@ LoadPushDown, LoadMetadata, StoreMetadat
}
private void setCompression(Path path, Job job) {
- CompressionCodecFactory codecFactory = new CompressionCodecFactory(job.getConfiguration());
- CompressionCodec codec = codecFactory.getCodec(path);
- if (codec != null) {
+ String location=path.getName();
+ if (location.endsWith(".bz2") || location.endsWith(".bz")) {
FileOutputFormat.setCompressOutput(job, true);
- FileOutputFormat.setOutputCompressorClass(job, codec.getClass());
- }else {
- FileOutputFormat.setCompressOutput(job, false);
+ FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
+ } else if (location.endsWith(".gz")) {
+ FileOutputFormat.setCompressOutput(job, true);
+ FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
+ } else {
+ FileOutputFormat.setCompressOutput( job, false);
}
}
Modified: pig/trunk/test/org/apache/pig/test/TestBZip.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestBZip.java?rev=1212163&r1=1212162&r2=1212163&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestBZip.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestBZip.java Thu Dec 8 22:04:36 2011
@@ -181,6 +181,55 @@ public class TestBZip {
out.delete();
}
+ //see PIG-2391
+ @Test
+ public void testBz2() throws Exception {
+ String[] inputData = new String[] {
+ "1\t2\r3\t4", // '\r' case - this will be split into two tuples
+ "5\t6\r", // '\r\n' case
+ "7\t8", // '\n' case
+ "9\t10\r" // '\r\n' at the end of file
+ };
+
+ // bzip compressed input
+ File in = File.createTempFile("junit", ".bz2");
+ String compressedInputFileName = in.getAbsolutePath();
+ in.deleteOnExit();
+
+ try {
+ CBZip2OutputStream cos =
+ new CBZip2OutputStream(new FileOutputStream(in));
+ for (int i = 0; i < inputData.length; i++) {
+ StringBuffer sb = new StringBuffer();
+ sb.append(inputData[i]).append("\n");
+ byte bytes[] = sb.toString().getBytes();
+ cos.write(bytes);
+ }
+ cos.close();
+
+ Util.copyFromLocalToCluster(cluster, compressedInputFileName,
+ compressedInputFileName);
+
+ // pig script to read compressed input
+ PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster
+ .getProperties());
+
+ // pig script to read compressed input
+ String script ="a = load '" + compressedInputFileName +"';";
+ pig.registerQuery(script);
+
+ pig.registerQuery("store a into 'intermediate.bz';");
+ pig.registerQuery("b = load 'intermediate.bz';");
+ Iterator<Tuple> it2 = pig.openIterator("b");
+ while (it2.hasNext()) {
+ it2.next();
+ }
+ } finally {
+ in.delete();
+ Util.deleteFile(cluster, "intermediate.bz");
+ Util.deleteFile(cluster, "final.bz");
+ }
+ }
/**
* Tests that '\n', '\r' and '\r\n' are treated as record delims when using
* bzip just like they are when using uncompressed text