You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/09/18 03:15:04 UTC

svn commit: r696528 - in /incubator/pig/branches/types: lib-src/bzip2/org/apache/tools/bzip2r/CBZip2OutputStream.java src/org/apache/pig/data/DistinctDataBag.java test/org/apache/pig/test/TestBZip.java test/org/apache/pig/test/TestDataBag.java

Author: olga
Date: Wed Sep 17 18:15:03 2008
New Revision: 696528

URL: http://svn.apache.org/viewvc?rev=696528&view=rev
Log:
merge from trunk for PIG-342-151

Added:
    incubator/pig/branches/types/test/org/apache/pig/test/TestBZip.java
Modified:
    incubator/pig/branches/types/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2OutputStream.java
    incubator/pig/branches/types/src/org/apache/pig/data/DistinctDataBag.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestDataBag.java

Modified: incubator/pig/branches/types/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2OutputStream.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2OutputStream.java?rev=696528&r1=696527&r2=696528&view=diff
==============================================================================
--- incubator/pig/branches/types/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2OutputStream.java (original)
+++ incubator/pig/branches/types/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2OutputStream.java Wed Sep 17 18:15:03 2008
@@ -424,6 +424,11 @@
         super.finalize();
     }
 
+    // The bytes to fill in an empty file
+    final private static byte emptyFileArray[] = {
+        0x39, 0x17, 0x72, 0x45, 0x38, 0x50, (byte) 0x90, 00, 00, 00, 00
+    };
+    
     public void close() throws IOException {
         if (closed) {
             return;
@@ -434,8 +439,10 @@
         }
         currentChar = -1;
         if (written){
-            endBlock();
-            endCompression();
+        	endBlock();
+        	endCompression();
+        } else {
+            bsStream.write(emptyFileArray);
         }
         closed = true;
         super.close();

Modified: incubator/pig/branches/types/src/org/apache/pig/data/DistinctDataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DistinctDataBag.java?rev=696528&r1=696527&r2=696528&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DistinctDataBag.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DistinctDataBag.java Wed Sep 17 18:15:03 2008
@@ -72,6 +72,28 @@
         return true;
     }
     
+    
+    public long size() {
+        if (mSpillFiles != null && mSpillFiles.size() > 0){
+            //We need to recalculate size to guarantee a count of unique 
+            //entries including those on disk
+            Iterator<Tuple> iter = iterator();
+            int newSize = 0;
+            while (iter.hasNext()) {
+                newSize++;
+                iter.next();
+            }
+            
+            synchronized(mContents) {
+                //we don't want adds to change our numbers
+                //the lock may need to cover more of the method
+                mSize = newSize;
+            }
+        }
+        return mSize;
+    }
+    
+    
     public Iterator<Tuple> iterator() {
         return new DistinctDataBagIterator();
     }
@@ -88,7 +110,6 @@
     @Override
     public void addAll(DataBag b) {
         synchronized (mContents) {
-            mSize += b.size();
             Iterator<Tuple> i = b.iterator();
             while (i.hasNext()) {
                 if (mContents.add(i.next())) {

Added: incubator/pig/branches/types/test/org/apache/pig/test/TestBZip.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestBZip.java?rev=696528&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestBZip.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestBZip.java Wed Sep 17 18:15:03 2008
@@ -0,0 +1,122 @@
+package org.apache.pig.test;
+
+import static org.apache.pig.ExecType.MAPREDUCE;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.local.datastorage.LocalSeekableInputStream;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.tools.bzip2r.CBZip2InputStream;
+import org.apache.tools.bzip2r.CBZip2OutputStream;
+import org.junit.Test;
+
+public class TestBZip extends TestCase {
+    MiniCluster cluster = MiniCluster.buildCluster();
+
+    /**
+     * Tests the end-to-end writing and reading of a BZip file.
+     */
+    @Test
+    public void testBzipInPig() throws Exception {
+        PigServer pig = new PigServer(MAPREDUCE);
+        try {
+            pig.deleteFile("junit-out.bz");
+        } catch (Exception e) {
+        }
+        File in = File.createTempFile("junit", ".bz");
+        in.deleteOnExit();
+        File out = File.createTempFile("junit", ".bz");
+        out.deleteOnExit();
+        out.delete();
+        CBZip2OutputStream cos = new CBZip2OutputStream(
+                new FileOutputStream(in));
+        for (int i = 1; i < 100; i++) {
+            cos.write((i + "\n").getBytes());
+            cos.write((-i + "\n").getBytes());
+        }
+        cos.close();
+        pig.registerQuery("AA=load 'file:" + in.getAbsolutePath() + "';");
+        pig.registerQuery("A=foreach (group (filter AA by $0 > 0) all) generate flatten($1);");
+        pig.store("A", "file:" + out.getAbsolutePath());
+        CBZip2InputStream cis = new CBZip2InputStream(
+                new LocalSeekableInputStream(new File(out, "part-00000.bz")));
+        // Just a sanity check, to make sure it was a bzip file; we
+        // will do the value verification later
+        assertEquals(100, cis.read(new byte[100]));
+        cis.close();
+        pig.registerQuery("B=load 'file:" + out.getAbsolutePath() + "';");
+        Iterator<Tuple> i = pig.openIterator("B");
+        HashMap<Integer, Integer> map = new HashMap<Integer, Integer>();
+        while (i.hasNext()) {
+            Integer val = DataType.toInteger(i.next().get(0));
+            map.put(val, val);
+            
+        }
+        assertEquals(new Integer(99), new Integer(map.keySet().size()));
+        for(int j = 1; j < 100; j++) {
+          assertEquals(new Integer(j), map.get(j));
+        }
+        in.delete();
+        out.delete();
+    }
+
+    /**
+     * Tests the end-to-end writing and reading of an empty BZip file.
+     */
+    @Test
+    public void testEmptyBzipInPig() throws Exception {
+        PigServer pig = new PigServer(MAPREDUCE);
+        try {
+            pig.deleteFile("junit-out.bz");
+        } catch (Exception e) {
+        }
+        File in = File.createTempFile("junit", ".tmp");
+        in.deleteOnExit();
+        File out = File.createTempFile("junit", ".bz");
+        out.deleteOnExit();
+        out.delete();
+        FileOutputStream fos = new FileOutputStream(in);
+        fos.write("55\n".getBytes());
+        fos.close();
+        System.out.println(in.getAbsolutePath());
+        pig.registerQuery("AA=load 'file:" + in.getAbsolutePath() + "';");
+        pig
+                .registerQuery("A=foreach (group (filter AA by $0 < '0') all) generate flatten($1);");
+        pig.store("A", "file:" + out.getAbsolutePath());
+        CBZip2InputStream cis = new CBZip2InputStream(
+                new LocalSeekableInputStream(new File(out, "part-00000.bz")));
+        assertEquals(-1, cis.read(new byte[100]));
+        cis.close();
+        pig.registerQuery("B=load 'file:" + out.getAbsolutePath() + "';");
+        pig.openIterator("B");
+        in.delete();
+        out.delete();
+    }
+
+    /**
+     * Tests the writing and reading of an empty BZip file.
+     */
+    @Test
+    public void testEmptyBzip() throws Exception {
+        File tmp = File.createTempFile("junit", ".tmp");
+        tmp.deleteOnExit();
+        CBZip2OutputStream cos = new CBZip2OutputStream(new FileOutputStream(
+                tmp));
+        cos.close();
+        assertNotSame(0, tmp.length());
+        CBZip2InputStream cis = new CBZip2InputStream(
+                new LocalSeekableInputStream(tmp));
+        assertEquals(-1, cis.read(new byte[100]));
+        cis.close();
+        tmp.delete();
+
+    }
+}

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestDataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestDataBag.java?rev=696528&r1=696527&r2=696528&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestDataBag.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestDataBag.java Wed Sep 17 18:15:03 2008
@@ -544,6 +544,8 @@
             }
             mgr.forceSpill();
         }
+        
+        assertEquals("Size of distinct data bag is incorrect", b.size(), rightAnswer.size());
 
         // Read tuples back, hopefully they come out in the same order.
         Iterator<Tuple> bIter = b.iterator();