You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2009/11/24 20:54:34 UTC

svn commit: r883836 [4/23] - in /hadoop/pig/branches/load-store-redesign: ./ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/ contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/ contrib/zebra/ contrib/zebra...

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LeafGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LeafGenerator.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LeafGenerator.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LeafGenerator.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+import java.io.PrintStream;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Generate partial binary key for a leaf expression.
+ */
+public final class LeafGenerator {
+  final LeafExpr leaf; // may be null to represent adding '0'
+  final int escapeLevel;
+  final int comescLevel;
+  final boolean complement;
+
+  /**
+   * @param leaf
+   *          The leaf expression
+   * @param el
+   *          escape level
+   * @param cel
+   *          complement escape level
+   * @param complement
+   *          complement the value
+   */
+  public LeafGenerator(LeafExpr leaf, int el, int cel, boolean complement) {
+    this.leaf = leaf;
+    this.escapeLevel = el;
+    if(escapeLevel > 1) {
+    	System.out.println("gaurav");
+    }
+    this.comescLevel = cel;
+    this.complement = complement;
+  }
+
+  void append(EncodingOutputStream out, Tuple tuple) throws ExecException {
+    out.setEscapeParams(escapeLevel, comescLevel, complement);
+    if (leaf == null) { // add a '\0'
+      out.write(0x0);
+    } else {
+      leaf.append(out, tuple);
+    }
+  }
+
+  void illustrate(PrintStream out) {
+    if (leaf == null) {
+      out.printf("(%s, NA, %d, %d, %b)", "NULL", escapeLevel, comescLevel,
+          complement);
+    } else {
+      leaf.illustrate(out, escapeLevel, comescLevel, complement);
+    }
+  }
+}

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LongExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LongExpr.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LongExpr.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/LongExpr.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+
+public final class LongExpr extends FixedLengthPrimitive {
+  public LongExpr(int index) {
+    super(index, Long.SIZE / Byte.SIZE);
+  }
+
+  protected void convertValue(byte[] b, Object o) {
+    if (b.length != Long.SIZE / Byte.SIZE) {
+      throw new IllegalArgumentException("Incorrect buffer size");
+    }
+    long l = (Long) o;
+    l ^= 1L << (Long.SIZE - 1);
+    b[7] = (byte) l;
+    b[6] = (byte) (l >> 8);
+    b[5] = (byte) (l >> 16);
+    b[4] = (byte) (l >> 24);
+    b[3] = (byte) (l >> 32);
+    b[2] = (byte) (l >> 40);
+    b[1] = (byte) (l >> 48);
+    b[0] = (byte) (l >> 56);
+  }
+
+  @Override
+  protected String getType() {
+    return "Long";
+  }
+}

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/NegateExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/NegateExpr.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/NegateExpr.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/NegateExpr.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+import java.io.PrintStream;
+import java.util.List;
+
+/**
+ * Negate expression
+ */
+public class NegateExpr extends ComparatorExpr {
+  protected final ComparatorExpr expr;
+
+  /**
+   * constructor
+   * 
+   * @param expr
+   */
+  protected NegateExpr(ComparatorExpr expr) {
+    this.expr = expr;
+  }
+
+  @Override
+  protected void appendLeafGenerator(List<LeafGenerator> list, int el, int cel,
+      boolean c, boolean explicitBound) {
+    expr.appendLeafGenerator(list, cel, el, !c, true);
+  }
+
+  /**
+   * @return the child expression
+   */
+  public ComparatorExpr childExpr() {
+    return expr;
+  }
+
+  @Override
+  protected void toString(PrintStream out) {
+    out.print("Negate(");
+    expr.toString(out);
+    out.print(")");
+  }
+  
+  /**
+   * Make a negate expression
+   */
+  public static ComparatorExpr makeNegateExpr(ComparatorExpr expr) {
+    if (expr instanceof NegateExpr) {
+      NegateExpr negateExpr = (NegateExpr) expr;
+      return negateExpr.childExpr();
+    }
+
+    return new NegateExpr(expr);
+  }
+}

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ShortExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ShortExpr.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ShortExpr.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/ShortExpr.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+
+public final class ShortExpr extends FixedLengthPrimitive {
+  public ShortExpr(int index) {
+    super(index, Short.SIZE / Byte.SIZE);
+  }
+
+  protected void convertValue(byte[] b, Object o) {
+    if (b.length != Short.SIZE / Byte.SIZE) {
+      throw new IllegalArgumentException("Incorrect buffer size");
+    }
+    short l = (Short) o;
+    l ^= 1 << (Short.SIZE - 1);
+    b[1] = (byte) l;
+    b[0] = (byte) (l >> 8);
+  }
+
+  @Override
+  protected String getType() {
+    return "Short";
+  }
+}

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/StringExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/StringExpr.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/StringExpr.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/StringExpr.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.io.Text;
+
+public final class StringExpr extends LeafExpr {
+  public StringExpr(int index) {
+    super(index);
+  }
+
+  @Override
+  protected void appendObject(EncodingOutputStream out, Object o) {
+    String s = (String) o;
+    try {
+      ByteBuffer bytes = Text.encode(s);
+      out.write(bytes.array(), 0, bytes.limit());
+    } catch (IOException e) {
+      throw new RuntimeException("Conversion error", e);
+    }
+  }
+
+  @Override
+  protected boolean implicitBound() {
+    return true;
+  }
+
+  @Override
+  protected String getType() {
+    return "String";
+  }
+}

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/TupleExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/TupleExpr.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/TupleExpr.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/TupleExpr.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+public class TupleExpr extends ComparatorExpr {
+  protected final List<ComparatorExpr> exprs;
+
+  protected TupleExpr(List<ComparatorExpr> exprs) {
+    this.exprs = exprs;
+  }
+
+  @Override
+  protected void appendLeafGenerator(List<LeafGenerator> list, int el, int cel,
+      boolean c, boolean explicitBound) {
+    int length = exprs.size();
+    for (int i = 0; i < length - 1; ++i) {
+      ComparatorExpr e = exprs.get(i);
+      e.appendLeafGenerator(list, el, cel, c, true);
+    }
+    exprs.get(length - 1).appendLeafGenerator(list, el, cel, c, explicitBound);
+  }
+
+  /**
+   * Get the children expressions.
+   * 
+   * @return The children expressions.
+   */
+  public List<ComparatorExpr> childrenExpr() {
+    return exprs;
+  }
+
+  @Override
+  protected void toString(PrintStream out) {
+    out.print("Tuple");
+    String sep = "(";
+    for (Iterator<ComparatorExpr> it = exprs.iterator(); it.hasNext();) {
+      out.printf(sep);
+      it.next().toString(out);
+      sep = ", ";
+    }
+
+    out.print(")");
+  }
+  
+  /**
+   * Make a tuple expression
+   * 
+   * @param exprs
+   *          children expressions
+   * @return a comparator expression
+   */
+  public static ComparatorExpr makeTupleExpr(ComparatorExpr... exprs) {
+    if (exprs.length == 0)
+      throw new IllegalArgumentException("Zero-length expression list");
+    if (exprs.length == 1) return exprs[0];
+    List<ComparatorExpr> aggr = new ArrayList<ComparatorExpr>(exprs.length);
+    for (ComparatorExpr e : exprs) {
+      if (e instanceof TupleExpr) {
+        aggr.addAll(((TupleExpr) e).childrenExpr());
+      } else {
+        aggr.add(e);
+      }
+    }
+    return new TupleExpr(aggr);
+  }
+  
+  /**
+   * Make a tuple expression
+   * 
+   * @param exprs
+   *          children expressions
+   * @return a comparator expression
+   */
+  public static ComparatorExpr makeTupleComparator(
+      Collection<? extends ComparatorExpr> exprs) {
+    if (exprs.size() == 0)
+      throw new IllegalArgumentException("Zero-length expression list");
+    if (exprs.size() == 1) return exprs.iterator().next();
+    List<ComparatorExpr> aggr = new ArrayList<ComparatorExpr>(exprs.size());
+    for (Iterator<? extends ComparatorExpr> it = exprs.iterator(); it.hasNext();) {
+      ComparatorExpr e = it.next();
+      if (e instanceof TupleExpr) {
+        aggr.addAll(((TupleExpr) e).childrenExpr());
+      } else {
+        aggr.add(e);
+      }
+    }
+    return new TupleExpr(aggr);
+  }
+
+}

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/package-info.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/package-info.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/comparator/package-info.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Utilities to allow PIG Storer to generate keys for sorted Zebra tables
+ */
+package org.apache.hadoop.zebra.pig.comparator;
+

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/package-info.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/package-info.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/package-info.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Implementation of PIG Storer/Loader Interfaces
+ */
+package org.apache.hadoop.zebra.pig;
+

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/ColumnType.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/ColumnType.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/ColumnType.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/ColumnType.java Tue Nov 24 19:54:19 2009
@@ -23,60 +23,96 @@
 import java.io.IOException;
 
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.file.tfile.Utils;
+import org.apache.hadoop.zebra.tfile.Utils;
 import org.apache.pig.data.DataType;
 
+/**
+ * Zebra Column Type
+ */
 public enum ColumnType {
+  /**
+   * Any type
+   */
   ANY("any") {
 	  public byte pigDataType() {
 		  return DataType.UNKNOWN;
 	  }
   },
+  /**
+   * Integer
+   */
   INT("int") {
     public byte pigDataType() {
       return DataType.INTEGER;
     }
   },
+  /**
+   * Long
+   */
   LONG("long") {
     public byte pigDataType() {
       return DataType.LONG;
     }
   },
+  /**
+   * Float
+   */
   FLOAT("float") {
     public byte pigDataType() {
       return DataType.FLOAT;
     }
   },
+  /**
+   * Double
+   */
   DOUBLE("double") {
     public byte pigDataType() {
       return DataType.DOUBLE;
     }
   },
+  /**
+   * Boolean
+   */
   BOOL("bool") {
     public byte pigDataType() {
       return DataType.BOOLEAN;
     }
   },
+  /**
+   * Collection
+   */
   COLLECTION("collection") {
     public byte pigDataType() {
       return DataType.BAG;
     }
   },
+  /**
+   * Map
+   */
   MAP("map") {
     public byte pigDataType() {
       return DataType.MAP;
     }
   },
+  /**
+   * Record
+   */
   RECORD("record") {
     public byte pigDataType() {
       return DataType.TUPLE;
     }
   },
+  /**
+   * String
+   */
   STRING("string") {
     public byte pigDataType() {
       return DataType.CHARARRAY;
     }
   },
+  /**
+   * Bytes
+   */
   BYTES("bytes") {
     public byte pigDataType() {
       return DataType.BYTEARRAY;
@@ -102,6 +138,13 @@
     return ColumnType.valueOf(name.toUpperCase());
   }
 
+  /**
+   * Get the Zebra type from a Pig type
+   *
+   * @param dt Pig type
+   *
+   * @return Zebra type
+   */
   public static ColumnType getTypeByPigDataType(byte dt) {
     for (ColumnType ct : ColumnType.values()) {
       if (ct.pigDataType() == dt) {
@@ -111,18 +154,40 @@
     return null;
   }
 
+  /**
+   * Get the Zebra type name for the passed in Zebra column type
+   *
+   * @param columntype Zebra column type
+   *
+   * @return string representation of the Zebra column type
+   */
   public static String findTypeName(ColumnType columntype) {
 	  return columntype.getName();
   }
 
+  /**
+   * Get the Zebra type name for this Zebra column type
+   *
+   * @return string representation of the Zebra column type
+   */
   public String getName() {
     return name;
   }
 
+  /**
+   * Same as getType
+   */
   public String toString() {
     return name;
   }
 
+  /**
+   * check if a column type contains internal schema
+   *
+   * @param columnType Zebra column type
+   *
+   * @return true if the type is RECORD, MAP or COLLECTION
+   */
   public static boolean isSchemaType(ColumnType columnType) {
 	  return ((columnType == RECORD) || (columnType == MAP) || (columnType == COLLECTION));
   }

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/Schema.java Tue Nov 24 19:54:19 2009
@@ -26,6 +26,7 @@
 import java.util.HashSet;
 
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.zebra.types.Projection;
 import org.apache.hadoop.zebra.parser.ParseException;
 import org.apache.hadoop.zebra.parser.TableSchemaParser;
 
@@ -37,6 +38,9 @@
 
   private static final long schemaVersion = 1L;
 
+  /**
+   * Column Schema in Schema
+   */
   public static class ColumnSchema {
     private String name;
     private ColumnType type;
@@ -73,6 +77,8 @@
 
     /**
      * access function to get the column name 
+     *
+     * @return name of the column
      */
     public String getName() {
       return name;
@@ -80,6 +86,8 @@
     
     /**
      * access function to get the column type 
+     *
+     * @return column type
      */
     public ColumnType getType() {
       return type;
@@ -87,6 +95,8 @@
     
     /**
      * access function to get the column name 
+     *
+     * @return column index in the parent schema
      */
     public int getIndex() {
       return index;
@@ -128,8 +138,8 @@
     /**
      * Compare two field schema for equality
      * 
-     * @param fschema
-     * @param fother
+     * @param fschema one column schema to be compared
+     * @param fother the other column schema to be compared
      * @return true if ColumnSchema are equal, false otherwise
      */
     public static boolean equals(ColumnSchema fschema, ColumnSchema fother) {
@@ -195,28 +205,47 @@
     /*
      * Returns the schema for the next level structure, in record, collection
      * and map.
+     *
+     * @return Schema of the column
      */
     public Schema getSchema() {
       return schema;
     }
   }
 
-  /*
-   * helper class to parse a column name string one section at a time and find
+  /**
+   * Helper class to parse a column name string one section at a time and find
    * the required type for the parsed part.
    */
   public static class ParsedName {
-    public String mName;
-    int mKeyOffset; // the offset where the keysstring starts
-    public ColumnType mDT = ColumnType.ANY; // parent's type
+    private String mName;
+    private int mKeyOffset; // the offset where the keysstring starts
+    private ColumnType mDT = ColumnType.ANY; // parent's type
 
+    /**
+     * Default ctor
+     */
     public ParsedName() {
     }
 
+    /**
+     * Set the name
+     *
+     * @param name
+     *            column name string
+     */
     public void setName(String name) {
       mName = name;
     }
 
+    /**
+     * Set the name and type
+     *
+     * @param name
+     *            column name string
+     * @param pdt
+     *            column type
+     */
     public void setName(String name, ColumnType pdt) {
       mName = name;
       mDT = pdt;
@@ -227,21 +256,42 @@
       mKeyOffset = keyStrOffset;
     }
 
-    void setDT(ColumnType dt) {
+    /**
+     * Set the column type
+     *
+     * @param dt
+     *          column type to be set with
+     */
+    public void setDT(ColumnType dt) {
       mDT = dt;
     }
 
-    ColumnType getDT() {
+    /**
+     * Get the column type
+     * 
+     * @return column type
+     */
+    public ColumnType getDT() {
       return mDT;
     }
 
-    String getName() {
+    /**
+     * Get the column name
+     *
+     * @return column name
+     */
+    public String getName() {
       return mName;
     }
 
+    /**
+     * Parse one sector of a fully qualified column name; also checks validity
+     * of use of the MAP and RECORD delimiters
+     *
+     * @param fs
+     *          column schema this column name is checked against with          
+     */
     public String parseName(Schema.ColumnSchema fs) throws ParseException {
-      // parse one sector of a fq name, also checks sanity of MAP and RECORD
-      // fields
       int fieldIndex, hashIndex;
       fieldIndex = mName.indexOf('.');
       hashIndex = mName.indexOf('#');
@@ -302,12 +352,13 @@
    *          alpha-numeric characters in column names.
    */
   public Schema(String schema) throws ParseException {
-    init(schema);
+    init(schema, false);
   }
 
   public Schema(String schema, boolean dupAllowed) throws ParseException {
     dupColNameAllowed = dupAllowed;
-    init(schema);
+    // suppose if duplicate is allowed, then it's from projection and hence virtual column is allowed
+    init(schema, dupAllowed);
   }
 
   public Schema(ColumnSchema fs) throws ParseException {
@@ -323,7 +374,7 @@
    *          please use only alpha-numeric characters in column names.
    */
   public Schema(String[] columns) throws ParseException {
-    init(columns);
+    init(columns, false);
   }
 
   /**
@@ -430,7 +481,7 @@
    */
   public static Schema parse(String schema) throws ParseException {
     Schema s = new Schema();
-    s.init(schema);
+    s.init(schema, false);
     return s;
   }
 
@@ -609,15 +660,15 @@
    */
   @Override
   public void readFields(DataInput in) throws IOException {
-    long version = org.apache.hadoop.io.file.tfile.Utils.readVLong(in);
+    long version = org.apache.hadoop.zebra.tfile.Utils.readVLong(in);
 
     if (version > schemaVersion)
       throw new IOException("Schema version is newer than that in software.");
 
     // check-ups are needed for future versions for backward-compatibility
-    String strSchema = org.apache.hadoop.io.file.tfile.Utils.readString(in);
+    String strSchema = org.apache.hadoop.zebra.tfile.Utils.readString(in);
     try {
-      init(strSchema);
+      init(strSchema, false);
     }
     catch (Exception e) {
       throw new IOException(e.getMessage());
@@ -629,11 +680,11 @@
    */
   @Override
   public void write(DataOutput out) throws IOException {
-    org.apache.hadoop.io.file.tfile.Utils.writeVLong(out, schemaVersion);
-    org.apache.hadoop.io.file.tfile.Utils.writeString(out, toString());
+    org.apache.hadoop.zebra.tfile.Utils.writeVLong(out, schemaVersion);
+    org.apache.hadoop.zebra.tfile.Utils.writeString(out, toString());
   }
 
-  private void init(String[] columnNames) throws ParseException {
+  private void init(String[] columnNames, boolean virtualColAllowed) throws ParseException {
     // the arg must be of type or they will be treated as the default type
     mFields = new ArrayList<ColumnSchema>();
     mNames = new HashMap<String, ColumnSchema>();
@@ -647,7 +698,7 @@
     }
     TableSchemaParser parser =
         new TableSchemaParser(new StringReader(sb.toString()));
-    parser.RecordSchema(this);
+    parser.RecordSchema(this, virtualColAllowed);
   }
 
   private void init() {
@@ -655,7 +706,7 @@
     mNames = new HashMap<String, ColumnSchema>();
   }
 
-  private void init(String columnString) throws ParseException {
+  private void init(String columnString, boolean virtualColAllowed) throws ParseException {
     String trimmedColumnStr;
     if (columnString == null || (trimmedColumnStr = columnString.trim()).isEmpty()) {
       init();
@@ -666,7 +717,7 @@
     for (int nx = 0; nx < parts.length; nx++) {
       parts[nx] = parts[nx].trim();
     }
-    init(parts);
+    init(parts, virtualColAllowed);
   }
 
   /**
@@ -683,6 +734,11 @@
     ParsedName pn = new ParsedName();
     HashSet<String> keyentries;
     for (int i = 0; i < ncols; i++) {
+	    if (Projection.isVirtualColumn(projcols[i]))
+	    {
+	      result.add(null);
+	      continue;
+	    }
       pn.setName(projcols[i]);
       if ((cs = getColumnSchemaOnParsedName(pn)) != null) {
         mycs = new ColumnSchema(pn.mName, cs.schema, cs.type);
@@ -944,7 +1000,7 @@
       else {
         if (!ColumnSchema.equals(fs, otherfs))
           throw new ParseException("Different types of column " + fs.name
-              + " in uioned tables");
+              + " in tables of a union");
       }
     }
   }

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/SchemaParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/SchemaParser.jjt?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/SchemaParser.jjt (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/SchemaParser.jjt Tue Nov 24 19:54:19 2009
@@ -26,6 +26,7 @@
 import java.io.*;
 import org.apache.hadoop.zebra.schema.Schema;
 import org.apache.hadoop.zebra.schema.ColumnType;
+import org.apache.hadoop.zebra.types.Projection;
 
 public class TableSchemaParser {
            public static void main( String[] args )
@@ -39,6 +40,10 @@
 					objout.close();
 					System.out.println(schema.toString());
       }
+      public Schema RecordSchema(Schema list) throws ParseException
+      {
+        return RecordSchema(list, false);
+      }
 }
 PARSER_END(TableSchemaParser)
 
@@ -288,7 +293,7 @@
 	{ return list; }
 }
 
-Schema RecordSchema(Schema list) throws ParseException : 
+Schema RecordSchema(Schema list, boolean virtualColAllowed) throws ParseException : 
 {
 	Schema item = null; 
 	if (list == null)
@@ -298,8 +303,8 @@
 {
 	(
 	(
-		fs = ColumnSchema() {list.add(fs);}
-		( "," fs = ColumnSchema() {list.add(fs);})* <EOF>
+		fs = ColumnSchema() { if (!virtualColAllowed && fs != null && Projection.isVirtualColumn(fs.getName())) throw new ParseException("["+fs.getName()+"] is a reserved virtual column name"); list.add(fs);}
+		( "," fs = ColumnSchema() { if (!virtualColAllowed && fs != null && Projection.isVirtualColumn(fs.getName())) throw new ParseException("["+fs.getName()+"] is a reserved virtual column name"); list.add(fs);})* <EOF>
 	)	
 	)
 	{ return (list.getNumColumns() == 0 || (list.getNumColumns() == 1 && list.getColumn(0) == null) ? null : list); }

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/package-info.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/package-info.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/schema/package-info.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Zebra Schema
+ */
+package org.apache.hadoop.zebra.schema;
+

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/BCFile.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/BCFile.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/BCFile.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/BCFile.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,979 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.zebra.tfile;
+
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.zebra.tfile.CompareUtils.Scalar;
+import org.apache.hadoop.zebra.tfile.CompareUtils.ScalarComparator;
+import org.apache.hadoop.zebra.tfile.CompareUtils.ScalarLong;
+import org.apache.hadoop.zebra.tfile.Compression.Algorithm;
+import org.apache.hadoop.zebra.tfile.Utils.Version;
+
+/**
+ * Block Compressed file, the underlying physical storage layer for TFile.
+ * BCFile provides the basic block level compression for the data block and meta
+ * blocks. It is separated from TFile as it may be used for other
+ * block-compressed file implementation.
+ */
+final class BCFile {
+  // the current version of BCFile impl, increment them (major or minor) made
+  // enough changes
+  static final Version API_VERSION = new Version((short) 1, (short) 0);
+  static final Log LOG = LogFactory.getLog(BCFile.class);
+
+  /**
+   * Prevent the instantiation of BCFile objects.
+   */
+  private BCFile() {
+    // nothing
+  }
+
+  /**
+   * BCFile writer, the entry point for creating a new BCFile.
+   */
+  static public class Writer implements Closeable {
+    private final FSDataOutputStream out;
+    private final Configuration conf;
+    // the single meta block containing index of compressed data blocks
+    final DataIndex dataIndex;
+    // index for meta blocks
+    final MetaIndex metaIndex;
+    boolean blkInProgress = false;
+    private boolean metaBlkSeen = false;
+    private boolean closed = false;
+    long errorCount = 0;
+    // reusable buffers.
+    private BytesWritable fsOutputBuffer;
+
+    /**
+     * Call-back interface to register a block after a block is closed.
+     */
+    private static interface BlockRegister {
+      /**
+       * Register a block that is fully closed.
+       * 
+       * @param raw
+       *          The size of block in terms of uncompressed bytes.
+       * @param offsetStart
+       *          The start offset of the block.
+       * @param offsetEnd
+       *          One byte after the end of the block. Compressed block size is
+       *          offsetEnd - offsetStart.
+       */
+      public void register(long raw, long offsetStart, long offsetEnd);
+    }
+
+    /**
+     * Intermediate class that maintain the state of a Writable Compression
+     * Block.
+     */
+    private static final class WBlockState {
+      private final Algorithm compressAlgo;
+      private Compressor compressor; // !null only if using native
+      // Hadoop compression
+      private final FSDataOutputStream fsOut;
+      private final long posStart;
+      private final SimpleBufferedOutputStream fsBufferedOutput;
+      private OutputStream out;
+
+      /**
+       * @param compressionAlgo
+       *          The compression algorithm to be used to for compression.
+       * @throws IOException
+       */
+      public WBlockState(Algorithm compressionAlgo, FSDataOutputStream fsOut,
+          BytesWritable fsOutputBuffer, Configuration conf) throws IOException {
+        this.compressAlgo = compressionAlgo;
+        this.fsOut = fsOut;
+        this.posStart = fsOut.getPos();
+
+        fsOutputBuffer.setCapacity(TFile.getFSOutputBufferSize(conf));
+
+        this.fsBufferedOutput =
+            new SimpleBufferedOutputStream(this.fsOut, fsOutputBuffer.getBytes());
+        this.compressor = compressAlgo.getCompressor();
+
+        try {
+          this.out =
+              compressionAlgo.createCompressionStream(fsBufferedOutput,
+                  compressor, 0);
+        } catch (IOException e) {
+          compressAlgo.returnCompressor(compressor);
+          throw e;
+        }
+      }
+
+      /**
+       * Get the output stream for BlockAppender's consumption.
+       * 
+       * @return the output stream suitable for writing block data.
+       */
+      OutputStream getOutputStream() {
+        return out;
+      }
+
+      /**
+       * Get the current position in file.
+       * 
+       * @return The current byte offset in underlying file.
+       * @throws IOException
+       */
+      long getCurrentPos() throws IOException {
+        return fsOut.getPos() + fsBufferedOutput.size();
+      }
+
+      long getStartPos() {
+        return posStart;
+      }
+
+      /**
+       * Current size of compressed data.
+       * 
+       * @return
+       * @throws IOException
+       */
+      long getCompressedSize() throws IOException {
+        long ret = getCurrentPos() - posStart;
+        return ret;
+      }
+
+      /**
+       * Finishing up the current block.
+       */
+      public void finish() throws IOException {
+        try {
+          if (out != null) {
+            out.flush();
+            out = null;
+          }
+        } finally {
+          compressAlgo.returnCompressor(compressor);
+          compressor = null;
+        }
+      }
+    }
+
+    /**
+     * Access point to stuff data into a block.
+     * 
+     * TODO: Change DataOutputStream to something else that tracks the size as
+     * long instead of int. Currently, we will wrap around if the row block size
+     * is greater than 4GB.
+     */
+    public class BlockAppender extends DataOutputStream {
+      private final BlockRegister blockRegister;
+      private final WBlockState wBlkState;
+      @SuppressWarnings("hiding")
+      private boolean closed = false;
+
+      /**
+       * Constructor
+       * 
+       * @param register
+       *          the block register, which is called when the block is closed.
+       * @param wbs
+       *          The writable compression block state.
+       */
+      BlockAppender(BlockRegister register, WBlockState wbs) {
+        super(wbs.getOutputStream());
+        this.blockRegister = register;
+        this.wBlkState = wbs;
+      }
+
+      /**
+       * Get the raw size of the block.
+       * 
+       * @return the number of uncompressed bytes written through the
+       *         BlockAppender so far.
+       * @throws IOException
+       */
+      public long getRawSize() throws IOException {
+        /**
+         * Expecting the size() of a block not exceeding 4GB. Assuming the
+         * size() will wrap to negative integer if it exceeds 2GB.
+         */
+        return size() & 0x00000000ffffffffL;
+      }
+
+      /**
+       * Get the compressed size of the block in progress.
+       * 
+       * @return the number of compressed bytes written to the underlying FS
+       *         file. The size may be smaller than actual need to compress the
+       *         all data written due to internal buffering inside the
+       *         compressor.
+       * @throws IOException
+       */
+      public long getCompressedSize() throws IOException {
+        return wBlkState.getCompressedSize();
+      }
+
+      @Override
+      public void flush() {
+        // The down stream is a special kind of stream that finishes a
+        // compression block upon flush. So we disable flush() here.
+      }
+
+      /**
+       * Signaling the end of write to the block. The block register will be
+       * called for registering the finished block.
+       */
+      @Override
+      public void close() throws IOException {
+        if (closed == true) {
+          return;
+        }
+        try {
+          ++errorCount;
+          wBlkState.finish();
+          blockRegister.register(getRawSize(), wBlkState.getStartPos(),
+              wBlkState.getCurrentPos());
+          --errorCount;
+        } finally {
+          closed = true;
+          blkInProgress = false;
+        }
+      }
+    }
+
+    /**
+     * Constructor
+     * 
+     * @param fout
+     *          FS output stream.
+     * @param compressionName
+     *          Name of the compression algorithm, which will be used for all
+     *          data blocks.
+     * @throws IOException
+     * @see Compression#getSupportedAlgorithms
+     */
+    public Writer(FSDataOutputStream fout, String compressionName,
+        Configuration conf) throws IOException {
+      if (fout.getPos() != 0) {
+        throw new IOException("Output file not at zero offset.");
+      }
+
+      this.out = fout;
+      this.conf = conf;
+      dataIndex = new DataIndex(compressionName);
+      metaIndex = new MetaIndex();
+      fsOutputBuffer = new BytesWritable();
+      Magic.write(fout);
+    }
+
+    /**
+     * Close the BCFile Writer. Attempting to use the Writer after calling
+     * <code>close</code> is not allowed and may lead to undetermined results.
+     */
+    public void close() throws IOException {
+      if (closed == true) {
+        return;
+      }
+
+      try {
+        if (errorCount == 0) {
+          if (blkInProgress == true) {
+            throw new IllegalStateException(
+                "Close() called with active block appender.");
+          }
+
+          // add metaBCFileIndex to metaIndex as the last meta block
+          BlockAppender appender =
+              prepareMetaBlock(DataIndex.BLOCK_NAME,
+                  getDefaultCompressionAlgorithm());
+          try {
+            dataIndex.write(appender);
+          } finally {
+            appender.close();
+          }
+
+          long offsetIndexMeta = out.getPos();
+          metaIndex.write(out);
+
+          // Meta Index and the trailing section are written out directly.
+          out.writeLong(offsetIndexMeta);
+
+          API_VERSION.write(out);
+          Magic.write(out);
+          out.flush();
+        }
+      } finally {
+        closed = true;
+      }
+    }
+
+    private Algorithm getDefaultCompressionAlgorithm() {
+      return dataIndex.getDefaultCompressionAlgorithm();
+    }
+
+    private BlockAppender prepareMetaBlock(String name, Algorithm compressAlgo)
+        throws IOException, MetaBlockAlreadyExists {
+      if (blkInProgress == true) {
+        throw new IllegalStateException(
+            "Cannot create Meta Block until previous block is closed.");
+      }
+
+      if (metaIndex.getMetaByName(name) != null) {
+        throw new MetaBlockAlreadyExists("name=" + name);
+      }
+
+      MetaBlockRegister mbr = new MetaBlockRegister(name, compressAlgo);
+      WBlockState wbs =
+          new WBlockState(compressAlgo, out, fsOutputBuffer, conf);
+      BlockAppender ba = new BlockAppender(mbr, wbs);
+      blkInProgress = true;
+      metaBlkSeen = true;
+      return ba;
+    }
+
+    /**
+     * Create a Meta Block and obtain an output stream for adding data into the
+     * block. There can only be one BlockAppender stream active at any time.
+     * Regular Blocks may not be created after the first Meta Blocks. The caller
+     * must call BlockAppender.close() to conclude the block creation.
+     * 
+     * @param name
+     *          The name of the Meta Block. The name must not conflict with
+     *          existing Meta Blocks.
+     * @param compressionName
+     *          The name of the compression algorithm to be used.
+     * @return The BlockAppender stream
+     * @throws IOException
+     * @throws MetaBlockAlreadyExists
+     *           If the meta block with the name already exists.
+     */
+    public BlockAppender prepareMetaBlock(String name, String compressionName)
+        throws IOException, MetaBlockAlreadyExists {
+      return prepareMetaBlock(name, Compression
+          .getCompressionAlgorithmByName(compressionName));
+    }
+
+    /**
+     * Create a Meta Block and obtain an output stream for adding data into the
+     * block. The Meta Block will be compressed with the same compression
+     * algorithm as data blocks. There can only be one BlockAppender stream
+     * active at any time. Regular Blocks may not be created after the first
+     * Meta Blocks. The caller must call BlockAppender.close() to conclude the
+     * block creation.
+     * 
+     * @param name
+     *          The name of the Meta Block. The name must not conflict with
+     *          existing Meta Blocks.
+     * @return The BlockAppender stream
+     * @throws MetaBlockAlreadyExists
+     *           If the meta block with the name already exists.
+     * @throws IOException
+     */
+    public BlockAppender prepareMetaBlock(String name) throws IOException,
+        MetaBlockAlreadyExists {
+      return prepareMetaBlock(name, getDefaultCompressionAlgorithm());
+    }
+
+    /**
+     * Create a Data Block and obtain an output stream for adding data into the
+     * block. There can only be one BlockAppender stream active at any time.
+     * Data Blocks may not be created after the first Meta Blocks. The caller
+     * must call BlockAppender.close() to conclude the block creation.
+     * 
+     * @return The BlockAppender stream
+     * @throws IOException
+     */
+    public BlockAppender prepareDataBlock() throws IOException {
+      if (blkInProgress == true) {
+        throw new IllegalStateException(
+            "Cannot create Data Block until previous block is closed.");
+      }
+
+      if (metaBlkSeen == true) {
+        throw new IllegalStateException(
+            "Cannot create Data Block after Meta Blocks.");
+      }
+
+      DataBlockRegister dbr = new DataBlockRegister();
+
+      WBlockState wbs =
+          new WBlockState(getDefaultCompressionAlgorithm(), out,
+              fsOutputBuffer, conf);
+      BlockAppender ba = new BlockAppender(dbr, wbs);
+      blkInProgress = true;
+      return ba;
+    }
+
+    /**
+     * Callback to make sure a meta block is added to the internal list when its
+     * stream is closed.
+     */
+    private class MetaBlockRegister implements BlockRegister {
+      private final String name;
+      private final Algorithm compressAlgo;
+
+      MetaBlockRegister(String name, Algorithm compressAlgo) {
+        this.name = name;
+        this.compressAlgo = compressAlgo;
+      }
+
+      public void register(long raw, long begin, long end) {
+        metaIndex.addEntry(new MetaIndexEntry(name, compressAlgo,
+            new BlockRegion(begin, end - begin, raw)));
+      }
+    }
+
+    /**
+     * Callback to make sure a data block is added to the internal list when
+     * it's being closed.
+     * 
+     */
+    private class DataBlockRegister implements BlockRegister {
+      DataBlockRegister() {
+        // do nothing
+      }
+
+      public void register(long raw, long begin, long end) {
+        dataIndex.addBlockRegion(new BlockRegion(begin, end - begin, raw));
+      }
+    }
+  }
+
+  /**
+   * BCFile Reader, interface to read the file's data and meta blocks.
+   */
+  static public class Reader implements Closeable {
+    private final FSDataInputStream in;
+    private final Configuration conf;
+    final DataIndex dataIndex;
+    // Index for meta blocks
+    final MetaIndex metaIndex;
+    final Version version;
+
+    /**
+     * Intermediate class that maintain the state of a Readable Compression
+     * Block.
+     */
+    static private final class RBlockState {
+      private final Algorithm compressAlgo;
+      private Decompressor decompressor;
+      private final BlockRegion region;
+      private final InputStream in;
+
+      public RBlockState(Algorithm compressionAlgo, FSDataInputStream fsin,
+          BlockRegion region, Configuration conf) throws IOException {
+        this.compressAlgo = compressionAlgo;
+        this.region = region;
+        this.decompressor = compressionAlgo.getDecompressor();
+
+        try {
+          this.in =
+              compressAlgo
+                  .createDecompressionStream(new BoundedRangeFileInputStream(
+                      fsin, this.region.getOffset(), this.region
+                          .getCompressedSize()), decompressor, TFile
+                      .getFSInputBufferSize(conf));
+        } catch (IOException e) {
+          compressAlgo.returnDecompressor(decompressor);
+          throw e;
+        }
+      }
+
+      /**
+       * Get the output stream for BlockAppender's consumption.
+       * 
+       * @return the output stream suitable for writing block data.
+       */
+      public InputStream getInputStream() {
+        return in;
+      }
+
+      public String getCompressionName() {
+        return compressAlgo.getName();
+      }
+
+      public BlockRegion getBlockRegion() {
+        return region;
+      }
+
+      public void finish() throws IOException {
+        try {
+          in.close();
+        } finally {
+          compressAlgo.returnDecompressor(decompressor);
+          decompressor = null;
+        }
+      }
+    }
+
+    /**
+     * Access point to read a block.
+     */
+    public static class BlockReader extends DataInputStream {
+      private final RBlockState rBlkState;
+      private boolean closed = false;
+
+      BlockReader(RBlockState rbs) {
+        super(rbs.getInputStream());
+        rBlkState = rbs;
+      }
+
+      /**
+       * Finishing reading the block. Release all resources.
+       */
+      @Override
+      public void close() throws IOException {
+        if (closed == true) {
+          return;
+        }
+        try {
+          // Do not set rBlkState to null. People may access stats after calling
+          // close().
+          rBlkState.finish();
+        } finally {
+          closed = true;
+        }
+      }
+
+      /**
+       * Get the name of the compression algorithm used to compress the block.
+       * 
+       * @return name of the compression algorithm.
+       */
+      public String getCompressionName() {
+        return rBlkState.getCompressionName();
+      }
+
+      /**
+       * Get the uncompressed size of the block.
+       * 
+       * @return uncompressed size of the block.
+       */
+      public long getRawSize() {
+        return rBlkState.getBlockRegion().getRawSize();
+      }
+
+      /**
+       * Get the compressed size of the block.
+       * 
+       * @return compressed size of the block.
+       */
+      public long getCompressedSize() {
+        return rBlkState.getBlockRegion().getCompressedSize();
+      }
+
+      /**
+       * Get the starting position of the block in the file.
+       * 
+       * @return the starting position of the block in the file.
+       */
+      public long getStartPos() {
+        return rBlkState.getBlockRegion().getOffset();
+      }
+    }
+
+    /**
+     * Constructor
+     * 
+     * @param fin
+     *          FS input stream.
+     * @param fileLength
+     *          Length of the corresponding file
+     * @throws IOException
+     */
+    public Reader(FSDataInputStream fin, long fileLength, Configuration conf)
+        throws IOException {
+      this.in = fin;
+      this.conf = conf;
+
+      // move the cursor to the beginning of the tail, containing: offset to the
+      // meta block index, version and magic
+      fin.seek(fileLength - Magic.size() - Version.size() - Long.SIZE
+          / Byte.SIZE);
+      long offsetIndexMeta = fin.readLong();
+      version = new Version(fin);
+      Magic.readAndVerify(fin);
+
+      if (!version.compatibleWith(BCFile.API_VERSION)) {
+        throw new RuntimeException("Incompatible BCFile fileBCFileVersion.");
+      }
+
+      // read meta index
+      fin.seek(offsetIndexMeta);
+      metaIndex = new MetaIndex(fin);
+
+      // read data:BCFile.index, the data block index
+      BlockReader blockR = getMetaBlock(DataIndex.BLOCK_NAME);
+      try {
+        dataIndex = new DataIndex(blockR);
+      } finally {
+        blockR.close();
+      }
+    }
+
+    /**
+     * Get the name of the default compression algorithm.
+     * 
+     * @return the name of the default compression algorithm.
+     */
+    public String getDefaultCompressionName() {
+      return dataIndex.getDefaultCompressionAlgorithm().getName();
+    }
+
+    /**
+     * Get version of BCFile file being read.
+     * 
+     * @return version of BCFile file being read.
+     */
+    public Version getBCFileVersion() {
+      return version;
+    }
+
+    /**
+     * Get version of BCFile API.
+     * 
+     * @return version of BCFile API.
+     */
+    public Version getAPIVersion() {
+      return API_VERSION;
+    }
+
+    /**
+     * Finishing reading the BCFile. Release all resources.
+     */
+    public void close() {
+      // nothing to be done now
+    }
+
+    /**
+     * Get the number of data blocks.
+     * 
+     * @return the number of data blocks.
+     */
+    public int getBlockCount() {
+      return dataIndex.getBlockRegionList().size();
+    }
+
+    /**
+     * Stream access to a Meta Block.
+     * 
+     * @param name
+     *          meta block name
+     * @return BlockReader input stream for reading the meta block.
+     * @throws IOException
+     * @throws MetaBlockDoesNotExist
+     *           The Meta Block with the given name does not exist.
+     */
+    public BlockReader getMetaBlock(String name) throws IOException,
+        MetaBlockDoesNotExist {
+      MetaIndexEntry imeBCIndex = metaIndex.getMetaByName(name);
+      if (imeBCIndex == null) {
+        throw new MetaBlockDoesNotExist("name=" + name);
+      }
+
+      BlockRegion region = imeBCIndex.getRegion();
+      return createReader(imeBCIndex.getCompressionAlgorithm(), region);
+    }
+
+    /**
+     * Stream access to a Data Block.
+     * 
+     * @param blockIndex
+     *          0-based data block index.
+     * @return BlockReader input stream for reading the data block.
+     * @throws IOException
+     */
+    public BlockReader getDataBlock(int blockIndex) throws IOException {
+      if (blockIndex < 0 || blockIndex >= getBlockCount()) {
+        throw new IndexOutOfBoundsException(String.format(
+            "blockIndex=%d, numBlocks=%d", blockIndex, getBlockCount()));
+      }
+
+      BlockRegion region = dataIndex.getBlockRegionList().get(blockIndex);
+      return createReader(dataIndex.getDefaultCompressionAlgorithm(), region);
+    }
+
+    private BlockReader createReader(Algorithm compressAlgo, BlockRegion region)
+        throws IOException {
+      RBlockState rbs = new RBlockState(compressAlgo, in, region, conf);
+      return new BlockReader(rbs);
+    }
+
+    /**
+     * Find the smallest Block index whose starting offset is greater than or
+     * equal to the specified offset.
+     * 
+     * @param offset
+     *          User-specific offset.
+     * @return the index to the data Block if such block exists; or -1
+     *         otherwise.
+     */
+    public int getBlockIndexNear(long offset) {
+      ArrayList<BlockRegion> list = dataIndex.getBlockRegionList();
+      int idx =
+          Utils
+              .lowerBound(list, new ScalarLong(offset), new ScalarComparator());
+
+      if (idx == list.size()) {
+        return -1;
+      }
+
+      return idx;
+    }
+  }
+
+  /**
+   * Index for all Meta blocks.
+   */
+  static class MetaIndex {
+    // use a tree map, for getting a meta block entry by name
+    final Map<String, MetaIndexEntry> index;
+
+    // for write
+    public MetaIndex() {
+      index = new TreeMap<String, MetaIndexEntry>();
+    }
+
+    // for read, construct the map from the file
+    public MetaIndex(DataInput in) throws IOException {
+      int count = Utils.readVInt(in);
+      index = new TreeMap<String, MetaIndexEntry>();
+
+      for (int nx = 0; nx < count; nx++) {
+        MetaIndexEntry indexEntry = new MetaIndexEntry(in);
+        index.put(indexEntry.getMetaName(), indexEntry);
+      }
+    }
+
+    public void addEntry(MetaIndexEntry indexEntry) {
+      index.put(indexEntry.getMetaName(), indexEntry);
+    }
+
+    public MetaIndexEntry getMetaByName(String name) {
+      return index.get(name);
+    }
+
+    public void write(DataOutput out) throws IOException {
+      Utils.writeVInt(out, index.size());
+
+      for (MetaIndexEntry indexEntry : index.values()) {
+        indexEntry.write(out);
+      }
+    }
+  }
+
+  /**
+   * An entry describes a meta block in the MetaIndex.
+   */
+  static final class MetaIndexEntry {
+    private final String metaName;
+    private final Algorithm compressionAlgorithm;
+    private final static String defaultPrefix = "data:";
+
+    private final BlockRegion region;
+
+    public MetaIndexEntry(DataInput in) throws IOException {
+      String fullMetaName = Utils.readString(in);
+      if (fullMetaName.startsWith(defaultPrefix)) {
+        metaName =
+            fullMetaName.substring(defaultPrefix.length(), fullMetaName
+                .length());
+      } else {
+        throw new IOException("Corrupted Meta region Index");
+      }
+
+      compressionAlgorithm =
+          Compression.getCompressionAlgorithmByName(Utils.readString(in));
+      region = new BlockRegion(in);
+    }
+
+    public MetaIndexEntry(String metaName, Algorithm compressionAlgorithm,
+        BlockRegion region) {
+      this.metaName = metaName;
+      this.compressionAlgorithm = compressionAlgorithm;
+      this.region = region;
+    }
+
+    public String getMetaName() {
+      return metaName;
+    }
+
+    public Algorithm getCompressionAlgorithm() {
+      return compressionAlgorithm;
+    }
+
+    public BlockRegion getRegion() {
+      return region;
+    }
+
+    public void write(DataOutput out) throws IOException {
+      Utils.writeString(out, defaultPrefix + metaName);
+      Utils.writeString(out, compressionAlgorithm.getName());
+
+      region.write(out);
+    }
+  }
+
+  /**
+   * Index of all compressed data blocks.
+   */
+  static class DataIndex {
+    final static String BLOCK_NAME = "BCFile.index";
+
+    private final Algorithm defaultCompressionAlgorithm;
+
+    // for data blocks, each entry specifies a block's offset, compressed size
+    // and raw size
+    private final ArrayList<BlockRegion> listRegions;
+
+    // for read, deserialized from a file
+    public DataIndex(DataInput in) throws IOException {
+      defaultCompressionAlgorithm =
+          Compression.getCompressionAlgorithmByName(Utils.readString(in));
+
+      int n = Utils.readVInt(in);
+      listRegions = new ArrayList<BlockRegion>(n);
+
+      for (int i = 0; i < n; i++) {
+        BlockRegion region = new BlockRegion(in);
+        listRegions.add(region);
+      }
+    }
+
+    // for write
+    public DataIndex(String defaultCompressionAlgorithmName) {
+      this.defaultCompressionAlgorithm =
+          Compression
+              .getCompressionAlgorithmByName(defaultCompressionAlgorithmName);
+      listRegions = new ArrayList<BlockRegion>();
+    }
+
+    public Algorithm getDefaultCompressionAlgorithm() {
+      return defaultCompressionAlgorithm;
+    }
+
+    public ArrayList<BlockRegion> getBlockRegionList() {
+      return listRegions;
+    }
+
+    public void addBlockRegion(BlockRegion region) {
+      listRegions.add(region);
+    }
+
+    public void write(DataOutput out) throws IOException {
+      Utils.writeString(out, defaultCompressionAlgorithm.getName());
+
+      Utils.writeVInt(out, listRegions.size());
+
+      for (BlockRegion region : listRegions) {
+        region.write(out);
+      }
+    }
+  }
+
+  /**
+   * Magic number uniquely identifying a BCFile in the header/footer.
+   */
+  static final class Magic {
+    private final static byte[] AB_MAGIC_BCFILE =
+        {
+            // ... total of 16 bytes
+            (byte) 0xd1, (byte) 0x11, (byte) 0xd3, (byte) 0x68, (byte) 0x91,
+            (byte) 0xb5, (byte) 0xd7, (byte) 0xb6, (byte) 0x39, (byte) 0xdf,
+            (byte) 0x41, (byte) 0x40, (byte) 0x92, (byte) 0xba, (byte) 0xe1,
+            (byte) 0x50 };
+
+    public static void readAndVerify(DataInput in) throws IOException {
+      byte[] abMagic = new byte[size()];
+      in.readFully(abMagic);
+
+      // check against AB_MAGIC_BCFILE, if not matching, throw an
+      // Exception
+      if (!Arrays.equals(abMagic, AB_MAGIC_BCFILE)) {
+        throw new IOException("Not a valid BCFile.");
+      }
+    }
+
+    public static void write(DataOutput out) throws IOException {
+      out.write(AB_MAGIC_BCFILE);
+    }
+
+    public static int size() {
+      return AB_MAGIC_BCFILE.length;
+    }
+  }
+
+  /**
+   * Block region.
+   */
+  static final class BlockRegion implements Scalar {
+    private final long offset;
+    private final long compressedSize;
+    private final long rawSize;
+
+    public BlockRegion(DataInput in) throws IOException {
+      offset = Utils.readVLong(in);
+      compressedSize = Utils.readVLong(in);
+      rawSize = Utils.readVLong(in);
+    }
+
+    public BlockRegion(long offset, long compressedSize, long rawSize) {
+      this.offset = offset;
+      this.compressedSize = compressedSize;
+      this.rawSize = rawSize;
+    }
+
+    public void write(DataOutput out) throws IOException {
+      Utils.writeVLong(out, offset);
+      Utils.writeVLong(out, compressedSize);
+      Utils.writeVLong(out, rawSize);
+    }
+
+    public long getOffset() {
+      return offset;
+    }
+
+    public long getCompressedSize() {
+      return compressedSize;
+    }
+
+    public long getRawSize() {
+      return rawSize;
+    }
+
+    @Override
+    public long magnitude() {
+      return offset;
+    }
+  }
+}

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/BoundedByteArrayOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/BoundedByteArrayOutputStream.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/BoundedByteArrayOutputStream.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/BoundedByteArrayOutputStream.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.zebra.tfile;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A byte array backed output stream with a limit. The limit should be smaller
+ * than the buffer capacity. The object can be reused through <code>reset</code>
+ * API and choose different limits in each round.
+ */
+public class BoundedByteArrayOutputStream extends OutputStream {
+  private final byte[] buffer;
+  private int limit;
+  private int count;
+
+  /**
+   * Create a BoundedByteArrayOutputStream with the specified
+   * capacity
+   * @param capacity The capacity of the underlying byte array
+   */
+  public BoundedByteArrayOutputStream(int capacity) {
+    this(capacity, capacity);
+  }
+
+  /**
+   * Create a BoundedByteArrayOutputStream with the specified
+   * capacity and limit.
+   * @param capacity The capacity of the underlying byte array
+   * @param limit The maximum limit upto which data can be written
+   */
+  public BoundedByteArrayOutputStream(int capacity, int limit) {
+    if ((capacity < limit) || (capacity | limit) < 0) {
+      throw new IllegalArgumentException("Invalid capacity/limit");
+    }
+    this.buffer = new byte[capacity];
+    this.limit = limit;
+    this.count = 0;
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    if (count >= limit) {
+      throw new EOFException("Reaching the limit of the buffer.");
+    }
+    buffer[count++] = (byte) b;
+  }
+
+  @Override
+  public void write(byte b[], int off, int len) throws IOException {
+    if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
+        || ((off + len) < 0)) {
+      throw new IndexOutOfBoundsException();
+    } else if (len == 0) {
+      return;
+    }
+
+    if (count + len > limit) {
+      throw new EOFException("Reach the limit of the buffer");
+    }
+
+    System.arraycopy(b, off, buffer, count, len);
+    count += len;
+  }
+
+  /**
+   * Reset the limit 
+   * @param newlim New Limit
+   */
+  public void reset(int newlim) {
+    if (newlim > buffer.length) {
+      throw new IndexOutOfBoundsException("Limit exceeds buffer size");
+    }
+    this.limit = newlim;
+    this.count = 0;
+  }
+
+  /** Reset the buffer */
+  public void reset() {
+    this.limit = buffer.length;
+    this.count = 0;
+  }
+
+  /** Return the current limit */
+  public int getLimit() {
+    return limit;
+  }
+
+  /** Returns the underlying buffer.
+   *  Data is only valid to {@link #size()}.
+   */
+  public byte[] getBuffer() {
+    return buffer;
+  }
+
+  /** Returns the length of the valid data 
+   * currently in the buffer.
+   */
+  public int size() {
+    return count;
+  }
+}

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/BoundedRangeFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/BoundedRangeFileInputStream.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/BoundedRangeFileInputStream.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/BoundedRangeFileInputStream.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.zebra.tfile;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+
+/**
+ * BoundedRangeFIleInputStream abstracts a contiguous region of a Hadoop
+ * FSDataInputStream as a regular input stream. One can create multiple
+ * BoundedRangeFileInputStream on top of the same FSDataInputStream and they
+ * would not interfere with each other.
+ */
+class BoundedRangeFileInputStream extends InputStream {
+
+  private FSDataInputStream in;
+  private long pos;
+  private long end;
+  private long mark;
+  private final byte[] oneByte = new byte[1];
+
+  /**
+   * Constructor
+   * 
+   * @param in
+   *          The FSDataInputStream we connect to.
+   * @param offset
+   *          Begining offset of the region.
+   * @param length
+   *          Length of the region.
+   * 
+   *          The actual length of the region may be smaller if (off_begin +
+   *          length) goes beyond the end of FS input stream.
+   */
+  public BoundedRangeFileInputStream(FSDataInputStream in, long offset,
+      long length) {
+    if (offset < 0 || length < 0) {
+      throw new IndexOutOfBoundsException("Invalid offset/length: " + offset
+          + "/" + length);
+    }
+
+    this.in = in;
+    this.pos = offset;
+    this.end = offset + length;
+    this.mark = -1;
+  }
+
+  @Override
+  public int available() throws IOException {
+    int avail = in.available();
+    if (pos + avail > end) {
+      avail = (int) (end - pos);
+    }
+
+    return avail;
+  }
+
+  @Override
+  public int read() throws IOException {
+    int ret = read(oneByte);
+    if (ret == 1) return oneByte[0] & 0xff;
+    return -1;
+  }
+
+  @Override
+  public int read(byte[] b) throws IOException {
+    return read(b, 0, b.length);
+  }
+
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
+      throw new IndexOutOfBoundsException();
+    }
+
+    int n = (int) Math.min(Integer.MAX_VALUE, Math.min(len, (end - pos)));
+    if (n == 0) return -1;
+    int ret = 0;
+    synchronized (in) {
+      in.seek(pos);
+      ret = in.read(b, off, n);
+    }
+    if (ret < 0) {
+      end = pos;
+      return -1;
+    }
+    pos += ret;
+    return ret;
+  }
+
+  @Override
+  /*
+   * We may skip beyond the end of the file.
+   */
+  public long skip(long n) throws IOException {
+    long len = Math.min(n, end - pos);
+    pos += len;
+    return len;
+  }
+
+  @Override
+  public synchronized void mark(int readlimit) {
+    mark = pos;
+  }
+
+  @Override
+  public synchronized void reset() throws IOException {
+    if (mark < 0) throw new IOException("Resetting to invalid mark");
+    pos = mark;
+  }
+
+  @Override
+  public boolean markSupported() {
+    return true;
+  }
+
+  @Override
+  public void close() {
+    // Invalidate the state of the stream.
+    in = null;
+    pos = end;
+    mark = -1;
+  }
+}

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/ByteArray.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/ByteArray.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/ByteArray.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/ByteArray.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.zebra.tfile;
+
+import org.apache.hadoop.io.BytesWritable;
+
+/**
+ * Adaptor class to wrap byte-array backed objects (including java byte array)
+ * as RawComparable objects.
+ */
+public final class ByteArray implements RawComparable {
+  private final byte[] buffer;
+  private final int offset;
+  private final int len;
+
+  /**
+   * Constructing a ByteArray from a {@link BytesWritable}.
+   * 
+   * @param other
+   */
+  public ByteArray(BytesWritable other) {
+    this(other.getBytes(), 0, other.getLength());
+  }
+
+  /**
+   * Wrap a whole byte array as a RawComparable.
+   * 
+   * @param buffer
+   *          the byte array buffer.
+   */
+  public ByteArray(byte[] buffer) {
+    this(buffer, 0, buffer.length);
+  }
+
+  /**
+   * Wrap a partial byte array as a RawComparable.
+   * 
+   * @param buffer
+   *          the byte array buffer.
+   * @param offset
+   *          the starting offset
+   * @param len
+   *          the length of the consecutive bytes to be wrapped.
+   */
+  public ByteArray(byte[] buffer, int offset, int len) {
+    if ((offset | len | (buffer.length - offset - len)) < 0) {
+      throw new IndexOutOfBoundsException();
+    }
+    this.buffer = buffer;
+    this.offset = offset;
+    this.len = len;
+  }
+
+  /**
+   * @return the underlying buffer.
+   */
+  @Override
+  public byte[] buffer() {
+    return buffer;
+  }
+
+  /**
+   * @return the offset in the buffer.
+   */
+  @Override
+  public int offset() {
+    return offset;
+  }
+
+  /**
+   * @return the size of the byte array.
+   */
+  @Override
+  public int size() {
+    return len;
+  }
+}