You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2012/06/19 05:09:19 UTC

[2/4] Cleanup a bunch of compiler warnings, mostly by adding serialVersionUIDs to DoFn and friends

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/io/text/CBZip2InputStream.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/text/CBZip2InputStream.java b/src/main/java/com/cloudera/crunch/io/text/CBZip2InputStream.java
index fc9cc4a..d5bb384 100644
--- a/src/main/java/com/cloudera/crunch/io/text/CBZip2InputStream.java
+++ b/src/main/java/com/cloudera/crunch/io/text/CBZip2InputStream.java
@@ -90,1003 +90,995 @@ import org.apache.hadoop.mapreduce.InputSplit;
  * @author <a href="mailto:keiron@aftexsw.com">Keiron Liddle</a>
  */
 public class CBZip2InputStream extends InputStream implements BZip2Constants {
-    private static void cadvise(String reason) throws IOException {
-        throw new IOException(reason);
+  private static void cadvise(String reason) throws IOException {
+    throw new IOException(reason);
+  }
+
+  private static void compressedStreamEOF() throws IOException {
+    cadvise("compressedStream EOF");
+  }
+
+  private void makeMaps() {
+    int i;
+    nInUse = 0;
+    for (i = 0; i < 256; i++) {
+      if (inUse[i]) {
+        seqToUnseq[nInUse] = (char) i;
+        unseqToSeq[i] = (char) nInUse;
+        nInUse++;
+      }
     }
+  }
 
-    private static void compressedStreamEOF() throws IOException {
-        cadvise("compressedStream EOF");
-    }
-
-    private void makeMaps() {
-        int i;
-        nInUse = 0;
-        for (i = 0; i < 256; i++) {
-            if (inUse[i]) {
-                seqToUnseq[nInUse] = (char) i;
-                unseqToSeq[i] = (char) nInUse;
-                nInUse++;
-            }
-        }
-    }
-
-    /*
+  /*
       index of the last char in the block, so
       the block size == last + 1.
-    */
-    private int  last;
+   */
+  private int  last;
 
-    /*
+  /*
       index in zptr[] of original string after sorting.
-    */
-    private int  origPtr;
+   */
+  private int  origPtr;
 
-    /*
+  /*
       always: in the range 0 .. 9.
       The current block size is 100000 * this number.
-    */
-    private int blockSize100k;
+   */
+  private int blockSize100k;
 
-    private boolean blockRandomised;
+  private boolean blockRandomised;
 
-    // a buffer to keep the read byte
-    private int bsBuff;
-    
-    // since bzip is bit-aligned at block boundaries there can be a case wherein
-    // only few bits out of a read byte are consumed and the remaining bits
-    // need to be consumed while processing the next block.
-    // indicate how many bits in bsBuff have not been processed yet
-    private int bsLive;
-    private CRC mCrc = new CRC();
+  // a buffer to keep the read byte
+  private int bsBuff;
 
-    private boolean[] inUse = new boolean[256];
-    private int nInUse;
+  // since bzip is bit-aligned at block boundaries there can be a case wherein
+  // only few bits out of a read byte are consumed and the remaining bits
+  // need to be consumed while processing the next block.
+  // indicate how many bits in bsBuff have not been processed yet
+  private int bsLive;
+  private CRC mCrc = new CRC();
 
-    private char[] seqToUnseq = new char[256];
-    private char[] unseqToSeq = new char[256];
+  private boolean[] inUse = new boolean[256];
+  private int nInUse;
 
-    private char[] selector = new char[MAX_SELECTORS];
-    private char[] selectorMtf = new char[MAX_SELECTORS];
+  private char[] seqToUnseq = new char[256];
+  private char[] unseqToSeq = new char[256];
 
-    private int[] tt;
-    private char[] ll8;
+  private char[] selector = new char[MAX_SELECTORS];
+  private char[] selectorMtf = new char[MAX_SELECTORS];
 
-    /*
+  private int[] tt;
+  private char[] ll8;
+
+  /*
       freq table collected to save a pass over the data
       during decompression.
-    */
-    private int[] unzftab = new int[256];
-
-    private int[][] limit = new int[N_GROUPS][MAX_ALPHA_SIZE];
-    private int[][] base = new int[N_GROUPS][MAX_ALPHA_SIZE];
-    private int[][] perm = new int[N_GROUPS][MAX_ALPHA_SIZE];
-    private int[] minLens = new int[N_GROUPS];
-
-    private FSDataInputStream innerBsStream;
-    long readLimit = Long.MAX_VALUE;
-    public long getReadLimit() {
-        return readLimit;
+   */
+  private int[] unzftab = new int[256];
+
+  private int[][] limit = new int[N_GROUPS][MAX_ALPHA_SIZE];
+  private int[][] base = new int[N_GROUPS][MAX_ALPHA_SIZE];
+  private int[][] perm = new int[N_GROUPS][MAX_ALPHA_SIZE];
+  private int[] minLens = new int[N_GROUPS];
+
+  private FSDataInputStream innerBsStream;
+  long readLimit = Long.MAX_VALUE;
+  public long getReadLimit() {
+    return readLimit;
+  }
+  public void setReadLimit(long readLimit) {
+    this.readLimit = readLimit;
+  }
+  long readCount;
+  public long getReadCount() {
+    return readCount;
+  }
+
+  private boolean streamEnd = false;
+
+  private int currentChar = -1;
+
+  private static final int START_BLOCK_STATE = 1;
+  private static final int RAND_PART_A_STATE = 2;
+  private static final int RAND_PART_B_STATE = 3;
+  private static final int RAND_PART_C_STATE = 4;
+  private static final int NO_RAND_PART_A_STATE = 5;
+  private static final int NO_RAND_PART_B_STATE = 6;
+  private static final int NO_RAND_PART_C_STATE = 7;
+
+  private int currentState = START_BLOCK_STATE;
+
+  private int storedBlockCRC, storedCombinedCRC;
+  private int computedBlockCRC, computedCombinedCRC;
+  private boolean checkComputedCombinedCRC = true;
+
+  int i2, count, chPrev, ch2;
+  int i, tPos;
+  int rNToGo = 0;
+  int rTPos  = 0;
+  int j2;
+  char z;
+
+  // see comment in getPos()
+  private long retPos = -1;
+  // the position offset which corresponds to the end of the InputSplit that
+  // will be processed by this instance 
+  private long endOffsetOfSplit;
+
+  private boolean signalToStopReading;
+
+  public CBZip2InputStream(FSDataInputStream zStream, int blockSize, long end)
+      throws IOException {
+    endOffsetOfSplit = end;
+    // initialize retPos to the beginning of the current InputSplit
+    // see comments in getPos() to understand how this is used.
+    retPos = zStream.getPos();
+    ll8 = null;
+    tt = null;
+    checkComputedCombinedCRC = blockSize == -1;
+    bsSetStream(zStream);
+    initialize(blockSize);
+    initBlock(blockSize != -1);
+    setupBlock();
+  }
+
+  @Override
+  public int read() throws IOException {
+    if (streamEnd) {
+      return -1;
+    } else {
+
+      // if we just started reading a bzip block which starts at a position
+      // >= end of current split, then we should set up retpos such that
+      // after a record is read, future getPos() calls will get a value
+      // > end of current split - this way we will read only one record out
+      // of this bzip block - the rest of the records from this bzip block
+      // should be read by the next map task while processing the next split
+      if(signalToStopReading) {
+        retPos = endOffsetOfSplit + 1;
+      }
+
+      int retChar = currentChar;
+      switch(currentState) {
+      case START_BLOCK_STATE:
+        break;
+      case RAND_PART_A_STATE:
+        break;
+      case RAND_PART_B_STATE:
+        setupRandPartB();
+        break;
+      case RAND_PART_C_STATE:
+        setupRandPartC();
+        break;
+      case NO_RAND_PART_A_STATE:
+        break;
+      case NO_RAND_PART_B_STATE:
+        setupNoRandPartB();
+        break;
+      case NO_RAND_PART_C_STATE:
+        setupNoRandPartC();
+        break;
+      default:
+        break;
+      }
+      return retChar;
     }
-    public void setReadLimit(long readLimit) {
-        this.readLimit = readLimit;
-    }
-    long readCount;
-    public long getReadCount() {
-        return readCount;
+  }
+
+  /**
+   * getPos is used by the caller to know when the processing of the current 
+   * {@link InputSplit} is complete. In this method, as we read each bzip
+   * block, we keep returning the beginning of the {@link InputSplit} as the
+   * return value until we hit a block  which starts at a position >= end of
+   * current split. At that point we should set up retpos such that after a 
+   * record is read, future getPos() calls will get a value > end of current 
+   * split - this way we will read only one record out of that bzip block - 
+   * the rest of the records from that bzip block should be read by the next 
+   * map task while processing the next split
+   * @return
+   * @throws IOException
+   */
+  public long getPos() throws IOException{
+    return retPos;
+  }
+
+  private void initialize(int blockSize) throws IOException {
+    if (blockSize == -1) {
+      char magic1, magic2;
+      char magic3, magic4;
+      magic1 = bsGetUChar();
+      magic2 = bsGetUChar();
+      magic3 = bsGetUChar();
+      magic4 = bsGetUChar();
+      if (magic1 != 'B' || magic2 != 'Z' || 
+          magic3 != 'h' || magic4 < '1' || magic4 > '9') {
+        bsFinishedWithStream();
+        streamEnd = true;
+        return;
+      }
+      blockSize = magic4 - '0';
     }
 
-    private boolean streamEnd = false;
-
-    private int currentChar = -1;
-
-    private static final int START_BLOCK_STATE = 1;
-    private static final int RAND_PART_A_STATE = 2;
-    private static final int RAND_PART_B_STATE = 3;
-    private static final int RAND_PART_C_STATE = 4;
-    private static final int NO_RAND_PART_A_STATE = 5;
-    private static final int NO_RAND_PART_B_STATE = 6;
-    private static final int NO_RAND_PART_C_STATE = 7;
-
-    private int currentState = START_BLOCK_STATE;
-
-    private int storedBlockCRC, storedCombinedCRC;
-    private int computedBlockCRC, computedCombinedCRC;
-    private boolean checkComputedCombinedCRC = true;
-
-    int i2, count, chPrev, ch2;
-    int i, tPos;
-    int rNToGo = 0;
-    int rTPos  = 0;
-    int j2;
-    char z;
-    
-    // see comment in getPos()
-    private long retPos = -1;
-    // the position offset which corresponds to the end of the InputSplit that
-    // will be processed by this instance 
-    private long endOffsetOfSplit;
-
-    private boolean signalToStopReading;
-
-    public CBZip2InputStream(FSDataInputStream zStream, int blockSize, long end)
-    throws IOException {
-        endOffsetOfSplit = end;
-        // initialize retPos to the beginning of the current InputSplit
-        // see comments in getPos() to understand how this is used.
-        retPos = zStream.getPos();
-    	ll8 = null;
-        tt = null;
-        checkComputedCombinedCRC = blockSize == -1;
-        bsSetStream(zStream);
-        initialize(blockSize);
-        initBlock(blockSize != -1);
-        setupBlock();
-    }
+    setDecompressStructureSizes(blockSize);
+    computedCombinedCRC = 0;
+  }
 
-    @Override
-    public int read() throws IOException {
-        if (streamEnd) {
-            return -1;
-        } else {
-            
-            // if we just started reading a bzip block which starts at a position
-            // >= end of current split, then we should set up retpos such that
-            // after a record is read, future getPos() calls will get a value
-            // > end of current split - this way we will read only one record out
-            // of this bzip block - the rest of the records from this bzip block
-            // should be read by the next map task while processing the next split
-            if(signalToStopReading) {
-                retPos = endOffsetOfSplit + 1;
-            }
-            
-            int retChar = currentChar;
-            switch(currentState) {
-            case START_BLOCK_STATE:
-                break;
-            case RAND_PART_A_STATE:
-                break;
-            case RAND_PART_B_STATE:
-                setupRandPartB();
-                break;
-            case RAND_PART_C_STATE:
-                setupRandPartC();
-                break;
-            case NO_RAND_PART_A_STATE:
-                break;
-            case NO_RAND_PART_B_STATE:
-                setupNoRandPartB();
-                break;
-            case NO_RAND_PART_C_STATE:
-                setupNoRandPartC();
-                break;
-            default:
-                break;
-            }
-            return retChar;
-        }
-    }
+  private final static long mask = 0xffffffffffffL;
+  private final static long eob = 0x314159265359L & mask;
+  private final static long eos = 0x177245385090L & mask;
 
-    /**
-     * getPos is used by the caller to know when the processing of the current 
-     * {@link InputSplit} is complete. In this method, as we read each bzip
-     * block, we keep returning the beginning of the {@link InputSplit} as the
-     * return value until we hit a block  which starts at a position >= end of
-     * current split. At that point we should set up retpos such that after a 
-     * record is read, future getPos() calls will get a value > end of current 
-     * split - this way we will read only one record out of that bzip block - 
-     * the rest of the records from that bzip block should be read by the next 
-     * map task while processing the next split
-     * @return
-     * @throws IOException
-     */
-    public long getPos() throws IOException{
-        return retPos;
+  private void initBlock(boolean searchForMagic) throws IOException {
+    if (readCount >= readLimit) {
+      bsFinishedWithStream();
+      streamEnd = true;
+      return;
     }
-    
-    private void initialize(int blockSize) throws IOException {
-        if (blockSize == -1) {
-            char magic1, magic2;
-            char magic3, magic4;
-            magic1 = bsGetUChar();
-            magic2 = bsGetUChar();
-            magic3 = bsGetUChar();
-            magic4 = bsGetUChar();
-            if (magic1 != 'B' || magic2 != 'Z' || 
-                    magic3 != 'h' || magic4 < '1' || magic4 > '9') {
-                bsFinishedWithStream();
-                streamEnd = true;
-                return;
-            }
-            blockSize = magic4 - '0';
-        }
 
-        setDecompressStructureSizes(blockSize);
-        computedCombinedCRC = 0;
+    // position before beginning of bzip block header        
+    long pos = innerBsStream.getPos();
+    if (!searchForMagic) {
+      char magic1, magic2, magic3, magic4;
+      char magic5, magic6;
+      magic1 = bsGetUChar();
+      magic2 = bsGetUChar();
+      magic3 = bsGetUChar();
+      magic4 = bsGetUChar();
+      magic5 = bsGetUChar();
+      magic6 = bsGetUChar();
+      if (magic1 == 0x17 && magic2 == 0x72 && magic3 == 0x45
+          && magic4 == 0x38 && magic5 == 0x50 && magic6 == 0x90) {
+        complete();
+        return;
+      }
+
+      if (magic1 != 0x31 || magic2 != 0x41 || magic3 != 0x59
+          || magic4 != 0x26 || magic5 != 0x53 || magic6 != 0x59) {
+        badBlockHeader();
+        streamEnd = true;
+        return;
+      }
+    } else {
+      long magic = 0;
+      for(int i = 0; i < 6; i++) {
+        magic <<= 8;
+        magic |= bsGetUChar();
+      }
+      while(magic != eos && magic != eob) {
+        magic <<= 1;
+        magic &= mask;
+        magic |= bsR(1);
+        // if we just found the block header, the beginning of the bzip 
+        // header would be 6 bytes before the current stream position
+        // when we eventually break from this while(), if it is because
+        // we found a block header then pos will have the correct start
+        // of header position
+        pos = innerBsStream.getPos() - 6;
+      }
+      if (magic == eos) {
+        complete();
+        return;
+      }
+
+    }
+    // if the previous block finished a few bits into the previous byte,
+    // then we will first be reading the remaining bits from the previous
+    // byte - so logically pos needs to be one behind
+    if(bsLive > 0)  {
+      pos--;
     }
 
-    private final static long mask = 0xffffffffffffL;
-    private final static long eob = 0x314159265359L & mask;
-    private final static long eos = 0x177245385090L & mask;
-    
-    private void initBlock(boolean searchForMagic) throws IOException {
-        if (readCount >= readLimit) {
-            bsFinishedWithStream();
-            streamEnd = true;
-            return;
-        }
-        
-        // position before beginning of bzip block header        
-        long pos = innerBsStream.getPos();
-        if (!searchForMagic) {
-            char magic1, magic2, magic3, magic4;
-            char magic5, magic6;
-            magic1 = bsGetUChar();
-            magic2 = bsGetUChar();
-            magic3 = bsGetUChar();
-            magic4 = bsGetUChar();
-            magic5 = bsGetUChar();
-            magic6 = bsGetUChar();
-            if (magic1 == 0x17 && magic2 == 0x72 && magic3 == 0x45
-                    && magic4 == 0x38 && magic5 == 0x50 && magic6 == 0x90) {
-                complete();
-                return;
-            }
+    if(pos >= endOffsetOfSplit) {
+      // we have reached a block which begins exactly at the next InputSplit
+      // or >1 byte into the next InputSplit - lets record this fact
+      signalToStopReading = true;
+    }
+    storedBlockCRC = bsGetInt32();
 
-            if (magic1 != 0x31 || magic2 != 0x41 || magic3 != 0x59
-                    || magic4 != 0x26 || magic5 != 0x53 || magic6 != 0x59) {
-                badBlockHeader();
-                streamEnd = true;
-                return;
-            }
-        } else {
-            long magic = 0;
-            for(int i = 0; i < 6; i++) {
-                magic <<= 8;
-                magic |= bsGetUChar();
-            }
-            while(magic != eos && magic != eob) {
-                magic <<= 1;
-                magic &= mask;
-                magic |= bsR(1);
-                // if we just found the block header, the beginning of the bzip 
-                // header would be 6 bytes before the current stream position
-                // when we eventually break from this while(), if it is because
-                // we found a block header then pos will have the correct start
-                // of header position
-                pos = innerBsStream.getPos() - 6;
-            }
-            if (magic == eos) {
-                complete();
-                return;
-            }
-            
-        }
-        // if the previous block finished a few bits into the previous byte,
-        // then we will first be reading the remaining bits from the previous
-        // byte - so logically pos needs to be one behind
-        if(bsLive > 0)  {
-            pos--;
-        }
-        
-        if(pos >= endOffsetOfSplit) {
-            // we have reached a block which begins exactly at the next InputSplit
-            // or >1 byte into the next InputSplit - lets record this fact
-            signalToStopReading = true;
-        }
-        storedBlockCRC = bsGetInt32();
+    if (bsR(1) == 1) {
+      blockRandomised = true;
+    } else {
+      blockRandomised = false;
+    }
 
-        if (bsR(1) == 1) {
-            blockRandomised = true;
-        } else {
-            blockRandomised = false;
-        }
+    //        currBlockNo++;
+    getAndMoveToFrontDecode();
 
-        //        currBlockNo++;
-        getAndMoveToFrontDecode();
+    mCrc.initialiseCRC();
+    currentState = START_BLOCK_STATE;
+  }
 
-        mCrc.initialiseCRC();
-        currentState = START_BLOCK_STATE;
+  private void endBlock() throws IOException {
+    computedBlockCRC = mCrc.getFinalCRC();
+    /* A bad CRC is considered a fatal error. */
+    if (storedBlockCRC != computedBlockCRC) {
+      crcError();
     }
 
-    private void endBlock() throws IOException {
-        computedBlockCRC = mCrc.getFinalCRC();
-        /* A bad CRC is considered a fatal error. */
-        if (storedBlockCRC != computedBlockCRC) {
-            crcError();
-        }
+    computedCombinedCRC = (computedCombinedCRC << 1)
+        | (computedCombinedCRC >>> 31);
+    computedCombinedCRC ^= computedBlockCRC;
+  }
 
-        computedCombinedCRC = (computedCombinedCRC << 1)
-            | (computedCombinedCRC >>> 31);
-        computedCombinedCRC ^= computedBlockCRC;
+  private void complete() throws IOException {
+    storedCombinedCRC = bsGetInt32();
+    if (checkComputedCombinedCRC && 
+        storedCombinedCRC != computedCombinedCRC) {
+      crcError();
     }
-
-    private void complete() throws IOException {
-        storedCombinedCRC = bsGetInt32();
-        if (checkComputedCombinedCRC && 
-                storedCombinedCRC != computedCombinedCRC) {
-            crcError();
-        }
-        if (innerBsStream.getPos() < endOffsetOfSplit) {
-        	throw new IOException("Encountered additional bytes in the filesplit past the crc block. "
-        			+ "Loading of concatenated bz2 files is not supported");
-        }
-        bsFinishedWithStream();
-        streamEnd = true;
-    }
-
-    private static void blockOverrun() throws IOException {
-        cadvise("block overrun");
+    if (innerBsStream.getPos() < endOffsetOfSplit) {
+      throw new IOException("Encountered additional bytes in the filesplit past the crc block. "
+          + "Loading of concatenated bz2 files is not supported");
     }
-
-    private static void badBlockHeader() throws IOException {
-        cadvise("bad block header");
+    bsFinishedWithStream();
+    streamEnd = true;
+  }
+
+  private static void blockOverrun() throws IOException {
+    cadvise("block overrun");
+  }
+
+  private static void badBlockHeader() throws IOException {
+    cadvise("bad block header");
+  }
+
+  private static void crcError() throws IOException {
+    cadvise("CRC error");
+  }
+
+  private void bsFinishedWithStream() {
+    if (this.innerBsStream != null) {
+      if (this.innerBsStream != System.in) {
+        this.innerBsStream = null;
+      }
     }
-
-    private static void crcError() throws IOException {
-        cadvise("CRC error");
+  }
+
+  private void bsSetStream(FSDataInputStream f) {
+    innerBsStream = f;
+    bsLive = 0;
+    bsBuff = 0;
+  }
+
+  final private int readBs() throws IOException {
+    readCount++;
+    return innerBsStream.read();
+  }
+  private int bsR(int n) throws IOException {
+    int v;
+    while (bsLive < n) {
+      int zzi;
+      zzi = readBs();
+      if (zzi == -1) {
+        compressedStreamEOF();
+      }
+      bsBuff = (bsBuff << 8) | (zzi & 0xff);
+      bsLive += 8;
     }
 
-    private void bsFinishedWithStream() {
-        if (this.innerBsStream != null) {
-            if (this.innerBsStream != System.in) {
-                this.innerBsStream = null;
-            }
+    v = (bsBuff >> (bsLive - n)) & ((1 << n) - 1);
+    bsLive -= n;
+    return v;
+  }
+
+
+  private char bsGetUChar() throws IOException {
+    return (char) bsR(8);
+  }
+
+  private int bsGetint() throws IOException {
+    int u = 0;
+    u = (u << 8) | bsR(8);
+    u = (u << 8) | bsR(8);
+    u = (u << 8) | bsR(8);
+    u = (u << 8) | bsR(8);
+    return u;
+  }
+
+  private int bsGetIntVS(int numBits) throws IOException {
+    return bsR(numBits);
+  }
+
+  private int bsGetInt32() throws IOException {
+    return bsGetint();
+  }
+
+  private void hbCreateDecodeTables(int[] limit, int[] base,
+      int[] perm, char[] length,
+      int minLen, int maxLen, int alphaSize) {
+    int pp, i, j, vec;
+
+    pp = 0;
+    for (i = minLen; i <= maxLen; i++) {
+      for (j = 0; j < alphaSize; j++) {
+        if (length[j] == i) {
+          perm[pp] = j;
+          pp++;
         }
+      }
     }
 
-    private void bsSetStream(FSDataInputStream f) {
-        innerBsStream = f;
-        bsLive = 0;
-        bsBuff = 0;
+    for (i = 0; i < MAX_CODE_LEN; i++) {
+      base[i] = 0;
     }
-
-    final private int readBs() throws IOException {
-        readCount++;
-        return innerBsStream.read();
+    for (i = 0; i < alphaSize; i++) {
+      base[length[i] + 1]++;
     }
-    private int bsR(int n) throws IOException {
-        int v;
-        while (bsLive < n) {
-            int zzi;
-            zzi = readBs();
-            if (zzi == -1) {
-                compressedStreamEOF();
-            }
-            bsBuff = (bsBuff << 8) | (zzi & 0xff);
-            bsLive += 8;
-        }
 
-        v = (bsBuff >> (bsLive - n)) & ((1 << n) - 1);
-        bsLive -= n;
-        return v;
+    for (i = 1; i < MAX_CODE_LEN; i++) {
+      base[i] += base[i - 1];
     }
-    
 
-    private char bsGetUChar() throws IOException {
-        return (char) bsR(8);
+    for (i = 0; i < MAX_CODE_LEN; i++) {
+      limit[i] = 0;
     }
+    vec = 0;
 
-    private int bsGetint() throws IOException {
-        int u = 0;
-        u = (u << 8) | bsR(8);
-        u = (u << 8) | bsR(8);
-        u = (u << 8) | bsR(8);
-        u = (u << 8) | bsR(8);
-        return u;
+    for (i = minLen; i <= maxLen; i++) {
+      vec += (base[i + 1] - base[i]);
+      limit[i] = vec - 1;
+      vec <<= 1;
     }
-
-    private int bsGetIntVS(int numBits) throws IOException {
-        return bsR(numBits);
+    for (i = minLen + 1; i <= maxLen; i++) {
+      base[i] = ((limit[i - 1] + 1) << 1) - base[i];
     }
-
-    private int bsGetInt32() throws IOException {
-        return bsGetint();
+  }
+
+  private void recvDecodingTables() throws IOException {
+    char len[][] = new char[N_GROUPS][MAX_ALPHA_SIZE];
+    int i, j, t, nGroups, nSelectors, alphaSize;
+    int minLen, maxLen;
+    boolean[] inUse16 = new boolean[16];
+
+    /* Receive the mapping table */
+    for (i = 0; i < 16; i++) {
+      if (bsR(1) == 1) {
+        inUse16[i] = true;
+      } else {
+        inUse16[i] = false;
+      }
     }
 
-    private void hbCreateDecodeTables(int[] limit, int[] base,
-                                      int[] perm, char[] length,
-                                      int minLen, int maxLen, int alphaSize) {
-        int pp, i, j, vec;
-
-        pp = 0;
-        for (i = minLen; i <= maxLen; i++) {
-            for (j = 0; j < alphaSize; j++) {
-                if (length[j] == i) {
-                    perm[pp] = j;
-                    pp++;
-                }
-            }
-        }
-
-        for (i = 0; i < MAX_CODE_LEN; i++) {
-            base[i] = 0;
-        }
-        for (i = 0; i < alphaSize; i++) {
-            base[length[i] + 1]++;
-        }
-
-        for (i = 1; i < MAX_CODE_LEN; i++) {
-            base[i] += base[i - 1];
-        }
-
-        for (i = 0; i < MAX_CODE_LEN; i++) {
-            limit[i] = 0;
-        }
-        vec = 0;
-
-        for (i = minLen; i <= maxLen; i++) {
-            vec += (base[i + 1] - base[i]);
-            limit[i] = vec - 1;
-            vec <<= 1;
-        }
-        for (i = minLen + 1; i <= maxLen; i++) {
-            base[i] = ((limit[i - 1] + 1) << 1) - base[i];
-        }
+    for (i = 0; i < 256; i++) {
+      inUse[i] = false;
     }
 
-    private void recvDecodingTables() throws IOException {
-        char len[][] = new char[N_GROUPS][MAX_ALPHA_SIZE];
-        int i, j, t, nGroups, nSelectors, alphaSize;
-        int minLen, maxLen;
-        boolean[] inUse16 = new boolean[16];
-
-        /* Receive the mapping table */
-        for (i = 0; i < 16; i++) {
-            if (bsR(1) == 1) {
-                inUse16[i] = true;
-            } else {
-                inUse16[i] = false;
-            }
-        }
-
-        for (i = 0; i < 256; i++) {
-            inUse[i] = false;
-        }
-
-        for (i = 0; i < 16; i++) {
-            if (inUse16[i]) {
-                for (j = 0; j < 16; j++) {
-                    if (bsR(1) == 1) {
-                        inUse[i * 16 + j] = true;
-                    }
-                }
-            }
+    for (i = 0; i < 16; i++) {
+      if (inUse16[i]) {
+        for (j = 0; j < 16; j++) {
+          if (bsR(1) == 1) {
+            inUse[i * 16 + j] = true;
+          }
         }
+      }
+    }
 
-        makeMaps();
-        alphaSize = nInUse + 2;
+    makeMaps();
+    alphaSize = nInUse + 2;
+
+    /* Now the selectors */
+    nGroups = bsR(3);
+    nSelectors = bsR(15);
+    for (i = 0; i < nSelectors; i++) {
+      j = 0;
+      while (bsR(1) == 1) {
+        j++;
+      }
+      selectorMtf[i] = (char) j;
+    }
 
-        /* Now the selectors */
-        nGroups = bsR(3);
-        nSelectors = bsR(15);
-        for (i = 0; i < nSelectors; i++) {
-            j = 0;
-            while (bsR(1) == 1) {
-                j++;
-            }
-            selectorMtf[i] = (char) j;
+    /* Undo the MTF values for the selectors. */
+    {
+      char[] pos = new char[N_GROUPS];
+      char tmp, v;
+      for (v = 0; v < nGroups; v++) {
+        pos[v] = v;
+      }
+
+      for (i = 0; i < nSelectors; i++) {
+        v = selectorMtf[i];
+        tmp = pos[v];
+        while (v > 0) {
+          pos[v] = pos[v - 1];
+          v--;
         }
+        pos[0] = tmp;
+        selector[i] = tmp;
+      }
+    }
 
-        /* Undo the MTF values for the selectors. */
-        {
-            char[] pos = new char[N_GROUPS];
-            char tmp, v;
-            for (v = 0; v < nGroups; v++) {
-                pos[v] = v;
-            }
-
-            for (i = 0; i < nSelectors; i++) {
-                v = selectorMtf[i];
-                tmp = pos[v];
-                while (v > 0) {
-                    pos[v] = pos[v - 1];
-                    v--;
-                }
-                pos[0] = tmp;
-                selector[i] = tmp;
-            }
+    /* Now the coding tables */
+    for (t = 0; t < nGroups; t++) {
+      int curr = bsR(5);
+      for (i = 0; i < alphaSize; i++) {
+        while (bsR(1) == 1) {
+          if (bsR(1) == 0) {
+            curr++;
+          } else {
+            curr--;
+          }
         }
+        len[t][i] = (char) curr;
+      }
+    }
 
-        /* Now the coding tables */
-        for (t = 0; t < nGroups; t++) {
-            int curr = bsR(5);
-            for (i = 0; i < alphaSize; i++) {
-                while (bsR(1) == 1) {
-                    if (bsR(1) == 0) {
-                        curr++;
-                    } else {
-                        curr--;
-                    }
-                }
-                len[t][i] = (char) curr;
-            }
+    /* Create the Huffman decoding tables */
+    for (t = 0; t < nGroups; t++) {
+      minLen = 32;
+      maxLen = 0;
+      for (i = 0; i < alphaSize; i++) {
+        if (len[t][i] > maxLen) {
+          maxLen = len[t][i];
         }
-
-        /* Create the Huffman decoding tables */
-        for (t = 0; t < nGroups; t++) {
-            minLen = 32;
-            maxLen = 0;
-            for (i = 0; i < alphaSize; i++) {
-                if (len[t][i] > maxLen) {
-                    maxLen = len[t][i];
-                }
-                if (len[t][i] < minLen) {
-                    minLen = len[t][i];
-                }
-            }
-            hbCreateDecodeTables(limit[t], base[t], perm[t], len[t], minLen,
-                                 maxLen, alphaSize);
-            minLens[t] = minLen;
+        if (len[t][i] < minLen) {
+          minLen = len[t][i];
         }
+      }
+      hbCreateDecodeTables(limit[t], base[t], perm[t], len[t], minLen,
+          maxLen, alphaSize);
+      minLens[t] = minLen;
     }
+  }
 
-    private void getAndMoveToFrontDecode() throws IOException {
-        char[] yy = new char[256];
-        int i, j, nextSym, limitLast;
-        int EOB, groupNo, groupPos;
+  private void getAndMoveToFrontDecode() throws IOException {
+    char[] yy = new char[256];
+    int i, j, nextSym, limitLast;
+    int EOB, groupNo, groupPos;
 
-        limitLast = baseBlockSize * blockSize100k;
-        origPtr = bsGetIntVS(24);
+    limitLast = baseBlockSize * blockSize100k;
+    origPtr = bsGetIntVS(24);
 
-        recvDecodingTables();
-        EOB = nInUse + 1;
-        groupNo = -1;
-        groupPos = 0;
+    recvDecodingTables();
+    EOB = nInUse + 1;
+    groupNo = -1;
+    groupPos = 0;
 
-        /*
+    /*
           Setting up the unzftab entries here is not strictly
           necessary, but it does save having to do it later
           in a separate pass, and so saves a block's worth of
           cache misses.
-        */
-        for (i = 0; i <= 255; i++) {
-            unzftab[i] = 0;
-        }
+     */
+    for (i = 0; i <= 255; i++) {
+      unzftab[i] = 0;
+    }
+
+    for (i = 0; i <= 255; i++) {
+      yy[i] = (char) i;
+    }
 
-        for (i = 0; i <= 255; i++) {
-            yy[i] = (char) i;
+    last = -1;
+
+    {
+      int zt, zn, zvec, zj;
+      if (groupPos == 0) {
+        groupNo++;
+        groupPos = G_SIZE;
+      }
+      groupPos--;
+      zt = selector[groupNo];
+      zn = minLens[zt];
+      zvec = bsR(zn);
+      while (zvec > limit[zt][zn]) {
+        zn++;
+        {
+          {
+            while (bsLive < 1) {
+              int zzi = 0;
+              try {
+                zzi = readBs();
+              } catch (IOException e) {
+                compressedStreamEOF();
+              }
+              if (zzi == -1) {
+                compressedStreamEOF();
+              }
+              bsBuff = (bsBuff << 8) | (zzi & 0xff);
+              bsLive += 8;
+            }
+          }
+          zj = (bsBuff >> (bsLive - 1)) & 1;
+          bsLive--;
         }
+        zvec = (zvec << 1) | zj;
+      }
+      nextSym = perm[zt][zvec - base[zt][zn]];
+    }
 
-        last = -1;
+    while (true) {
 
-        {
+      if (nextSym == EOB) {
+        break;
+      }
+
+      if (nextSym == RUNA || nextSym == RUNB) {
+        char ch;
+        int s = -1;
+        int N = 1;
+        do {
+          if (nextSym == RUNA) {
+            s = s + (0 + 1) * N;
+          } else if (nextSym == RUNB) {
+            s = s + (1 + 1) * N;
+          }
+          N = N * 2;
+          {
             int zt, zn, zvec, zj;
             if (groupPos == 0) {
-                groupNo++;
-                groupPos = G_SIZE;
+              groupNo++;
+              groupPos = G_SIZE;
             }
             groupPos--;
             zt = selector[groupNo];
             zn = minLens[zt];
             zvec = bsR(zn);
             while (zvec > limit[zt][zn]) {
-                zn++;
+              zn++;
+              {
                 {
-                    {
-                        while (bsLive < 1) {
-                            int zzi = 0;
-                            try {
-                                zzi = readBs();
-                            } catch (IOException e) {
-                                compressedStreamEOF();
-                            }
-                            if (zzi == -1) {
-                                compressedStreamEOF();
-                            }
-                            bsBuff = (bsBuff << 8) | (zzi & 0xff);
-                            bsLive += 8;
-                        }
+                  while (bsLive < 1) {
+                    int zzi = 0;
+                    try {
+                      zzi = readBs();
+                    } catch (IOException e) {
+                      compressedStreamEOF();
                     }
-                    zj = (bsBuff >> (bsLive - 1)) & 1;
-                    bsLive--;
+                    if (zzi == -1) {
+                      compressedStreamEOF();
+                    }
+                    bsBuff = (bsBuff << 8) | (zzi & 0xff);
+                    bsLive += 8;
+                  }
                 }
-                zvec = (zvec << 1) | zj;
+                zj = (bsBuff >> (bsLive - 1)) & 1;
+                bsLive--;
+              }
+              zvec = (zvec << 1) | zj;
             }
             nextSym = perm[zt][zvec - base[zt][zn]];
-        }
+          }
+        } while (nextSym == RUNA || nextSym == RUNB);
 
-        while (true) {
+        s++;
+        ch = seqToUnseq[yy[0]];
+        unzftab[ch] += s;
 
-            if (nextSym == EOB) {
-                break;
-            }
-
-            if (nextSym == RUNA || nextSym == RUNB) {
-                char ch;
-                int s = -1;
-                int N = 1;
-                do {
-                    if (nextSym == RUNA) {
-                        s = s + (0 + 1) * N;
-                    } else if (nextSym == RUNB) {
-                        s = s + (1 + 1) * N;
-                           }
-                    N = N * 2;
-                    {
-                        int zt, zn, zvec, zj;
-                        if (groupPos == 0) {
-                            groupNo++;
-                            groupPos = G_SIZE;
-                        }
-                        groupPos--;
-                        zt = selector[groupNo];
-                        zn = minLens[zt];
-                        zvec = bsR(zn);
-                        while (zvec > limit[zt][zn]) {
-                            zn++;
-                            {
-                                {
-                                    while (bsLive < 1) {
-                                        int zzi = 0;
-                                        try {
-                                            zzi = readBs();
-                                        } catch (IOException e) {
-                                            compressedStreamEOF();
-                                        }
-                                        if (zzi == -1) {
-                                            compressedStreamEOF();
-                                        }
-                                        bsBuff = (bsBuff << 8) | (zzi & 0xff);
-                                        bsLive += 8;
-                                    }
-                                }
-                                zj = (bsBuff >> (bsLive - 1)) & 1;
-                                bsLive--;
-                            }
-                            zvec = (zvec << 1) | zj;
-                        }
-                        nextSym = perm[zt][zvec - base[zt][zn]];
-                    }
-                } while (nextSym == RUNA || nextSym == RUNB);
-
-                s++;
-                ch = seqToUnseq[yy[0]];
-                unzftab[ch] += s;
-
-                while (s > 0) {
-                    last++;
-                    ll8[last] = ch;
-                    s--;
-                }
+        while (s > 0) {
+          last++;
+          ll8[last] = ch;
+          s--;
+        }
 
-                if (last >= limitLast) {
-                    blockOverrun();
-                }
-                continue;
-            } else {
-                char tmp;
-                last++;
-                if (last >= limitLast) {
-                    blockOverrun();
-                }
+        if (last >= limitLast) {
+          blockOverrun();
+        }
+        continue;
+      } else {
+        char tmp;
+        last++;
+        if (last >= limitLast) {
+          blockOverrun();
+        }
 
-                tmp = yy[nextSym - 1];
-                unzftab[seqToUnseq[tmp]]++;
-                ll8[last] = seqToUnseq[tmp];
+        tmp = yy[nextSym - 1];
+        unzftab[seqToUnseq[tmp]]++;
+        ll8[last] = seqToUnseq[tmp];
 
-                /*
+        /*
                   This loop is hammered during decompression,
                   hence the unrolling.
 
                   for (j = nextSym-1; j > 0; j--) yy[j] = yy[j-1];
-                */
-
-                j = nextSym - 1;
-                for (; j > 3; j -= 4) {
-                    yy[j]     = yy[j - 1];
-                    yy[j - 1] = yy[j - 2];
-                    yy[j - 2] = yy[j - 3];
-                    yy[j - 3] = yy[j - 4];
-                }
-                for (; j > 0; j--) {
-                    yy[j] = yy[j - 1];
-                }
-
-                yy[0] = tmp;
-                {
-                    int zt, zn, zvec, zj;
-                    if (groupPos == 0) {
-                        groupNo++;
-                        groupPos = G_SIZE;
-                    }
-                    groupPos--;
-                    zt = selector[groupNo];
-                    zn = minLens[zt];
-                    zvec = bsR(zn);
-                    while (zvec > limit[zt][zn]) {
-                        zn++;
-                        {
-                            {
-                                while (bsLive < 1) {
-                                    int zzi;
-                                    char thech = 0;
-                                    try {
-                                        thech = (char) readBs();
-                                    } catch (IOException e) {
-                                        compressedStreamEOF();
-                                    }
-                                    zzi = thech;
-                                    bsBuff = (bsBuff << 8) | (zzi & 0xff);
-                                    bsLive += 8;
-                                }
-                            }
-                            zj = (bsBuff >> (bsLive - 1)) & 1;
-                            bsLive--;
-                        }
-                        zvec = (zvec << 1) | zj;
-                    }
-                    nextSym = perm[zt][zvec - base[zt][zn]];
-                }
-                continue;
-            }
-        }
+         */
+
+         j = nextSym - 1;
+         for (; j > 3; j -= 4) {
+           yy[j]     = yy[j - 1];
+           yy[j - 1] = yy[j - 2];
+           yy[j - 2] = yy[j - 3];
+           yy[j - 3] = yy[j - 4];
+         }
+         for (; j > 0; j--) {
+           yy[j] = yy[j - 1];
+         }
+
+         yy[0] = tmp;
+         {
+           int zt, zn, zvec, zj;
+           if (groupPos == 0) {
+             groupNo++;
+             groupPos = G_SIZE;
+           }
+           groupPos--;
+           zt = selector[groupNo];
+           zn = minLens[zt];
+           zvec = bsR(zn);
+           while (zvec > limit[zt][zn]) {
+             zn++;
+             {
+               {
+                 while (bsLive < 1) {
+                   int zzi;
+                   char thech = 0;
+                   try {
+                     thech = (char) readBs();
+                   } catch (IOException e) {
+                     compressedStreamEOF();
+                   }
+                   zzi = thech;
+                   bsBuff = (bsBuff << 8) | (zzi & 0xff);
+                   bsLive += 8;
+                 }
+               }
+               zj = (bsBuff >> (bsLive - 1)) & 1;
+               bsLive--;
+             }
+             zvec = (zvec << 1) | zj;
+           }
+           nextSym = perm[zt][zvec - base[zt][zn]];
+         }
+         continue;
+      }
     }
+  }
 
-    private void setupBlock() throws IOException {
-        int[] cftab = new int[257];
-        char ch;
+  private void setupBlock() throws IOException {
+    int[] cftab = new int[257];
+    char ch;
 
-        cftab[0] = 0;
-        for (i = 1; i <= 256; i++) {
-            cftab[i] = unzftab[i - 1];
-        }
-        for (i = 1; i <= 256; i++) {
-            cftab[i] += cftab[i - 1];
-        }
+    cftab[0] = 0;
+    for (i = 1; i <= 256; i++) {
+      cftab[i] = unzftab[i - 1];
+    }
+    for (i = 1; i <= 256; i++) {
+      cftab[i] += cftab[i - 1];
+    }
 
-        for (i = 0; i <= last; i++) {
-            ch = ll8[i];
-            tt[cftab[ch]] = i;
-            cftab[ch]++;
-        }
-        cftab = null;
+    for (i = 0; i <= last; i++) {
+      ch = ll8[i];
+      tt[cftab[ch]] = i;
+      cftab[ch]++;
+    }
+    cftab = null;
 
-        tPos = tt[origPtr];
+    tPos = tt[origPtr];
 
-        count = 0;
-        i2 = 0;
-        ch2 = 256;   /* not a char and not EOF */
+    count = 0;
+    i2 = 0;
+    ch2 = 256;   /* not a char and not EOF */
 
-        if (blockRandomised) {
-            rNToGo = 0;
-            rTPos = 0;
-            setupRandPartA();
-        } else {
-            setupNoRandPartA();
-        }
+    if (blockRandomised) {
+      rNToGo = 0;
+      rTPos = 0;
+      setupRandPartA();
+    } else {
+      setupNoRandPartA();
     }
-
-    private void setupRandPartA() throws IOException {
-        if (i2 <= last) {
-            chPrev = ch2;
-            ch2 = ll8[tPos];
-            tPos = tt[tPos];
-            if (rNToGo == 0) {
-                rNToGo = rNums[rTPos];
-                rTPos++;
-                if (rTPos == 512) {
-                    rTPos = 0;
-                }
-            }
-            rNToGo--;
-            ch2 ^= ((rNToGo == 1) ? 1 : 0);
-            i2++;
-
-            currentChar = ch2;
-            currentState = RAND_PART_B_STATE;
-            mCrc.updateCRC(ch2);
-        } else {
-            endBlock();
-            initBlock(false);
-            setupBlock();
+  }
+
+  private void setupRandPartA() throws IOException {
+    if (i2 <= last) {
+      chPrev = ch2;
+      ch2 = ll8[tPos];
+      tPos = tt[tPos];
+      if (rNToGo == 0) {
+        rNToGo = rNums[rTPos];
+        rTPos++;
+        if (rTPos == 512) {
+          rTPos = 0;
         }
+      }
+      rNToGo--;
+      ch2 ^= ((rNToGo == 1) ? 1 : 0);
+      i2++;
+
+      currentChar = ch2;
+      currentState = RAND_PART_B_STATE;
+      mCrc.updateCRC(ch2);
+    } else {
+      endBlock();
+      initBlock(false);
+      setupBlock();
     }
-
-    private void setupNoRandPartA() throws IOException {
-        if (i2 <= last) {
-            chPrev = ch2;
-            ch2 = ll8[tPos];
-            tPos = tt[tPos];
-            i2++;
-
-            currentChar = ch2;
-            currentState = NO_RAND_PART_B_STATE;
-            mCrc.updateCRC(ch2);
-        } else {
-            endBlock();
-            initBlock(false);
-            setupBlock();
-        }
+  }
+
+  private void setupNoRandPartA() throws IOException {
+    if (i2 <= last) {
+      chPrev = ch2;
+      ch2 = ll8[tPos];
+      tPos = tt[tPos];
+      i2++;
+
+      currentChar = ch2;
+      currentState = NO_RAND_PART_B_STATE;
+      mCrc.updateCRC(ch2);
+    } else {
+      endBlock();
+      initBlock(false);
+      setupBlock();
     }
-
-    private void setupRandPartB() throws IOException {
-        if (ch2 != chPrev) {
-            currentState = RAND_PART_A_STATE;
-            count = 1;
-            setupRandPartA();
-        } else {
-            count++;
-            if (count >= 4) {
-                z = ll8[tPos];
-                tPos = tt[tPos];
-                if (rNToGo == 0) {
-                    rNToGo = rNums[rTPos];
-                    rTPos++;
-                    if (rTPos == 512) {
-                        rTPos = 0;
-                    }
-                }
-                rNToGo--;
-                z ^= ((rNToGo == 1) ? 1 : 0);
-                j2 = 0;
-                currentState = RAND_PART_C_STATE;
-                setupRandPartC();
-            } else {
-                currentState = RAND_PART_A_STATE;
-                setupRandPartA();
-            }
+  }
+
+  private void setupRandPartB() throws IOException {
+    if (ch2 != chPrev) {
+      currentState = RAND_PART_A_STATE;
+      count = 1;
+      setupRandPartA();
+    } else {
+      count++;
+      if (count >= 4) {
+        z = ll8[tPos];
+        tPos = tt[tPos];
+        if (rNToGo == 0) {
+          rNToGo = rNums[rTPos];
+          rTPos++;
+          if (rTPos == 512) {
+            rTPos = 0;
+          }
         }
+        rNToGo--;
+        z ^= ((rNToGo == 1) ? 1 : 0);
+        j2 = 0;
+        currentState = RAND_PART_C_STATE;
+        setupRandPartC();
+      } else {
+        currentState = RAND_PART_A_STATE;
+        setupRandPartA();
+      }
     }
-
-    private void setupRandPartC() throws IOException {
-        if (j2 < (int) z) {
-            currentChar = ch2;
-            mCrc.updateCRC(ch2);
-            j2++;
-        } else {
-            currentState = RAND_PART_A_STATE;
-            i2++;
-            count = 0;
-            setupRandPartA();
-        }
+  }
+
+  private void setupRandPartC() throws IOException {
+    if (j2 < (int) z) {
+      currentChar = ch2;
+      mCrc.updateCRC(ch2);
+      j2++;
+    } else {
+      currentState = RAND_PART_A_STATE;
+      i2++;
+      count = 0;
+      setupRandPartA();
     }
-
-    private void setupNoRandPartB() throws IOException {
-        if (ch2 != chPrev) {
-            currentState = NO_RAND_PART_A_STATE;
-            count = 1;
-            setupNoRandPartA();
-        } else {
-            count++;
-            if (count >= 4) {
-                z = ll8[tPos];
-                tPos = tt[tPos];
-                currentState = NO_RAND_PART_C_STATE;
-                j2 = 0;
-                setupNoRandPartC();
-            } else {
-                currentState = NO_RAND_PART_A_STATE;
-                setupNoRandPartA();
-            }
-        }
+  }
+
+  private void setupNoRandPartB() throws IOException {
+    if (ch2 != chPrev) {
+      currentState = NO_RAND_PART_A_STATE;
+      count = 1;
+      setupNoRandPartA();
+    } else {
+      count++;
+      if (count >= 4) {
+        z = ll8[tPos];
+        tPos = tt[tPos];
+        currentState = NO_RAND_PART_C_STATE;
+        j2 = 0;
+        setupNoRandPartC();
+      } else {
+        currentState = NO_RAND_PART_A_STATE;
+        setupNoRandPartA();
+      }
     }
-
-    private void setupNoRandPartC() throws IOException {
-        if (j2 < (int) z) {
-            currentChar = ch2;
-            mCrc.updateCRC(ch2);
-            j2++;
-        } else {
-            currentState = NO_RAND_PART_A_STATE;
-            i2++;
-            count = 0;
-            setupNoRandPartA();
-        }
+  }
+
+  private void setupNoRandPartC() throws IOException {
+    if (j2 < (int) z) {
+      currentChar = ch2;
+      mCrc.updateCRC(ch2);
+      j2++;
+    } else {
+      currentState = NO_RAND_PART_A_STATE;
+      i2++;
+      count = 0;
+      setupNoRandPartA();
     }
+  }
 
-    private void setDecompressStructureSizes(int newSize100k) {
-        if (!(0 <= newSize100k && newSize100k <= 9 && 0 <= blockSize100k
-               && blockSize100k <= 9)) {
-            // throw new IOException("Invalid block size");
-        }
+  private void setDecompressStructureSizes(int newSize100k) {
+    if (!(0 <= newSize100k && newSize100k <= 9 && 0 <= blockSize100k
+        && blockSize100k <= 9)) {
+      // throw new IOException("Invalid block size");
+    }
 
-        blockSize100k = newSize100k;
+    blockSize100k = newSize100k;
 
-        if (newSize100k == 0) {
-            return;
-        }
-
-        int n = baseBlockSize * newSize100k;
-        ll8 = new char[n];
-        tt = new int[n];
+    if (newSize100k == 0) {
+      return;
     }
 
-    private static class CRC {
-      public static int crc32Table[] = {
-        0x00000000, 0x04c11db7, 0x09823b6e, 0x0d4326d9,
-        0x130476dc, 0x17c56b6b, 0x1a864db2, 0x1e475005,
-        0x2608edb8, 0x22c9f00f, 0x2f8ad6d6, 0x2b4bcb61,
-        0x350c9b64, 0x31cd86d3, 0x3c8ea00a, 0x384fbdbd,
-        0x4c11db70, 0x48d0c6c7, 0x4593e01e, 0x4152fda9,
-        0x5f15adac, 0x5bd4b01b, 0x569796c2, 0x52568b75,
-        0x6a1936c8, 0x6ed82b7f, 0x639b0da6, 0x675a1011,
-        0x791d4014, 0x7ddc5da3, 0x709f7b7a, 0x745e66cd,
-        0x9823b6e0, 0x9ce2ab57, 0x91a18d8e, 0x95609039,
-        0x8b27c03c, 0x8fe6dd8b, 0x82a5fb52, 0x8664e6e5,
-        0xbe2b5b58, 0xbaea46ef, 0xb7a96036, 0xb3687d81,
-        0xad2f2d84, 0xa9ee3033, 0xa4ad16ea, 0xa06c0b5d,
-        0xd4326d90, 0xd0f37027, 0xddb056fe, 0xd9714b49,
-        0xc7361b4c, 0xc3f706fb, 0xceb42022, 0xca753d95,
-        0xf23a8028, 0xf6fb9d9f, 0xfbb8bb46, 0xff79a6f1,
-        0xe13ef6f4, 0xe5ffeb43, 0xe8bccd9a, 0xec7dd02d,
-        0x34867077, 0x30476dc0, 0x3d044b19, 0x39c556ae,
-        0x278206ab, 0x23431b1c, 0x2e003dc5, 0x2ac12072,
-        0x128e9dcf, 0x164f8078, 0x1b0ca6a1, 0x1fcdbb16,
-        0x018aeb13, 0x054bf6a4, 0x0808d07d, 0x0cc9cdca,
-        0x7897ab07, 0x7c56b6b0, 0x71159069, 0x75d48dde,
-        0x6b93dddb, 0x6f52c06c, 0x6211e6b5, 0x66d0fb02,
-        0x5e9f46bf, 0x5a5e5b08, 0x571d7dd1, 0x53dc6066,
-        0x4d9b3063, 0x495a2dd4, 0x44190b0d, 0x40d816ba,
-        0xaca5c697, 0xa864db20, 0xa527fdf9, 0xa1e6e04e,
-        0xbfa1b04b, 0xbb60adfc, 0xb6238b25, 0xb2e29692,
-        0x8aad2b2f, 0x8e6c3698, 0x832f1041, 0x87ee0df6,
-        0x99a95df3, 0x9d684044, 0x902b669d, 0x94ea7b2a,
-        0xe0b41de7, 0xe4750050, 0xe9362689, 0xedf73b3e,
-        0xf3b06b3b, 0xf771768c, 0xfa325055, 0xfef34de2,
-        0xc6bcf05f, 0xc27dede8, 0xcf3ecb31, 0xcbffd686,
-        0xd5b88683, 0xd1799b34, 0xdc3abded, 0xd8fba05a,
-        0x690ce0ee, 0x6dcdfd59, 0x608edb80, 0x644fc637,
-        0x7a089632, 0x7ec98b85, 0x738aad5c, 0x774bb0eb,
-        0x4f040d56, 0x4bc510e1, 0x46863638, 0x42472b8f,
-        0x5c007b8a, 0x58c1663d, 0x558240e4, 0x51435d53,
-        0x251d3b9e, 0x21dc2629, 0x2c9f00f0, 0x285e1d47,
-        0x36194d42, 0x32d850f5, 0x3f9b762c, 0x3b5a6b9b,
-        0x0315d626, 0x07d4cb91, 0x0a97ed48, 0x0e56f0ff,
-        0x1011a0fa, 0x14d0bd4d, 0x19939b94, 0x1d528623,
-        0xf12f560e, 0xf5ee4bb9, 0xf8ad6d60, 0xfc6c70d7,
-        0xe22b20d2, 0xe6ea3d65, 0xeba91bbc, 0xef68060b,
-        0xd727bbb6, 0xd3e6a601, 0xdea580d8, 0xda649d6f,
-        0xc423cd6a, 0xc0e2d0dd, 0xcda1f604, 0xc960ebb3,
-        0xbd3e8d7e, 0xb9ff90c9, 0xb4bcb610, 0xb07daba7,
-        0xae3afba2, 0xaafbe615, 0xa7b8c0cc, 0xa379dd7b,
-        0x9b3660c6, 0x9ff77d71, 0x92b45ba8, 0x9675461f,
-        0x8832161a, 0x8cf30bad, 0x81b02d74, 0x857130c3,
-        0x5d8a9099, 0x594b8d2e, 0x5408abf7, 0x50c9b640,
-        0x4e8ee645, 0x4a4ffbf2, 0x470cdd2b, 0x43cdc09c,
-        0x7b827d21, 0x7f436096, 0x7200464f, 0x76c15bf8,
-        0x68860bfd, 0x6c47164a, 0x61043093, 0x65c52d24,
-        0x119b4be9, 0x155a565e, 0x18197087, 0x1cd86d30,
-        0x029f3d35, 0x065e2082, 0x0b1d065b, 0x0fdc1bec,
-        0x3793a651, 0x3352bbe6, 0x3e119d3f, 0x3ad08088,
-        0x2497d08d, 0x2056cd3a, 0x2d15ebe3, 0x29d4f654,
-        0xc5a92679, 0xc1683bce, 0xcc2b1d17, 0xc8ea00a0,
-        0xd6ad50a5, 0xd26c4d12, 0xdf2f6bcb, 0xdbee767c,
-        0xe3a1cbc1, 0xe760d676, 0xea23f0af, 0xeee2ed18,
-        0xf0a5bd1d, 0xf464a0aa, 0xf9278673, 0xfde69bc4,
-        0x89b8fd09, 0x8d79e0be, 0x803ac667, 0x84fbdbd0,
-        0x9abc8bd5, 0x9e7d9662, 0x933eb0bb, 0x97ffad0c,
-        0xafb010b1, 0xab710d06, 0xa6322bdf, 0xa2f33668,
-        0xbcb4666d, 0xb8757bda, 0xb5365d03, 0xb1f740b4
+    int n = baseBlockSize * newSize100k;
+    ll8 = new char[n];
+    tt = new int[n];
+  }
+
+  private static class CRC {
+    public static int crc32Table[] = {
+      0x00000000, 0x04c11db7, 0x09823b6e, 0x0d4326d9,
+      0x130476dc, 0x17c56b6b, 0x1a864db2, 0x1e475005,
+      0x2608edb8, 0x22c9f00f, 0x2f8ad6d6, 0x2b4bcb61,
+      0x350c9b64, 0x31cd86d3, 0x3c8ea00a, 0x384fbdbd,
+      0x4c11db70, 0x48d0c6c7, 0x4593e01e, 0x4152fda9,
+      0x5f15adac, 0x5bd4b01b, 0x569796c2, 0x52568b75,
+      0x6a1936c8, 0x6ed82b7f, 0x639b0da6, 0x675a1011,
+      0x791d4014, 0x7ddc5da3, 0x709f7b7a, 0x745e66cd,
+      0x9823b6e0, 0x9ce2ab57, 0x91a18d8e, 0x95609039,
+      0x8b27c03c, 0x8fe6dd8b, 0x82a5fb52, 0x8664e6e5,
+      0xbe2b5b58, 0xbaea46ef, 0xb7a96036, 0xb3687d81,
+      0xad2f2d84, 0xa9ee3033, 0xa4ad16ea, 0xa06c0b5d,
+      0xd4326d90, 0xd0f37027, 0xddb056fe, 0xd9714b49,
+      0xc7361b4c, 0xc3f706fb, 0xceb42022, 0xca753d95,
+      0xf23a8028, 0xf6fb9d9f, 0xfbb8bb46, 0xff79a6f1,
+      0xe13ef6f4, 0xe5ffeb43, 0xe8bccd9a, 0xec7dd02d,
+      0x34867077, 0x30476dc0, 0x3d044b19, 0x39c556ae,
+      0x278206ab, 0x23431b1c, 0x2e003dc5, 0x2ac12072,
+      0x128e9dcf, 0x164f8078, 0x1b0ca6a1, 0x1fcdbb16,
+      0x018aeb13, 0x054bf6a4, 0x0808d07d, 0x0cc9cdca,
+      0x7897ab07, 0x7c56b6b0, 0x71159069, 0x75d48dde,
+      0x6b93dddb, 0x6f52c06c, 0x6211e6b5, 0x66d0fb02,
+      0x5e9f46bf, 0x5a5e5b08, 0x571d7dd1, 0x53dc6066,
+      0x4d9b3063, 0x495a2dd4, 0x44190b0d, 0x40d816ba,
+      0xaca5c697, 0xa864db20, 0xa527fdf9, 0xa1e6e04e,
+      0xbfa1b04b, 0xbb60adfc, 0xb6238b25, 0xb2e29692,
+      0x8aad2b2f, 0x8e6c3698, 0x832f1041, 0x87ee0df6,
+      0x99a95df3, 0x9d684044, 0x902b669d, 0x94ea7b2a,
+      0xe0b41de7, 0xe4750050, 0xe9362689, 0xedf73b3e,
+      0xf3b06b3b, 0xf771768c, 0xfa325055, 0xfef34de2,
+      0xc6bcf05f, 0xc27dede8, 0xcf3ecb31, 0xcbffd686,
+      0xd5b88683, 0xd1799b34, 0xdc3abded, 0xd8fba05a,
+      0x690ce0ee, 0x6dcdfd59, 0x608edb80, 0x644fc637,
+      0x7a089632, 0x7ec98b85, 0x738aad5c, 0x774bb0eb,
+      0x4f040d56, 0x4bc510e1, 0x46863638, 0x42472b8f,
+      0x5c007b8a, 0x58c1663d, 0x558240e4, 0x51435d53,
+      0x251d3b9e, 0x21dc2629, 0x2c9f00f0, 0x285e1d47,
+      0x36194d42, 0x32d850f5, 0x3f9b762c, 0x3b5a6b9b,
+      0x0315d626, 0x07d4cb91, 0x0a97ed48, 0x0e56f0ff,
+      0x1011a0fa, 0x14d0bd4d, 0x19939b94, 0x1d528623,
+      0xf12f560e, 0xf5ee4bb9, 0xf8ad6d60, 0xfc6c70d7,
+      0xe22b20d2, 0xe6ea3d65, 0xeba91bbc, 0xef68060b,
+      0xd727bbb6, 0xd3e6a601, 0xdea580d8, 0xda649d6f,
+      0xc423cd6a, 0xc0e2d0dd, 0xcda1f604, 0xc960ebb3,
+      0xbd3e8d7e, 0xb9ff90c9, 0xb4bcb610, 0xb07daba7,
+      0xae3afba2, 0xaafbe615, 0xa7b8c0cc, 0xa379dd7b,
+      0x9b3660c6, 0x9ff77d71, 0x92b45ba8, 0x9675461f,
+      0x8832161a, 0x8cf30bad, 0x81b02d74, 0x857130c3,
+      0x5d8a9099, 0x594b8d2e, 0x5408abf7, 0x50c9b640,
+      0x4e8ee645, 0x4a4ffbf2, 0x470cdd2b, 0x43cdc09c,
+      0x7b827d21, 0x7f436096, 0x7200464f, 0x76c15bf8,
+      0x68860bfd, 0x6c47164a, 0x61043093, 0x65c52d24,
+      0x119b4be9, 0x155a565e, 0x18197087, 0x1cd86d30,
+      0x029f3d35, 0x065e2082, 0x0b1d065b, 0x0fdc1bec,
+      0x3793a651, 0x3352bbe6, 0x3e119d3f, 0x3ad08088,
+      0x2497d08d, 0x2056cd3a, 0x2d15ebe3, 0x29d4f654,
+      0xc5a92679, 0xc1683bce, 0xcc2b1d17, 0xc8ea00a0,
+      0xd6ad50a5, 0xd26c4d12, 0xdf2f6bcb, 0xdbee767c,
+      0xe3a1cbc1, 0xe760d676, 0xea23f0af, 0xeee2ed18,
+      0xf0a5bd1d, 0xf464a0aa, 0xf9278673, 0xfde69bc4,
+      0x89b8fd09, 0x8d79e0be, 0x803ac667, 0x84fbdbd0,
+      0x9abc8bd5, 0x9e7d9662, 0x933eb0bb, 0x97ffad0c,
+      0xafb010b1, 0xab710d06, 0xa6322bdf, 0xa2f33668,
+      0xbcb4666d, 0xb8757bda, 0xb5365d03, 0xb1f740b4
     };
 
     public CRC() {
-        initialiseCRC();
+      initialiseCRC();
     }
 
     void initialiseCRC() {
-        globalCrc = 0xffffffff;
+      globalCrc = 0xffffffff;
     }
 
     int getFinalCRC() {
-        return ~globalCrc;
-    }
-
-    int getGlobalCRC() {
-        return globalCrc;
-    }
-
-    void setGlobalCRC(int newCrc) {
-        globalCrc = newCrc;
+      return ~globalCrc;
     }
 
     void updateCRC(int inCh) {
-        int temp = (globalCrc >> 24) ^ inCh;
-        if (temp < 0) {
-            temp = 256 + temp;
-        }
-        globalCrc = (globalCrc << 8) ^ CRC.crc32Table[temp];
+      int temp = (globalCrc >> 24) ^ inCh;
+      if (temp < 0) {
+        temp = 256 + temp;
+      }
+      globalCrc = (globalCrc << 8) ^ CRC.crc32Table[temp];
     }
 
     int globalCrc;
-    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/io/text/TextFileSource.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/io/text/TextFileSource.java b/src/main/java/com/cloudera/crunch/io/text/TextFileSource.java
index 35beee3..e11283f 100644
--- a/src/main/java/com/cloudera/crunch/io/text/TextFileSource.java
+++ b/src/main/java/com/cloudera/crunch/io/text/TextFileSource.java
@@ -37,7 +37,7 @@ public class TextFileSource<T> extends FileSourceImpl<T> implements
 	return strPath.endsWith(".bz") || strPath.endsWith(".bz2");
   }
   
-  private static Class<? extends FileInputFormat> getInputFormat(Path path, PType ptype) {
+  private static <S> Class<? extends FileInputFormat> getInputFormat(Path path, PType<S> ptype) {
 	if (ptype.getFamily().equals(AvroTypeFamily.getInstance())) {
 	  return AvroUtf8InputFormat.class;
 	} else if (isBZip2(path)){

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/lib/Aggregate.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/lib/Aggregate.java b/src/main/java/com/cloudera/crunch/lib/Aggregate.java
index dddbdbd..411083b 100644
--- a/src/main/java/com/cloudera/crunch/lib/Aggregate.java
+++ b/src/main/java/com/cloudera/crunch/lib/Aggregate.java
@@ -47,6 +47,8 @@ public class Aggregate {
   public static <S> PTable<S, Long> count(PCollection<S> collect) {
     PTypeFamily tf = collect.getTypeFamily();
     return collect.parallelDo("Aggregate.count", new MapFn<S, Pair<S, Long>>() {
+      private static final long serialVersionUID = 1L;
+      
       @Override
       public Pair<S, Long> map(S input) {
         return Pair.of(input, 1L);
@@ -65,12 +67,14 @@ public class Aggregate {
     
     @Override
     public int compare(Pair<K, V> left, Pair<K, V> right) {
-      int cmp = ((Comparable<V>)left.second()).compareTo(right.second());
+      int cmp = ((Comparable<V>) left.second()).compareTo(right.second());
       return ascending ? cmp : -cmp;
     }
   }
   
   public static class TopKFn<K, V> extends DoFn<Pair<K, V>, Pair<Integer, Pair<K, V>>> {
+    private static final long serialVersionUID = 1L;
+    
     private final int limit;
     private final boolean maximize;
     private transient PriorityQueue<Pair<K, V>> values;
@@ -102,7 +106,8 @@ public class Aggregate {
   }
   
   public static class TopKCombineFn<K, V> extends CombineFn<Integer, Pair<K, V>> {
-
+    private static final long serialVersionUID = 1L;
+    
     private final int limit;
     private final boolean maximize;
     
@@ -140,6 +145,7 @@ public class Aggregate {
         .groupByKey(1)
         .combineValues(new TopKCombineFn<K, V>(limit, maximize))
         .parallelDo("top" + limit + "reduce", new DoFn<Pair<Integer, Pair<K, V>>, Pair<K, V>>() {
+          private static final long serialVersionUID = 1L;
           @Override
           public void process(Pair<Integer, Pair<K, V>> input,
               Emitter<Pair<K, V>> emitter) {
@@ -160,6 +166,8 @@ public class Aggregate {
     PTypeFamily tf = collect.getTypeFamily();
     return PTables.values(
         collect.parallelDo("max", new DoFn<S, Pair<Boolean, S>>() {
+          private static final long serialVersionUID = 1L;
+          
           private transient S max = null;
           
           @Override
@@ -177,6 +185,8 @@ public class Aggregate {
           }
         }, tf.tableOf(tf.booleans(), collect.getPType()))
         .groupByKey(1).combineValues(new CombineFn<Boolean, S>() {
+          private static final long serialVersionUID = 1L;
+          
           @Override
           public void process(Pair<Boolean, Iterable<S>> input,
               Emitter<Pair<Boolean, S>> emitter) {
@@ -202,6 +212,7 @@ public class Aggregate {
     PTypeFamily tf = collect.getTypeFamily();
     return PTables.values(
         collect.parallelDo("min", new DoFn<S, Pair<Boolean, S>>() {
+          private static final long serialVersionUID = 1L;
           private transient S min = null;
           
           @Override
@@ -219,6 +230,7 @@ public class Aggregate {
           }
         }, tf.tableOf(tf.booleans(), collect.getPType()))
         .groupByKey().combineValues(new CombineFn<Boolean, S>() {
+          private static final long serialVersionUID = 1L;
           @Override
           public void process(Pair<Boolean, Iterable<S>> input,
               Emitter<Pair<Boolean, S>> emitter) {
@@ -235,6 +247,7 @@ public class Aggregate {
   public static <K, V> PTable<K, Collection<V>> collectValues(PTable<K, V> collect) {
     PTypeFamily tf = collect.getTypeFamily();
     return collect.groupByKey().parallelDo("collect", new MapValuesFn<K, Iterable<V>, Collection<V>>() {
+      private static final long serialVersionUID = 1L;
       @Override
       public Collection<V> map(Iterable<V> v) {
         return Lists.newArrayList(v);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/lib/Cogroup.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/lib/Cogroup.java b/src/main/java/com/cloudera/crunch/lib/Cogroup.java
index b3cc81b..f8b02ed 100644
--- a/src/main/java/com/cloudera/crunch/lib/Cogroup.java
+++ b/src/main/java/com/cloudera/crunch/lib/Cogroup.java
@@ -54,6 +54,8 @@ public class Cogroup {
   }
 
   private static class CogroupFn1<K, V, U> extends MapValuesFn<K, V, Pair<V, U>> {
+    private static final long serialVersionUID = 1L;
+
     @Override
     public Pair<V, U> map(V v) {
       return Pair.of(v, null);
@@ -61,6 +63,8 @@ public class Cogroup {
   }
 
   private static class CogroupFn2<K, V, U> extends MapValuesFn<K, U, Pair<V, U>> {
+    private static final long serialVersionUID = 1L;
+    
     @Override
     public Pair<V, U> map(U u) {
       return Pair.of(null, u);
@@ -69,6 +73,8 @@ public class Cogroup {
 
   private static class PostGroupFn<K, V, U> extends
       DoFn<Pair<K, Iterable<Pair<V, U>>>, Pair<K, Pair<Collection<V>, Collection<U>>>> {
+    private static final long serialVersionUID = 1L;
+    
     @Override
     public void process(Pair<K, Iterable<Pair<V, U>>> input,
         Emitter<Pair<K, Pair<Collection<V>, Collection<U>>>> emitter) {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/lib/Join.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/lib/Join.java b/src/main/java/com/cloudera/crunch/lib/Join.java
index 8fe8b5b..c47ac8d 100644
--- a/src/main/java/com/cloudera/crunch/lib/Join.java
+++ b/src/main/java/com/cloudera/crunch/lib/Join.java
@@ -127,18 +127,20 @@ public class Join {
 
     PTable<Pair<K, Integer>, Pair<U, V>> tag1 = left.parallelDo("joinTagLeft",
         new MapFn<Pair<K, U>, Pair<Pair<K, Integer>, Pair<U, V>>>() {
-          @Override
-          public Pair<Pair<K, Integer>, Pair<U, V>> map(Pair<K, U> input) {
-            return Pair.of(Pair.of(input.first(), 0), Pair.of(input.second(), (V) null));
-          }
-        }, ptt);
+      private static final long serialVersionUID = 1L;
+      @Override
+      public Pair<Pair<K, Integer>, Pair<U, V>> map(Pair<K, U> input) {
+        return Pair.of(Pair.of(input.first(), 0), Pair.of(input.second(), (V) null));
+      }
+    }, ptt);
     PTable<Pair<K, Integer>, Pair<U, V>> tag2 = right.parallelDo("joinTagRight",
         new MapFn<Pair<K, V>, Pair<Pair<K, Integer>, Pair<U, V>>>() {
-          @Override
-          public Pair<Pair<K, Integer>, Pair<U, V>> map(Pair<K, V> input) {
-            return Pair.of(Pair.of(input.first(), 1), Pair.of((U) null, input.second()));
-          }
-        }, ptt);
+      private static final long serialVersionUID = 1L;
+      @Override
+      public Pair<Pair<K, Integer>, Pair<U, V>> map(Pair<K, V> input) {
+        return Pair.of(Pair.of(input.first(), 1), Pair.of((U) null, input.second()));
+      }
+    }, ptt);
     
     GroupingOptions.Builder optionsBuilder = GroupingOptions.builder();
     optionsBuilder.partitionerClass(JoinUtils.getPartitionerClass(ptf));

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/lib/PTables.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/lib/PTables.java b/src/main/java/com/cloudera/crunch/lib/PTables.java
index 0c0f246..0e86a0b 100644
--- a/src/main/java/com/cloudera/crunch/lib/PTables.java
+++ b/src/main/java/com/cloudera/crunch/lib/PTables.java
@@ -28,6 +28,7 @@ public class PTables {
 
   public static <K, V> PCollection<K> keys(PTable<K, V> ptable) {
     return ptable.parallelDo("PTables.keys", new DoFn<Pair<K, V>, K>() {
+      private static final long serialVersionUID = 1L;
       @Override
       public void process(Pair<K, V> input, Emitter<K> emitter) {
         emitter.emit(input.first());
@@ -37,6 +38,7 @@ public class PTables {
   
   public static <K, V> PCollection<V> values(PTable<K, V> ptable) {
     return ptable.parallelDo("PTables.values", new DoFn<Pair<K, V>, V>() {
+      private static final long serialVersionUID = 1L;
       @Override
       public void process(Pair<K, V> input, Emitter<V> emitter) {
         emitter.emit(input.second());

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/lib/Sample.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/lib/Sample.java b/src/main/java/com/cloudera/crunch/lib/Sample.java
index 8672866..faa958f 100644
--- a/src/main/java/com/cloudera/crunch/lib/Sample.java
+++ b/src/main/java/com/cloudera/crunch/lib/Sample.java
@@ -24,7 +24,8 @@ import com.google.common.base.Preconditions;
 public class Sample {
 
   public static class SamplerFn<S> extends DoFn<S, S> {
-	
+    private static final long serialVersionUID = 1L;
+    
 	private final long seed;
 	private final double acceptanceProbability;
 	private transient Random r;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/lib/Set.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/lib/Set.java b/src/main/java/com/cloudera/crunch/lib/Set.java
index 0896e1b..f152d86 100644
--- a/src/main/java/com/cloudera/crunch/lib/Set.java
+++ b/src/main/java/com/cloudera/crunch/lib/Set.java
@@ -40,16 +40,17 @@ public class Set {
   public static <T> PCollection<T> difference(PCollection<T> coll1,
       PCollection<T> coll2) {
     return Cogroup.cogroup(toTable(coll1), toTable(coll2))
-     .parallelDo(new DoFn<Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>>, T>() {
-      @Override
-      public void process(Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>> input,
-          Emitter<T> emitter) {
-        Pair<Collection<Boolean>, Collection<Boolean>> groups = input.second();
-        if (!groups.first().isEmpty() && groups.second().isEmpty()) {
-          emitter.emit(input.first());
-        }
-      }
-    }, coll1.getPType());
+        .parallelDo(new DoFn<Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>>, T>() {
+          private static final long serialVersionUID = 1L;
+          @Override
+          public void process(Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>> input,
+              Emitter<T> emitter) {
+            Pair<Collection<Boolean>, Collection<Boolean>> groups = input.second();
+            if (!groups.first().isEmpty() && groups.second().isEmpty()) {
+              emitter.emit(input.first());
+            }
+          }
+        }, coll1.getPType());
   }
   
   /**
@@ -61,16 +62,17 @@ public class Set {
   public static <T> PCollection<T> intersection(PCollection<T> coll1,
       PCollection<T> coll2) {
     return Cogroup.cogroup(toTable(coll1), toTable(coll2))
-     .parallelDo(new DoFn<Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>>, T>() {
-      @Override
-      public void process(Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>> input,
-          Emitter<T> emitter) {
-        Pair<Collection<Boolean>, Collection<Boolean>> groups = input.second();
-        if (!groups.first().isEmpty() && !groups.second().isEmpty()) {
-          emitter.emit(input.first());
-        }
-      }
-    }, coll1.getPType());
+        .parallelDo(new DoFn<Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>>, T>() {
+          private static final long serialVersionUID = 1L;
+          @Override
+          public void process(Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>> input,
+              Emitter<T> emitter) {
+            Pair<Collection<Boolean>, Collection<Boolean>> groups = input.second();
+            if (!groups.first().isEmpty() && !groups.second().isEmpty()) {
+              emitter.emit(input.first());
+            }
+          }
+        }, coll1.getPType());
   }
   
   /**
@@ -92,26 +94,28 @@ public class Set {
     PTypeFamily typeFamily = coll1.getTypeFamily();
     PType<T> type = coll1.getPType();
     return Cogroup.cogroup(toTable(coll1), toTable(coll2))
-     .parallelDo(new DoFn<Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>>,
-         Tuple3<T, T, T>>() {
-      @Override
-      public void process(Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>> input,
-          Emitter<Tuple3<T, T, T>> emitter) {
-        Pair<Collection<Boolean>, Collection<Boolean>> groups = input.second();
-        boolean inFirst = !groups.first().isEmpty();
-        boolean inSecond = !groups.second().isEmpty();
-        T t = input.first();
-        emitter.emit(Tuple3.of(
-            inFirst && !inSecond ? t : null,
-            !inFirst && inSecond ? t : null,
-            inFirst && inSecond ? t : null));
-      }
-    }, typeFamily.triples(type, type, type));
+        .parallelDo(new DoFn<Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>>,
+            Tuple3<T, T, T>>() {
+          private static final long serialVersionUID = 1L;
+          @Override
+          public void process(Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>> input,
+              Emitter<Tuple3<T, T, T>> emitter) {
+            Pair<Collection<Boolean>, Collection<Boolean>> groups = input.second();
+            boolean inFirst = !groups.first().isEmpty();
+            boolean inSecond = !groups.second().isEmpty();
+            T t = input.first();
+            emitter.emit(Tuple3.of(
+                inFirst && !inSecond ? t : null,
+                    !inFirst && inSecond ? t : null,
+                        inFirst && inSecond ? t : null));
+          }
+        }, typeFamily.triples(type, type, type));
   }
   
   private static <T> PTable<T, Boolean> toTable(PCollection<T> coll) {
     PTypeFamily typeFamily = coll.getTypeFamily();
     return coll.parallelDo(new DoFn<T, Pair<T, Boolean>>() {
+      private static final long serialVersionUID = 1L;
       @Override
       public void process(T input, Emitter<Pair<T, Boolean>> emitter) {
         emitter.emit(Pair.of(input, Boolean.TRUE));

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/lib/Sort.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/lib/Sort.java b/src/main/java/com/cloudera/crunch/lib/Sort.java
index c54c343..760d37e 100644
--- a/src/main/java/com/cloudera/crunch/lib/Sort.java
+++ b/src/main/java/com/cloudera/crunch/lib/Sort.java
@@ -111,6 +111,7 @@ public class Sort {
         collection.getPType(), order);
     PTable<T, Void> pt =
       collection.parallelDo("sort-pre", new DoFn<T, Pair<T, Void>>() {
+        private static final long serialVersionUID = 1L;
         @Override
         public void process(T input,
             Emitter<Pair<T, Void>> emitter) {
@@ -119,6 +120,7 @@ public class Sort {
       }, type);
     PTable<T, Void> sortedPt = pt.groupByKey(options).ungroup();
     return sortedPt.parallelDo("sort-post", new DoFn<Pair<T, Void>, T>() {
+      private static final long serialVersionUID = 1L;
       @Override
       public void process(Pair<T, Void> input, Emitter<T> emitter) {
         emitter.emit(input.first());
@@ -166,6 +168,7 @@ public class Sort {
         tf.nulls());
     PTable<Pair<U, V>, Void> pt =
       collection.parallelDo(new DoFn<Pair<U, V>, Pair<Pair<U, V>, Void>>() {
+        private static final long serialVersionUID = 1L;
         @Override
         public void process(Pair<U, V> input,
             Emitter<Pair<Pair<U, V>, Void>> emitter) {
@@ -176,6 +179,7 @@ public class Sort {
     GroupingOptions options = buildGroupingOptions(conf, tf, pType, columnOrders);
     PTable<Pair<U, V>, Void> sortedPt = pt.groupByKey(options).ungroup();
     return sortedPt.parallelDo(new DoFn<Pair<Pair<U, V>,Void>, Pair<U, V>>() {
+      private static final long serialVersionUID = 1L;
       @Override
       public void process(Pair<Pair<U, V>, Void> input,
           Emitter<Pair<U, V>> emitter) {
@@ -200,6 +204,7 @@ public class Sort {
         tf.nulls());
     PTable<Tuple3<V1, V2, V3>, Void> pt =
       collection.parallelDo(new DoFn<Tuple3<V1, V2, V3>, Pair<Tuple3<V1, V2, V3>, Void>>() {
+        private static final long serialVersionUID = 1L;
         @Override
         public void process(Tuple3<V1, V2, V3> input,
             Emitter<Pair<Tuple3<V1, V2, V3>, Void>> emitter) {
@@ -210,6 +215,7 @@ public class Sort {
     GroupingOptions options = buildGroupingOptions(conf, tf, pType, columnOrders);
     PTable<Tuple3<V1, V2, V3>, Void> sortedPt = pt.groupByKey(options).ungroup();
     return sortedPt.parallelDo(new DoFn<Pair<Tuple3<V1, V2, V3>,Void>, Tuple3<V1, V2, V3>>() {
+      private static final long serialVersionUID = 1L;
       @Override
       public void process(Pair<Tuple3<V1, V2, V3>, Void> input,
           Emitter<Tuple3<V1, V2, V3>> emitter) {
@@ -234,6 +240,7 @@ public class Sort {
         tf.nulls());
     PTable<Tuple4<V1, V2, V3, V4>, Void> pt =
       collection.parallelDo(new DoFn<Tuple4<V1, V2, V3, V4>, Pair<Tuple4<V1, V2, V3, V4>, Void>>() {
+        private static final long serialVersionUID = 1L;
         @Override
         public void process(Tuple4<V1, V2, V3, V4> input,
             Emitter<Pair<Tuple4<V1, V2, V3, V4>, Void>> emitter) {
@@ -244,6 +251,7 @@ public class Sort {
     GroupingOptions options = buildGroupingOptions(conf, tf, pType, columnOrders);
     PTable<Tuple4<V1, V2, V3, V4>, Void> sortedPt = pt.groupByKey(options).ungroup();
     return sortedPt.parallelDo(new DoFn<Pair<Tuple4<V1, V2, V3, V4>,Void>, Tuple4<V1, V2, V3, V4>>() {
+      private static final long serialVersionUID = 1L;
       @Override
       public void process(Pair<Tuple4<V1, V2, V3, V4>, Void> input,
           Emitter<Tuple4<V1, V2, V3, V4>> emitter) {
@@ -267,6 +275,7 @@ public class Sort {
         tf.nulls());
     PTable<TupleN, Void> pt =
       collection.parallelDo(new DoFn<TupleN, Pair<TupleN, Void>>() {
+        private static final long serialVersionUID = 1L;
         @Override
         public void process(TupleN input,
             Emitter<Pair<TupleN, Void>> emitter) {
@@ -277,6 +286,7 @@ public class Sort {
     GroupingOptions options = buildGroupingOptions(conf, tf, pType, columnOrders);
     PTable<TupleN, Void> sortedPt = pt.groupByKey(options).ungroup();
     return sortedPt.parallelDo(new DoFn<Pair<TupleN,Void>, TupleN>() {
+      private static final long serialVersionUID = 1L;
       @Override
       public void process(Pair<TupleN, Void> input,
           Emitter<TupleN> emitter) {
@@ -286,14 +296,14 @@ public class Sort {
   }
   
   // TODO: move to type family?
-  private static GroupingOptions buildGroupingOptions(Configuration conf,
-      PTypeFamily tf, PType ptype, Order order) {
+  private static <T> GroupingOptions buildGroupingOptions(Configuration conf,
+      PTypeFamily tf, PType<T> ptype, Order order) {
     Builder builder = GroupingOptions.builder();
     if (order == Order.DESCENDING) {
       if (tf == WritableTypeFamily.getInstance()) {
         builder.sortComparatorClass(ReverseWritableComparator.class);
       } else if (tf == AvroTypeFamily.getInstance()) {
-        AvroType avroType = (AvroType) ptype;
+        AvroType<T> avroType = (AvroType<T>) ptype;
         Schema schema = avroType.getSchema();
         conf.set("crunch.schema", schema.toString());
         builder.sortComparatorClass(ReverseAvroComparator.class);
@@ -304,8 +314,8 @@ public class Sort {
     return builder.build();
   }
   
-  private static GroupingOptions buildGroupingOptions(Configuration conf,
-      PTypeFamily tf, PType ptype, ColumnOrder[] columnOrders) {
+  private static <T> GroupingOptions buildGroupingOptions(Configuration conf,
+      PTypeFamily tf, PType<T> ptype, ColumnOrder[] columnOrders) {
     Builder builder = GroupingOptions.builder();
     if (tf == WritableTypeFamily.getInstance()) {
       TupleWritableComparator.configureOrdering(conf, columnOrders);
@@ -355,7 +365,7 @@ public class Sort {
     public void setConf(Configuration conf) {
       super.setConf(conf);
       if (conf != null) {
-        schema = Schema.parse(conf.get("crunch.schema"));
+        schema = (new Schema.Parser()).parse(conf.get("crunch.schema"));
       }
     }
 
@@ -480,12 +490,12 @@ public class Sort {
     public void setConf(Configuration conf) {
       super.setConf(conf);
       if (conf != null) {
-        schema = Schema.parse(conf.get("crunch.schema"));
+        schema = (new Schema.Parser()).parse(conf.get("crunch.schema"));
       }
     }
 
-    public static void configureOrdering(Configuration conf, ColumnOrder[] columnOrders,
-        PType ptype) {
+    public static <S> void configureOrdering(Configuration conf, ColumnOrder[] columnOrders,
+        PType<S> ptype) {
       Schema orderedSchema = createOrderedTupleSchema(ptype, columnOrders);
       conf.set("crunch.schema", orderedSchema.toString());
     }
@@ -493,19 +503,19 @@ public class Sort {
     // TODO: move to Avros
     // TODO: need to re-order columns in map output then switch back in the reduce
     //       this will require more extensive changes in Crunch
-    private static Schema createOrderedTupleSchema(PType ptype, ColumnOrder[] orders) {
+    private static <S> Schema createOrderedTupleSchema(PType<S> ptype, ColumnOrder[] orders) {
       // Guarantee each tuple schema has a globally unique name
       String tupleName = "tuple" + UUID.randomUUID().toString().replace('-', 'x');
       Schema schema = Schema.createRecord(tupleName, "", "crunch", false);
       List<Schema.Field> fields = Lists.newArrayList();
-      AvroType parentAvroType = (AvroType) ptype;
+      AvroType<S> parentAvroType = (AvroType<S>) ptype;
       Schema parentAvroSchema = parentAvroType.getSchema();
       
       BitSet orderedColumns = new BitSet();
       // First add any fields specified by ColumnOrder
       for (ColumnOrder columnOrder : orders) {
         int index = columnOrder.column - 1;
-        AvroType atype = (AvroType) ptype.getSubTypes().get(index);
+        AvroType<?> atype = (AvroType<?>) ptype.getSubTypes().get(index);
         Schema fieldSchema = Schema.createUnion(
             ImmutableList.of(atype.getSchema(), Schema.create(Type.NULL)));
         String fieldName = parentAvroSchema.getFields().get(index).name();
@@ -518,7 +528,7 @@ public class Sort {
         if (orderedColumns.get(i)) {
           continue;
         }
-        AvroType atype = (AvroType) ptype.getSubTypes().get(i);
+        AvroType<?> atype = (AvroType<?>) ptype.getSubTypes().get(i);
         Schema fieldSchema = Schema.createUnion(
             ImmutableList.of(atype.getSchema(), Schema.create(Type.NULL)));
         String fieldName = parentAvroSchema.getFields().get(i).name();

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/lib/join/FullOuterJoinFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/lib/join/FullOuterJoinFn.java b/src/main/java/com/cloudera/crunch/lib/join/FullOuterJoinFn.java
index 6f5c547..a6874a0 100644
--- a/src/main/java/com/cloudera/crunch/lib/join/FullOuterJoinFn.java
+++ b/src/main/java/com/cloudera/crunch/lib/join/FullOuterJoinFn.java
@@ -28,6 +28,8 @@ import com.google.common.collect.Lists;
  * @param <V> Type of the second {@link com.cloudera.crunch.PTable}'s values
  */
 public class FullOuterJoinFn<K, U, V> extends JoinFn<K, U, V> {
+  private static final long serialVersionUID = 1L;
+  
   private transient int lastId;
   private transient K lastKey;
   private transient List<U> leftValues;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/lib/join/InnerJoinFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/lib/join/InnerJoinFn.java b/src/main/java/com/cloudera/crunch/lib/join/InnerJoinFn.java
index a6c5ecb..91ab455 100644
--- a/src/main/java/com/cloudera/crunch/lib/join/InnerJoinFn.java
+++ b/src/main/java/com/cloudera/crunch/lib/join/InnerJoinFn.java
@@ -28,6 +28,8 @@ import com.google.common.collect.Lists;
  * @param <V> Type of the second {@link com.cloudera.crunch.PTable}'s values
  */
 public class InnerJoinFn<K, U, V> extends JoinFn<K, U, V> {
+  private static final long serialVersionUID = 1L;
+  
   private transient K lastKey;
   private transient List<U> LeftValues;
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/lib/join/JoinFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/lib/join/JoinFn.java b/src/main/java/com/cloudera/crunch/lib/join/JoinFn.java
index f9093a2..b60a12c 100644
--- a/src/main/java/com/cloudera/crunch/lib/join/JoinFn.java
+++ b/src/main/java/com/cloudera/crunch/lib/join/JoinFn.java
@@ -27,6 +27,8 @@ import com.cloudera.crunch.Pair;
  */
 public abstract class JoinFn<K, U, V>
     extends DoFn<Pair<Pair<K, Integer>, Iterable<Pair<U, V>>>, Pair<K, Pair<U, V>>> {
+  private static final long serialVersionUID = 1L;
+  
   /** @return The name of this join type (e.g. innerJoin, leftOuterJoin). */
   public abstract String getJoinType();
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/lib/join/JoinUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/lib/join/JoinUtils.java b/src/main/java/com/cloudera/crunch/lib/join/JoinUtils.java
index ed2493b..4097ac2 100644
--- a/src/main/java/com/cloudera/crunch/lib/join/JoinUtils.java
+++ b/src/main/java/com/cloudera/crunch/lib/join/JoinUtils.java
@@ -90,9 +90,9 @@ public class JoinUtils {
     }
   }
   
-  public static class AvroIndexedRecordPartitioner extends Partitioner<AvroKey, AvroValue> {
+  public static class AvroIndexedRecordPartitioner<K, V> extends Partitioner<AvroKey<K>, AvroValue<V>> {
     @Override
-    public int getPartition(AvroKey key, AvroValue value, int numPartitions) {
+    public int getPartition(AvroKey<K> key, AvroValue<V> value, int numPartitions) {
       IndexedRecord record = (IndexedRecord) key.datum();
       return (Math.abs(record.get(0).hashCode()) & Integer.MAX_VALUE) % numPartitions;
     }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/lib/join/LeftOuterJoinFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/lib/join/LeftOuterJoinFn.java b/src/main/java/com/cloudera/crunch/lib/join/LeftOuterJoinFn.java
index 5730362..a2c576b 100644
--- a/src/main/java/com/cloudera/crunch/lib/join/LeftOuterJoinFn.java
+++ b/src/main/java/com/cloudera/crunch/lib/join/LeftOuterJoinFn.java
@@ -28,6 +28,8 @@ import com.google.common.collect.Lists;
  * @param <V> Type of the second {@link com.cloudera.crunch.PTable}'s values
  */
 public class LeftOuterJoinFn<K, U, V> extends JoinFn<K, U, V> {
+  private static final long serialVersionUID = 1L;
+  
   private transient int lastId;
   private transient K lastKey;
   private transient List<U> leftValues;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/lib/join/RightOuterJoinFn.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/lib/join/RightOuterJoinFn.java b/src/main/java/com/cloudera/crunch/lib/join/RightOuterJoinFn.java
index 613714d..55f1a3c 100644
--- a/src/main/java/com/cloudera/crunch/lib/join/RightOuterJoinFn.java
+++ b/src/main/java/com/cloudera/crunch/lib/join/RightOuterJoinFn.java
@@ -28,7 +28,8 @@ import com.google.common.collect.Lists;
  * @param <V> Type of the second {@link com.cloudera.crunch.PTable}'s values
  */
 public class RightOuterJoinFn<K, U, V> extends JoinFn<K, U, V> {
-  private transient int lastId;
+  private static final long serialVersionUID = 1L;
+  
   private transient K lastKey;
   private transient List<U> leftValues;
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2d25d5df/src/main/java/com/cloudera/crunch/types/PGroupedTableType.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/cloudera/crunch/types/PGroupedTableType.java b/src/main/java/com/cloudera/crunch/types/PGroupedTableType.java
index 0a90d14..b6ab4fa 100644
--- a/src/main/java/com/cloudera/crunch/types/PGroupedTableType.java
+++ b/src/main/java/com/cloudera/crunch/types/PGroupedTableType.java
@@ -65,7 +65,8 @@ public abstract class PGroupedTableType<K, V> implements PType<Pair<K, Iterable<
 
   public static class PairIterableMapFn<K, V> extends
       MapFn<Pair<Object, Iterable<Object>>, Pair<K, Iterable<V>>> {
-
+    private static final long serialVersionUID = 1L;
+    
     private final MapFn<Object, K> keys;
     private final MapFn<Object, V> values;
 
@@ -82,7 +83,7 @@ public abstract class PGroupedTableType<K, V> implements PType<Pair<K, Iterable<
 
     @Override
     public Pair<K, Iterable<V>> map(Pair<Object, Iterable<Object>> input) {
-      return Pair.<K, Iterable<V>> of(keys.map(input.first()),
+      return Pair.<K, Iterable<V>>of(keys.map(input.first()),
           new PTypeIterable(values, input.second()));
     }
   }