You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2009/11/24 20:54:34 UTC

svn commit: r883836 [5/23] - in /hadoop/pig/branches/load-store-redesign: ./ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/ contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/ contrib/zebra/ contrib/zebra...

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/Chunk.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/Chunk.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/Chunk.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/Chunk.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,429 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.tfile;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Several related classes to support chunk-encoded sub-streams on top of a
+ * regular stream.
+ */
+final class Chunk {
+
+  /**
+   * Prevent the instantiation of class.
+   */
+  private Chunk() {
+    // nothing
+  }
+
+  /**
+   * Decoding a chain of chunks encoded through ChunkEncoder or
+   * SingleChunkEncoder.
+   */
+  static public class ChunkDecoder extends InputStream {
+    private DataInputStream in = null;
+    private boolean lastChunk;
+    private int remain = 0;
+    private boolean closed;
+
+    public ChunkDecoder() {
+      lastChunk = true;
+      closed = true;
+    }
+
+    public void reset(DataInputStream downStream) {
+      // no need to wind forward the old input.
+      in = downStream;
+      lastChunk = false;
+      remain = 0;
+      closed = false;
+    }
+
+    /**
+     * Constructor
+     * 
+     * @param in
+     *          The source input stream which contains chunk-encoded data
+     *          stream.
+     */
+    public ChunkDecoder(DataInputStream in) {
+      this.in = in;
+      lastChunk = false;
+      closed = false;
+    }
+
+    /**
+     * Have we reached the last chunk.
+     * 
+     * @return true if we have reached the last chunk.
+     * @throws java.io.IOException
+     */
+    public boolean isLastChunk() throws IOException {
+      checkEOF();
+      return lastChunk;
+    }
+
+    /**
+     * How many bytes remain in the current chunk?
+     * 
+     * @return remaining bytes left in the current chunk.
+     * @throws java.io.IOException
+     */
+    public int getRemain() throws IOException {
+      checkEOF();
+      return remain;
+    }
+
+    /**
+     * Reading the length of next chunk.
+     * 
+     * @throws java.io.IOException
+     *           when no more data is available.
+     */
+    private void readLength() throws IOException {
+      remain = Utils.readVInt(in);
+      if (remain >= 0) {
+        lastChunk = true;
+      } else {
+        remain = -remain;
+      }
+    }
+
+    /**
+     * Check whether we reach the end of the stream.
+     * 
+     * @return false if the chunk encoded stream has more data to read (in which
+     *         case available() will be greater than 0); true otherwise.
+     * @throws java.io.IOException
+     *           on I/O errors.
+     */
+    private boolean checkEOF() throws IOException {
+      if (isClosed()) return true;
+      while (true) {
+        if (remain > 0) return false;
+        if (lastChunk) return true;
+        readLength();
+      }
+    }
+
+    @Override
+    /*
+     * This method never blocks the caller. Returning 0 does not mean we reach
+     * the end of the stream.
+     */
+    public int available() {
+      return remain;
+    }
+
+    @Override
+    public int read() throws IOException {
+      if (checkEOF()) return -1;
+      int ret = in.read();
+      if (ret < 0) throw new IOException("Corrupted chunk encoding stream");
+      --remain;
+      return ret;
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException {
+      return read(b, 0, b.length);
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+      if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
+        throw new IndexOutOfBoundsException();
+      }
+
+      if (!checkEOF()) {
+        int n = Math.min(remain, len);
+        int ret = in.read(b, off, n);
+        if (ret < 0) throw new IOException("Corrupted chunk encoding stream");
+        remain -= ret;
+        return ret;
+      }
+      return -1;
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+      if (!checkEOF()) {
+        long ret = in.skip(Math.min(remain, n));
+        remain -= ret;
+        return ret;
+      }
+      return 0;
+    }
+
+    @Override
+    public boolean markSupported() {
+      return false;
+    }
+
+    public boolean isClosed() {
+      return closed;
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (closed == false) {
+        try {
+          while (!checkEOF()) {
+            skip(Integer.MAX_VALUE);
+          }
+        } finally {
+          closed = true;
+        }
+      }
+    }
+  }
+
+  /**
+   * Chunk Encoder. Encoding the output data into a chain of chunks in the
+   * following sequences: -len1, byte[len1], -len2, byte[len2], ... len_n,
+   * byte[len_n]. Where len1, len2, ..., len_n are the lengths of the data
+   * chunks. Non-terminal chunks have their lengths negated. Non-terminal chunks
+   * cannot have length 0. All lengths are in the range of 0 to
+   * Integer.MAX_VALUE and are encoded in Utils.VInt format.
+   */
+  static public class ChunkEncoder extends OutputStream {
+    /**
+     * The data output stream it connects to.
+     */
+    private DataOutputStream out;
+
+    /**
+     * The internal buffer that is only used when we do not know the advertised
+     * size.
+     */
+    private byte buf[];
+
+    /**
+     * The number of valid bytes in the buffer. This value is always in the
+     * range <tt>0</tt> through <tt>buf.length</tt>; elements <tt>buf[0]</tt>
+     * through <tt>buf[count-1]</tt> contain valid byte data.
+     */
+    private int count;
+
+    /**
+     * Constructor.
+     * 
+     * @param out
+     *          the underlying output stream.
+     * @param buf
+     *          user-supplied buffer. The buffer would be used exclusively by
+     *          the ChunkEncoder during its life cycle.
+     */
+    public ChunkEncoder(DataOutputStream out, byte[] buf) {
+      this.out = out;
+      this.buf = buf;
+      this.count = 0;
+    }
+
+    /**
+     * Write out a chunk.
+     * 
+     * @param chunk
+     *          The chunk buffer.
+     * @param offset
+     *          Offset to chunk buffer for the beginning of chunk.
+     * @param len
+     * @param last
+     *          Is this the last call to flushBuffer?
+     */
+    private void writeChunk(byte[] chunk, int offset, int len, boolean last)
+        throws IOException {
+      if (last) { // always write out the length for the last chunk.
+        Utils.writeVInt(out, len);
+        if (len > 0) {
+          out.write(chunk, offset, len);
+        }
+      } else {
+        if (len > 0) {
+          Utils.writeVInt(out, -len);
+          out.write(chunk, offset, len);
+        }
+      }
+    }
+
+    /**
+     * Write out a chunk that is a concatenation of the internal buffer plus
+     * user supplied data. This will never be the last block.
+     * 
+     * @param data
+     *          User supplied data buffer.
+     * @param offset
+     *          Offset to user data buffer.
+     * @param len
+     *          User data buffer size.
+     */
+    private void writeBufData(byte[] data, int offset, int len)
+        throws IOException {
+      if (count + len > 0) {
+        Utils.writeVInt(out, -(count + len));
+        out.write(buf, 0, count);
+        count = 0;
+        out.write(data, offset, len);
+      }
+    }
+
+    /**
+     * Flush the internal buffer.
+     * 
+     * Is this the last call to flushBuffer?
+     * 
+     * @throws java.io.IOException
+     */
+    private void flushBuffer() throws IOException {
+      if (count > 0) {
+        writeChunk(buf, 0, count, false);
+        count = 0;
+      }
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+      if (count >= buf.length) {
+        flushBuffer();
+      }
+      buf[count++] = (byte) b;
+    }
+
+    @Override
+    public void write(byte b[]) throws IOException {
+      write(b, 0, b.length);
+    }
+
+    @Override
+    public void write(byte b[], int off, int len) throws IOException {
+      if ((len + count) >= buf.length) {
+        /*
+         * If the input data do not fit in buffer, flush the output buffer and
+         * then write the data directly. In this way buffered streams will
+         * cascade harmlessly.
+         */
+        writeBufData(b, off, len);
+        return;
+      }
+
+      System.arraycopy(b, off, buf, count, len);
+      count += len;
+    }
+
+    @Override
+    public void flush() throws IOException {
+      flushBuffer();
+      out.flush();
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (buf != null) {
+        try {
+          writeChunk(buf, 0, count, true);
+        } finally {
+          buf = null;
+          out = null;
+        }
+      }
+    }
+  }
+
+  /**
+   * Encode the whole stream as a single chunk. Expecting to know the size of
+   * the chunk up-front.
+   */
+  static public class SingleChunkEncoder extends OutputStream {
+    /**
+     * The data output stream it connects to.
+     */
+    private final DataOutputStream out;
+
+    /**
+     * The remaining bytes to be written.
+     */
+    private int remain;
+    private boolean closed = false;
+
+    /**
+     * Constructor.
+     * 
+     * @param out
+     *          the underlying output stream.
+     * @param size
+     *          The total # of bytes to be written as a single chunk.
+     * @throws java.io.IOException
+     *           if an I/O error occurs.
+     */
+    public SingleChunkEncoder(DataOutputStream out, int size)
+        throws IOException {
+      this.out = out;
+      this.remain = size;
+      Utils.writeVInt(out, size);
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+      if (remain > 0) {
+        out.write(b);
+        --remain;
+      } else {
+        throw new IOException("Writing more bytes than advertised size.");
+      }
+    }
+
+    @Override
+    public void write(byte b[]) throws IOException {
+      write(b, 0, b.length);
+    }
+
+    @Override
+    public void write(byte b[], int off, int len) throws IOException {
+      if (remain >= len) {
+        out.write(b, off, len);
+        remain -= len;
+      } else {
+        throw new IOException("Writing more bytes than advertised size.");
+      }
+    }
+
+    @Override
+    public void flush() throws IOException {
+      out.flush();
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (closed == true) {
+        return;
+      }
+
+      try {
+        if (remain > 0) {
+          throw new IOException("Writing less bytes than advertised size.");
+        }
+      } finally {
+        closed = true;
+      }
+    }
+  }
+}

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/CompareUtils.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/CompareUtils.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/CompareUtils.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/CompareUtils.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.tfile;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableComparator;
+
+class CompareUtils {
+  /**
+   * Prevent the instantiation of class.
+   */
+  private CompareUtils() {
+    // nothing
+  }
+
+  /**
+   * A comparator to compare anything that implements {@link RawComparable}
+   * using a customized comparator.
+   */
+  public static final class BytesComparator implements
+      Comparator<RawComparable> {
+    private RawComparator<Object> cmp;
+
+    public BytesComparator(RawComparator<Object> cmp) {
+      this.cmp = cmp;
+    }
+
+    @Override
+    public int compare(RawComparable o1, RawComparable o2) {
+      return compare(o1.buffer(), o1.offset(), o1.size(), o2.buffer(), o2
+          .offset(), o2.size());
+    }
+
+    public int compare(byte[] a, int off1, int len1, byte[] b, int off2,
+        int len2) {
+      return cmp.compare(a, off1, len1, b, off2, len2);
+    }
+  }
+
+  /**
+   * Interface for all objects that has a single integer magnitude.
+   */
+  static interface Scalar {
+    long magnitude();
+  }
+
+  static final class ScalarLong implements Scalar {
+    private long magnitude;
+
+    public ScalarLong(long m) {
+      magnitude = m;
+    }
+
+    public long magnitude() {
+      return magnitude;
+    }
+  }
+
+  public static final class ScalarComparator implements Comparator<Scalar>, Serializable {
+    @Override
+    public int compare(Scalar o1, Scalar o2) {
+      long diff = o1.magnitude() - o2.magnitude();
+      if (diff < 0) return -1;
+      if (diff > 0) return 1;
+      return 0;
+    }
+  }
+
+  public static final class MemcmpRawComparator implements
+      RawComparator<Object>, Serializable {
+    @Override
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
+    }
+
+    @Override
+    public int compare(Object o1, Object o2) {
+      throw new RuntimeException("Object comparison not supported");
+    }
+  }
+}

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/Compression.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/Compression.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/Compression.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/Compression.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.tfile;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Compression related stuff.
+ */
+final class Compression {
+  static final Log LOG = LogFactory.getLog(Compression.class);
+
+  /**
+   * Prevent the instantiation of class.
+   */
+  private Compression() {
+    // nothing
+  }
+
+  static class FinishOnFlushCompressionStream extends FilterOutputStream {
+    public FinishOnFlushCompressionStream(CompressionOutputStream cout) {
+      super(cout);
+    }
+
+    @Override
+    public void write(byte b[], int off, int len) throws IOException {
+      out.write(b, off, len);
+    }
+
+    @Override
+    public void flush() throws IOException {
+      CompressionOutputStream cout = (CompressionOutputStream) out;
+      cout.finish();
+      cout.flush();
+      cout.resetState();
+    }
+  }
+
+  /**
+   * Compression algorithms.
+   */
+  static enum Algorithm {
+    LZO(TFile.COMPRESSION_LZO) {
+      private transient boolean checked = false;
+      private static final String defaultClazz =
+          "org.apache.hadoop.io.compress.LzoCodec";
+      private transient CompressionCodec codec = null;
+
+      @Override
+      public synchronized boolean isSupported() {
+        if (!checked) {
+          checked = true;
+          String extClazz =
+              (conf.get(CONF_LZO_CLASS) == null ? System
+                  .getProperty(CONF_LZO_CLASS) : null);
+          String clazz = (extClazz != null) ? extClazz : defaultClazz;
+          try {
+            LOG.info("Trying to load Lzo codec class: " + clazz);
+            codec =
+                (CompressionCodec) ReflectionUtils.newInstance(Class
+                    .forName(clazz), conf);
+          } catch (ClassNotFoundException e) {
+            // that is okay
+          }
+        }
+        return codec != null;
+      }
+
+      @Override
+      CompressionCodec getCodec() throws IOException {
+        if (!isSupported()) {
+          throw new IOException(
+              "LZO codec class not specified. Did you forget to set property "
+                  + CONF_LZO_CLASS + "?");
+        }
+
+        return codec;
+      }
+
+      @Override
+      public synchronized InputStream createDecompressionStream(
+          InputStream downStream, Decompressor decompressor,
+          int downStreamBufferSize) throws IOException {
+        if (!isSupported()) {
+          throw new IOException(
+              "LZO codec class not specified. Did you forget to set property "
+                  + CONF_LZO_CLASS + "?");
+        }
+        InputStream bis1 = null;
+        if (downStreamBufferSize > 0) {
+          bis1 = new BufferedInputStream(downStream, downStreamBufferSize);
+        } else {
+          bis1 = downStream;
+        }
+        conf.setInt("io.compression.codec.lzo.buffersize", 64 * 1024);
+        CompressionInputStream cis =
+            codec.createInputStream(bis1, decompressor);
+        BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE);
+        return bis2;
+      }
+
+      @Override
+      public synchronized OutputStream createCompressionStream(
+          OutputStream downStream, Compressor compressor,
+          int downStreamBufferSize) throws IOException {
+        if (!isSupported()) {
+          throw new IOException(
+              "LZO codec class not specified. Did you forget to set property "
+                  + CONF_LZO_CLASS + "?");
+        }
+        OutputStream bos1 = null;
+        if (downStreamBufferSize > 0) {
+          bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
+        } else {
+          bos1 = downStream;
+        }
+        conf.setInt("io.compression.codec.lzo.buffersize", 64 * 1024);
+        CompressionOutputStream cos =
+            codec.createOutputStream(bos1, compressor);
+        BufferedOutputStream bos2 =
+            new BufferedOutputStream(new FinishOnFlushCompressionStream(cos),
+                DATA_OBUF_SIZE);
+        return bos2;
+      }
+    },
+
+    GZ(TFile.COMPRESSION_GZ) {
+      private transient DefaultCodec codec;
+
+      @Override
+      CompressionCodec getCodec() {
+        if (codec == null) {
+          codec = new DefaultCodec();
+          codec.setConf(conf);
+        }
+
+        return codec;
+      }
+
+      @Override
+      public synchronized InputStream createDecompressionStream(
+          InputStream downStream, Decompressor decompressor,
+          int downStreamBufferSize) throws IOException {
+        // Set the internal buffer size to read from down stream.
+        if (downStreamBufferSize > 0) {
+          codec.getConf().setInt("io.file.buffer.size", downStreamBufferSize);
+        }
+        CompressionInputStream cis =
+            codec.createInputStream(downStream, decompressor);
+        BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE);
+        return bis2;
+      }
+
+      @Override
+      public synchronized OutputStream createCompressionStream(
+          OutputStream downStream, Compressor compressor,
+          int downStreamBufferSize) throws IOException {
+        OutputStream bos1 = null;
+        if (downStreamBufferSize > 0) {
+          bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
+        } else {
+          bos1 = downStream;
+        }
+        codec.getConf().setInt("io.file.buffer.size", 32 * 1024);
+        CompressionOutputStream cos =
+            codec.createOutputStream(bos1, compressor);
+        BufferedOutputStream bos2 =
+            new BufferedOutputStream(new FinishOnFlushCompressionStream(cos),
+                DATA_OBUF_SIZE);
+        return bos2;
+      }
+
+      @Override
+      public boolean isSupported() {
+        return true;
+      }
+    },
+
+    NONE(TFile.COMPRESSION_NONE) {
+      @Override
+      CompressionCodec getCodec() {
+        return null;
+      }
+
+      @Override
+      public synchronized InputStream createDecompressionStream(
+          InputStream downStream, Decompressor decompressor,
+          int downStreamBufferSize) throws IOException {
+        if (downStreamBufferSize > 0) {
+          return new BufferedInputStream(downStream, downStreamBufferSize);
+        }
+        return downStream;
+      }
+
+      @Override
+      public synchronized OutputStream createCompressionStream(
+          OutputStream downStream, Compressor compressor,
+          int downStreamBufferSize) throws IOException {
+        if (downStreamBufferSize > 0) {
+          return new BufferedOutputStream(downStream, downStreamBufferSize);
+        }
+
+        return downStream;
+      }
+
+      @Override
+      public boolean isSupported() {
+        return true;
+      }
+    };
+
+    // We require that all compression related settings are configured
+    // statically in the Configuration object.
+    protected static final Configuration conf = new Configuration();
+    private final String compressName;
+    // data input buffer size to absorb small reads from application.
+    private static final int DATA_IBUF_SIZE = 1 * 1024;
+    // data output buffer size to absorb small writes from application.
+    private static final int DATA_OBUF_SIZE = 4 * 1024;
+    public static final String CONF_LZO_CLASS =
+        "io.compression.codec.lzo.class";
+
+    Algorithm(String name) {
+      this.compressName = name;
+    }
+
+    abstract CompressionCodec getCodec() throws IOException;
+
+    public abstract InputStream createDecompressionStream(
+        InputStream downStream, Decompressor decompressor,
+        int downStreamBufferSize) throws IOException;
+
+    public abstract OutputStream createCompressionStream(
+        OutputStream downStream, Compressor compressor, int downStreamBufferSize)
+        throws IOException;
+
+    public abstract boolean isSupported();
+
+    public Compressor getCompressor() throws IOException {
+      CompressionCodec codec = getCodec();
+      if (codec != null) {
+        Compressor compressor = CodecPool.getCompressor(codec);
+        if (compressor != null) {
+          if (compressor.finished()) {
+            // Somebody returns the compressor to CodecPool but is still using
+            // it.
+            LOG.warn("Compressor obtained from CodecPool already finished()");
+          } else {
+            LOG.debug("Got a compressor: " + compressor.hashCode());
+          }
+          /**
+           * Following statement is necessary to get around bugs in 0.18 where a
+           * compressor is referenced after returned back to the codec pool.
+           */
+          compressor.reset();
+        }
+        return compressor;
+      }
+      return null;
+    }
+
+    public void returnCompressor(Compressor compressor) {
+      if (compressor != null) {
+        LOG.debug("Return a compressor: " + compressor.hashCode());
+        CodecPool.returnCompressor(compressor);
+      }
+    }
+
+    public Decompressor getDecompressor() throws IOException {
+      CompressionCodec codec = getCodec();
+      if (codec != null) {
+        Decompressor decompressor = CodecPool.getDecompressor(codec);
+        if (decompressor != null) {
+          if (decompressor.finished()) {
+            // Somebody returns the decompressor to CodecPool but is still using
+            // it.
+            LOG.warn("Deompressor obtained from CodecPool already finished()");
+          } else {
+            LOG.debug("Got a decompressor: " + decompressor.hashCode());
+          }
+          /**
+           * Following statement is necessary to get around bugs in 0.18 where a
+           * decompressor is referenced after returned back to the codec pool.
+           */
+          decompressor.reset();
+        }
+        return decompressor;
+      }
+
+      return null;
+    }
+
+    public void returnDecompressor(Decompressor decompressor) {
+      if (decompressor != null) {
+        LOG.debug("Returned a decompressor: " + decompressor.hashCode());
+        CodecPool.returnDecompressor(decompressor);
+      }
+    }
+
+    public String getName() {
+      return compressName;
+    }
+  }
+
+  static Algorithm getCompressionAlgorithmByName(String compressName) {
+    Algorithm[] algos = Algorithm.class.getEnumConstants();
+
+    for (Algorithm a : algos) {
+      if (a.getName().equals(compressName)) {
+        return a;
+      }
+    }
+
+    throw new IllegalArgumentException(
+        "Unsupported compression algorithm name: " + compressName);
+  }
+
+  static String[] getSupportedAlgorithms() {
+    Algorithm[] algos = Algorithm.class.getEnumConstants();
+
+    ArrayList<String> ret = new ArrayList<String>();
+    for (Algorithm a : algos) {
+      if (a.isSupported()) {
+        ret.add(a.getName());
+      }
+    }
+    return ret.toArray(new String[ret.size()]);
+  }
+}

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/MetaBlockAlreadyExists.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/MetaBlockAlreadyExists.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/MetaBlockAlreadyExists.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/MetaBlockAlreadyExists.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.zebra.tfile;
+
+import java.io.IOException;
+
+/**
+ * Exception - Meta Block with the same name already exists.
+ */
+@SuppressWarnings("serial")
+public class MetaBlockAlreadyExists extends IOException {
+  /**
+   * Constructor
+   * 
+   * @param s
+   *          message.
+   */
+  MetaBlockAlreadyExists(String s) {
+    super(s);
+  }
+}

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/MetaBlockDoesNotExist.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/MetaBlockDoesNotExist.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/MetaBlockDoesNotExist.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/MetaBlockDoesNotExist.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.zebra.tfile;
+
+import java.io.IOException;
+
+/**
+ * Exception - No such Meta Block with the given name.
+ */
+@SuppressWarnings("serial")
+public class MetaBlockDoesNotExist extends IOException {
+  /**
+   * Constructor
+   * 
+   * @param s
+   *          message.
+   */
+  MetaBlockDoesNotExist(String s) {
+    super(s);
+  }
+}

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/RawComparable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/RawComparable.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/RawComparable.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/RawComparable.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.zebra.tfile;
+
+import java.util.Collections;
+import java.util.Comparator;
+
+import org.apache.hadoop.io.RawComparator;
+
+/**
+ * Interface for objects that can be compared through {@link RawComparator}.
+ * This is useful in places where we need a single object reference to specify a
+ * range of bytes in a byte array, such as {@link Comparable} or
+ * {@link Collections#binarySearch(java.util.List, Object, Comparator)}
+ * 
+ * The actual comparison among RawComparable's requires an external
+ * RawComparator and it is applications' responsibility to ensure two
+ * RawComparable are supposed to be semantically comparable with the same
+ * RawComparator.
+ */
+public interface RawComparable {
+  /**
+   * Get the underlying byte array.
+   * 
+   * @return The underlying byte array.
+   */
+  abstract byte[] buffer();
+
+  /**
+   * Get the offset of the first byte in the byte array.
+   * 
+   * @return The offset of the first byte in the byte array.
+   */
+  abstract int offset();
+
+  /**
+   * Get the size of the byte range in the byte array.
+   * 
+   * @return The size of the byte range in the byte array.
+   */
+  abstract int size();
+}

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/SimpleBufferedOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/SimpleBufferedOutputStream.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/SimpleBufferedOutputStream.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/SimpleBufferedOutputStream.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.zebra.tfile;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A simplified BufferedOutputStream with borrowed buffer, and allow users to
+ * see how much data have been buffered.
+ */
+class SimpleBufferedOutputStream extends FilterOutputStream {
+  protected byte buf[]; // the borrowed buffer
+  protected int count = 0; // bytes used in buffer.
+
+  // Constructor
+  public SimpleBufferedOutputStream(OutputStream out, byte[] buf) {
+    super(out);
+    this.buf = buf;
+  }
+
+  private void flushBuffer() throws IOException {
+    if (count > 0) {
+      out.write(buf, 0, count);
+      count = 0;
+    }
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    if (count >= buf.length) {
+      flushBuffer();
+    }
+    buf[count++] = (byte) b;
+  }
+
+  @Override
+  public void write(byte b[], int off, int len) throws IOException {
+    if (len >= buf.length) {
+      flushBuffer();
+      out.write(b, off, len);
+      return;
+    }
+    if (len > buf.length - count) {
+      flushBuffer();
+    }
+    System.arraycopy(b, off, buf, count, len);
+    count += len;
+  }
+
+  @Override
+  public synchronized void flush() throws IOException {
+    flushBuffer();
+    out.flush();
+  }
+
+  // Get the size of internal buffer being used.
+  public int size() {
+    return count;
+  }
+}