You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/09/18 03:15:04 UTC
svn commit: r696528 - in /incubator/pig/branches/types:
lib-src/bzip2/org/apache/tools/bzip2r/CBZip2OutputStream.java
src/org/apache/pig/data/DistinctDataBag.java
test/org/apache/pig/test/TestBZip.java
test/org/apache/pig/test/TestDataBag.java
Author: olga
Date: Wed Sep 17 18:15:03 2008
New Revision: 696528
URL: http://svn.apache.org/viewvc?rev=696528&view=rev
Log:
merge from trunk for PIG-342-151
Added:
incubator/pig/branches/types/test/org/apache/pig/test/TestBZip.java
Modified:
incubator/pig/branches/types/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2OutputStream.java
incubator/pig/branches/types/src/org/apache/pig/data/DistinctDataBag.java
incubator/pig/branches/types/test/org/apache/pig/test/TestDataBag.java
Modified: incubator/pig/branches/types/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2OutputStream.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2OutputStream.java?rev=696528&r1=696527&r2=696528&view=diff
==============================================================================
--- incubator/pig/branches/types/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2OutputStream.java (original)
+++ incubator/pig/branches/types/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2OutputStream.java Wed Sep 17 18:15:03 2008
@@ -424,6 +424,11 @@
super.finalize();
}
+ // Written by close() instead of endBlock()/endCompression() when no data was written: 0x39 ('9', presumably the block-size digit following a header written elsewhere - TODO confirm), the bzip2 end-of-stream magic 0x177245385090, then a zero combined CRC.
+ final private static byte emptyFileArray[] = {
+ 0x39, 0x17, 0x72, 0x45, 0x38, 0x50, (byte) 0x90, 00, 00, 00, 00
+ };
+
public void close() throws IOException {
if (closed) {
return;
@@ -434,8 +439,10 @@
}
currentChar = -1;
if (written){
- endBlock();
- endCompression();
+ endBlock();
+ endCompression();
+ } else {
+ bsStream.write(emptyFileArray);
}
closed = true;
super.close();
Modified: incubator/pig/branches/types/src/org/apache/pig/data/DistinctDataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DistinctDataBag.java?rev=696528&r1=696527&r2=696528&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DistinctDataBag.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DistinctDataBag.java Wed Sep 17 18:15:03 2008
@@ -72,6 +72,35 @@
return true;
}
+
+    /**
+     * Returns the number of distinct tuples in this bag.
+     * <p>
+     * Once tuples have been spilled to disk, the count is recomputed by
+     * iterating the bag, so that it covers only unique entries including
+     * those on disk, and the cached value (mSize) is refreshed.
+     */
+    public long size() {
+        if (mSpillFiles != null && mSpillFiles.size() > 0) {
+            // Count with a long to match mSize and the return type; an int
+            // counter would overflow on bags larger than Integer.MAX_VALUE.
+            Iterator<Tuple> iter = iterator();
+            long newSize = 0;
+            while (iter.hasNext()) {
+                newSize++;
+                iter.next();
+            }
+
+            synchronized (mContents) {
+                // We don't want adds to change our numbers while the new count
+                // is published. NOTE(review): the iteration above runs outside
+                // this lock, so the lock may need to cover more of the method.
+                mSize = newSize;
+            }
+        }
+        return mSize;
+    }
+
public Iterator<Tuple> iterator() {
return new DistinctDataBagIterator();
}
@@ -88,7 +117,6 @@
@Override
public void addAll(DataBag b) {
synchronized (mContents) {
- mSize += b.size();
Iterator<Tuple> i = b.iterator();
while (i.hasNext()) {
if (mContents.add(i.next())) {
Added: incubator/pig/branches/types/test/org/apache/pig/test/TestBZip.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestBZip.java?rev=696528&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestBZip.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestBZip.java Wed Sep 17 18:15:03 2008
@@ -0,0 +1,165 @@
+package org.apache.pig.test;
+
+import static org.apache.pig.ExecType.MAPREDUCE;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.local.datastorage.LocalSeekableInputStream;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.tools.bzip2r.CBZip2InputStream;
+import org.apache.tools.bzip2r.CBZip2OutputStream;
+import org.junit.Test;
+
+/**
+ * End-to-end tests for writing and reading bzip2 files, both through Pig
+ * and directly through {@link CBZip2OutputStream} / {@link CBZip2InputStream}.
+ */
+public class TestBZip extends TestCase {
+    // Never referenced directly; presumably built so the MAPREDUCE
+    // PigServer instances below have a cluster to run against.
+    MiniCluster cluster = MiniCluster.buildCluster();
+
+    /**
+     * Tests the end-to-end writing and reading of a BZip file.
+     */
+    @Test
+    public void testBzipInPig() throws Exception {
+        PigServer pig = new PigServer(MAPREDUCE);
+        deleteQuietly(pig, "junit-out.bz");
+        File in = File.createTempFile("junit", ".bz");
+        in.deleteOnExit();
+        File out = File.createTempFile("junit", ".bz");
+        out.deleteOnExit();
+        // Only the unique name is needed; the store below creates the output.
+        out.delete();
+        try {
+            CBZip2OutputStream cos = new CBZip2OutputStream(
+                    new FileOutputStream(in));
+            try {
+                for (int i = 1; i < 100; i++) {
+                    cos.write((i + "\n").getBytes());
+                    cos.write((-i + "\n").getBytes());
+                }
+            } finally {
+                cos.close();
+            }
+            pig.registerQuery("AA=load 'file:" + in.getAbsolutePath() + "';");
+            pig.registerQuery("A=foreach (group (filter AA by $0 > 0) all) generate flatten($1);");
+            pig.store("A", "file:" + out.getAbsolutePath());
+
+            CBZip2InputStream cis = new CBZip2InputStream(
+                    new LocalSeekableInputStream(new File(out, "part-00000.bz")));
+            try {
+                // Just a sanity check, to make sure it was a bzip file; we
+                // will do the value verification later
+                assertEquals(100, cis.read(new byte[100]));
+            } finally {
+                cis.close();
+            }
+
+            pig.registerQuery("B=load 'file:" + out.getAbsolutePath() + "';");
+            Iterator<Tuple> iter = pig.openIterator("B");
+            HashMap<Integer, Integer> map = new HashMap<Integer, Integer>();
+            while (iter.hasNext()) {
+                Integer val = DataType.toInteger(iter.next().get(0));
+                map.put(val, val);
+            }
+            // Only the positive values 1..99 pass the filter.
+            assertEquals(99, map.size());
+            for (int j = 1; j < 100; j++) {
+                assertEquals(Integer.valueOf(j), map.get(j));
+            }
+        } finally {
+            in.delete();
+            out.delete();
+        }
+    }
+
+    /**
+     * Tests the end-to-end writing and reading of an empty BZip file.
+     */
+    @Test
+    public void testEmptyBzipInPig() throws Exception {
+        PigServer pig = new PigServer(MAPREDUCE);
+        deleteQuietly(pig, "junit-out.bz");
+        File in = File.createTempFile("junit", ".tmp");
+        in.deleteOnExit();
+        File out = File.createTempFile("junit", ".bz");
+        out.deleteOnExit();
+        out.delete();
+        try {
+            FileOutputStream fos = new FileOutputStream(in);
+            try {
+                fos.write("55\n".getBytes());
+            } finally {
+                fos.close();
+            }
+            pig.registerQuery("AA=load 'file:" + in.getAbsolutePath() + "';");
+            // The filter is expected to reject the only row, leaving the output empty.
+            pig.registerQuery("A=foreach (group (filter AA by $0 < '0') all) generate flatten($1);");
+            pig.store("A", "file:" + out.getAbsolutePath());
+
+            CBZip2InputStream cis = new CBZip2InputStream(
+                    new LocalSeekableInputStream(new File(out, "part-00000.bz")));
+            try {
+                assertEquals(-1, cis.read(new byte[100]));
+            } finally {
+                cis.close();
+            }
+
+            pig.registerQuery("B=load 'file:" + out.getAbsolutePath() + "';");
+            // The empty bzip output must load cleanly and yield no tuples.
+            assertFalse(pig.openIterator("B").hasNext());
+        } finally {
+            in.delete();
+            out.delete();
+        }
+    }
+
+    /**
+     * Tests that closing a CBZip2OutputStream without writing anything
+     * produces a non-empty file that reads back as an empty stream.
+     */
+    @Test
+    public void testEmptyBzip() throws Exception {
+        File tmp = File.createTempFile("junit", ".tmp");
+        tmp.deleteOnExit();
+        try {
+            CBZip2OutputStream cos = new CBZip2OutputStream(
+                    new FileOutputStream(tmp));
+            cos.close();
+            // Even with no data written, close() must leave a non-empty file.
+            assertTrue("empty bzip2 file should not be zero-length",
+                    tmp.length() > 0);
+            CBZip2InputStream cis = new CBZip2InputStream(
+                    new LocalSeekableInputStream(tmp));
+            try {
+                assertEquals(-1, cis.read(new byte[100]));
+            } finally {
+                cis.close();
+            }
+        } finally {
+            tmp.delete();
+        }
+    }
+
+    /**
+     * Best-effort removal of output left by an earlier run; any failure
+     * (for example, the file not existing) is ignored.
+     */
+    private static void deleteQuietly(PigServer pig, String name) {
+        try {
+            pig.deleteFile(name);
+        } catch (Exception ignored) {
+            // Deliberately ignored: a missing file is not an error for these tests.
+        }
+    }
+}
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestDataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestDataBag.java?rev=696528&r1=696527&r2=696528&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestDataBag.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestDataBag.java Wed Sep 17 18:15:03 2008
@@ -544,6 +544,9 @@
}
mgr.forceSpill();
}
+
+        // assertEquals takes (message, expected, actual); rightAnswer is the reference result.
+        assertEquals("Size of distinct data bag is incorrect", rightAnswer.size(), b.size());
// Read tuples back, hopefully they come out in the same order.
Iterator<Tuple> bIter = b.iterator();