Posted to commits@pig.apache.org by pr...@apache.org on 2009/11/24 20:54:34 UTC

svn commit: r883836 [7/23] - in /hadoop/pig/branches/load-store-redesign: ./ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/ contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/ contrib/zebra/ contrib/zebra...

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/TFileDumper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/TFileDumper.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/TFileDumper.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/TFileDumper.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.tfile;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.zebra.tfile.BCFile.BlockRegion;
+import org.apache.hadoop.zebra.tfile.BCFile.MetaIndexEntry;
+import org.apache.hadoop.zebra.tfile.TFile.TFileIndexEntry;
+import org.apache.hadoop.zebra.tfile.Utils.Version;
+
+/**
+ * Dumping the information of a TFile.
+ */
+class TFileDumper {
+  static final Log LOG = LogFactory.getLog(TFileDumper.class);
+
+  private TFileDumper() {
+    // namespace object not constructable.
+  }
+
+  private enum Align {
+    LEFT, CENTER, RIGHT, ZERO_PADDED;
+    static String format(String s, int width, Align align) {
+      if (s.length() >= width) return s;
+      int room = width - s.length();
+      Align alignAdjusted = align;
+      if (room == 1) {
+        alignAdjusted = LEFT;
+      }
+      if (alignAdjusted == LEFT) {
+        return s + String.format("%" + room + "s", "");
+      }
+      if (alignAdjusted == RIGHT) {
+        return String.format("%" + room + "s", "") + s;
+      }
+      if (alignAdjusted == CENTER) {
+        int half = room / 2;
+        return String.format("%" + half + "s", "") + s
+            + String.format("%" + (room - half) + "s", "");
+      }
+      throw new IllegalArgumentException("Unsupported alignment");
+    }
+
+    static String format(long l, int width, Align align) {
+      if (align == ZERO_PADDED) {
+        return String.format("%0" + width + "d", l);
+      }
+      return format(Long.toString(l), width, align);
+    }
+
+    static int calculateWidth(String caption, long max) {
+      return Math.max(caption.length(), Long.toString(max).length());
+    }
+  }
+
+  /**
+   * Dump information about TFile.
+   * 
+   * @param file
+   *          Path string of the TFile
+   * @param out
+   *          PrintStream to output the information.
+   * @param conf
+   *          The configuration object.
+   * @throws IOException
+   */
+  static public void dumpInfo(String file, PrintStream out, Configuration conf)
+      throws IOException {
+    final int maxKeySampleLen = 16;
+    Path path = new Path(file);
+    FileSystem fs = path.getFileSystem(conf);
+    long length = fs.getFileStatus(path).getLen();
+    FSDataInputStream fsdis = fs.open(path);
+    TFile.Reader reader = new TFile.Reader(fsdis, length, conf);
+    try {
+      LinkedHashMap<String, String> properties =
+          new LinkedHashMap<String, String>();
+      int blockCnt = reader.readerBCF.getBlockCount();
+      int metaBlkCnt = reader.readerBCF.metaIndex.index.size();
+      properties.put("BCFile Version", reader.readerBCF.version.toString());
+      properties.put("TFile Version", reader.tfileMeta.version.toString());
+      properties.put("File Length", Long.toString(length));
+      properties.put("Data Compression", reader.readerBCF
+          .getDefaultCompressionName());
+      properties.put("Record Count", Long.toString(reader.getEntryCount()));
+      properties.put("Sorted", Boolean.toString(reader.isSorted()));
+      if (reader.isSorted()) {
+        properties.put("Comparator", reader.getComparatorName());
+      }
+      properties.put("Data Block Count", Integer.toString(blockCnt));
+      long dataSize = 0, dataSizeUncompressed = 0;
+      if (blockCnt > 0) {
+        for (int i = 0; i < blockCnt; ++i) {
+          BlockRegion region =
+              reader.readerBCF.dataIndex.getBlockRegionList().get(i);
+          dataSize += region.getCompressedSize();
+          dataSizeUncompressed += region.getRawSize();
+        }
+        properties.put("Data Block Bytes", Long.toString(dataSize));
+        if (!reader.readerBCF.getDefaultCompressionName().equals("none")) {
+          properties.put("Data Block Uncompressed Bytes", Long
+              .toString(dataSizeUncompressed));
+          properties.put("Data Block Compression Ratio", String.format(
+              "1:%.1f", (double) dataSizeUncompressed / dataSize));
+        }
+      }
+
+      properties.put("Meta Block Count", Integer.toString(metaBlkCnt));
+      long metaSize = 0, metaSizeUncompressed = 0;
+      if (metaBlkCnt > 0) {
+        Collection<MetaIndexEntry> metaBlks =
+            reader.readerBCF.metaIndex.index.values();
+        boolean calculateCompression = false;
+        for (Iterator<MetaIndexEntry> it = metaBlks.iterator(); it.hasNext();) {
+          MetaIndexEntry e = it.next();
+          metaSize += e.getRegion().getCompressedSize();
+          metaSizeUncompressed += e.getRegion().getRawSize();
+          if (e.getCompressionAlgorithm() != Compression.Algorithm.NONE) {
+            calculateCompression = true;
+          }
+        }
+        properties.put("Meta Block Bytes", Long.toString(metaSize));
+        if (calculateCompression) {
+          properties.put("Meta Block Uncompressed Bytes", Long
+              .toString(metaSizeUncompressed));
+          properties.put("Meta Block Compression Ratio", String.format(
+              "1:%.1f", (double) metaSizeUncompressed / metaSize));
+        }
+      }
+      properties.put("Meta-Data Size Ratio", String.format("1:%.1f",
+          (double) dataSize / metaSize));
+      long leftOverBytes = length - dataSize - metaSize;
+      long miscSize =
+          BCFile.Magic.size() * 2 + Long.SIZE / Byte.SIZE + Version.size();
+      long metaIndexSize = leftOverBytes - miscSize;
+      properties.put("Meta Block Index Bytes", Long.toString(metaIndexSize));
+      properties.put("Headers Etc Bytes", Long.toString(miscSize));
+      // Now output the properties table.
+      int maxKeyLength = 0;
+      Set<Map.Entry<String, String>> entrySet = properties.entrySet();
+      for (Iterator<Map.Entry<String, String>> it = entrySet.iterator(); it
+          .hasNext();) {
+        Map.Entry<String, String> e = it.next();
+        if (e.getKey().length() > maxKeyLength) {
+          maxKeyLength = e.getKey().length();
+        }
+      }
+      for (Iterator<Map.Entry<String, String>> it = entrySet.iterator(); it
+          .hasNext();) {
+        Map.Entry<String, String> e = it.next();
+        out.printf("%s : %s\n", Align.format(e.getKey(), maxKeyLength,
+            Align.LEFT), e.getValue());
+      }
+      out.println();
+      reader.checkTFileDataIndex();
+      if (blockCnt > 0) {
+        String blkID = "Data-Block";
+        int blkIDWidth = Align.calculateWidth(blkID, blockCnt);
+        int blkIDWidth2 = Align.calculateWidth("", blockCnt);
+        String offset = "Offset";
+        int offsetWidth = Align.calculateWidth(offset, length);
+        String blkLen = "Length";
+        int blkLenWidth =
+            Align.calculateWidth(blkLen, dataSize / blockCnt * 10);
+        String rawSize = "Raw-Size";
+        int rawSizeWidth =
+            Align.calculateWidth(rawSize, dataSizeUncompressed / blockCnt * 10);
+        String records = "Records";
+        int recordsWidth =
+            Align.calculateWidth(records, reader.getEntryCount() / blockCnt
+                * 10);
+        String endKey = "End-Key";
+        int endKeyWidth = Math.max(endKey.length(), maxKeySampleLen * 2 + 5);
+
+        out.printf("%s %s %s %s %s %s\n", Align.format(blkID, blkIDWidth,
+            Align.CENTER), Align.format(offset, offsetWidth, Align.CENTER),
+            Align.format(blkLen, blkLenWidth, Align.CENTER), Align.format(
+                rawSize, rawSizeWidth, Align.CENTER), Align.format(records,
+                recordsWidth, Align.CENTER), Align.format(endKey, endKeyWidth,
+                Align.LEFT));
+
+        for (int i = 0; i < blockCnt; ++i) {
+          BlockRegion region =
+              reader.readerBCF.dataIndex.getBlockRegionList().get(i);
+          TFileIndexEntry indexEntry = reader.tfileIndex.getEntry(i);
+          out.printf("%s %s %s %s %s ", Align.format(Align.format(i,
+              blkIDWidth2, Align.ZERO_PADDED), blkIDWidth, Align.LEFT), Align
+              .format(region.getOffset(), offsetWidth, Align.LEFT), Align
+              .format(region.getCompressedSize(), blkLenWidth, Align.LEFT),
+              Align.format(region.getRawSize(), rawSizeWidth, Align.LEFT),
+              Align.format(indexEntry.kvEntries, recordsWidth, Align.LEFT));
+          byte[] key = indexEntry.key;
+          boolean asAscii = true;
+          int sampleLen = Math.min(maxKeySampleLen, key.length);
+          for (int j = 0; j < sampleLen; ++j) {
+            byte b = key[j];
+            if ((b < 32 && b != 9) || (b == 127)) {
+              asAscii = false;
+            }
+          }
+          if (!asAscii) {
+            out.print("0X");
+            for (int j = 0; j < sampleLen; ++j) {
+              byte b = key[j];
+              out.printf("%X", b);
+            }
+          } else {
+            out.print(new String(key, 0, sampleLen));
+          }
+          if (sampleLen < key.length) {
+            out.print("...");
+          }
+          out.println();
+        }
+      }
+
+      out.println();
+      if (metaBlkCnt > 0) {
+        String name = "Meta-Block";
+        int maxNameLen = 0;
+        Set<Map.Entry<String, MetaIndexEntry>> metaBlkEntrySet =
+            reader.readerBCF.metaIndex.index.entrySet();
+        for (Iterator<Map.Entry<String, MetaIndexEntry>> it =
+            metaBlkEntrySet.iterator(); it.hasNext();) {
+          Map.Entry<String, MetaIndexEntry> e = it.next();
+          if (e.getKey().length() > maxNameLen) {
+            maxNameLen = e.getKey().length();
+          }
+        }
+        int nameWidth = Math.max(name.length(), maxNameLen);
+        String offset = "Offset";
+        int offsetWidth = Align.calculateWidth(offset, length);
+        String blkLen = "Length";
+        int blkLenWidth =
+            Align.calculateWidth(blkLen, metaSize / metaBlkCnt * 10);
+        String rawSize = "Raw-Size";
+        int rawSizeWidth =
+            Align.calculateWidth(rawSize, metaSizeUncompressed / metaBlkCnt
+                * 10);
+        String compression = "Compression";
+        int compressionWidth = compression.length();
+        out.printf("%s %s %s %s %s\n", Align.format(name, nameWidth,
+            Align.CENTER), Align.format(offset, offsetWidth, Align.CENTER),
+            Align.format(blkLen, blkLenWidth, Align.CENTER), Align.format(
+                rawSize, rawSizeWidth, Align.CENTER), Align.format(compression,
+                compressionWidth, Align.LEFT));
+
+        for (Iterator<Map.Entry<String, MetaIndexEntry>> it =
+            metaBlkEntrySet.iterator(); it.hasNext();) {
+          Map.Entry<String, MetaIndexEntry> e = it.next();
+          String blkName = e.getValue().getMetaName();
+          BlockRegion region = e.getValue().getRegion();
+          String blkCompression =
+              e.getValue().getCompressionAlgorithm().getName();
+          out.printf("%s %s %s %s %s\n", Align.format(blkName, nameWidth,
+              Align.LEFT), Align.format(region.getOffset(), offsetWidth,
+              Align.LEFT), Align.format(region.getCompressedSize(),
+              blkLenWidth, Align.LEFT), Align.format(region.getRawSize(),
+              rawSizeWidth, Align.LEFT), Align.format(blkCompression,
+              compressionWidth, Align.LEFT));
+        }
+      }
+    } finally {
+      IOUtils.cleanup(LOG, reader, fsdis);
+    }
+  }
+}
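
For reference, a minimal sketch of how the new dumper might be driven (the TFileDump driver class below is hypothetical and not part of this commit; TFileDumper is package-private, so a real caller would have to live in org.apache.hadoop.zebra.tfile):

    package org.apache.hadoop.zebra.tfile;

    import org.apache.hadoop.conf.Configuration;

    // Hypothetical driver: prints the property table and block listings of a TFile.
    public class TFileDump {
      public static void main(String[] args) throws Exception {
        // args[0]: path of the TFile to inspect (HDFS or local path)
        TFileDumper.dumpInfo(args[0], System.out, new Configuration());
      }
    }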

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/Utils.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/Utils.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/Utils.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/Utils.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,517 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.zebra.tfile;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * Supporting Utility classes used by TFile, and shared by users of TFile.
+ */
+public final class Utils {
+
+  /**
+   * Prevent the instantiation of Utils.
+   */
+  private Utils() {
+    // nothing
+  }
+
+  /**
+   * Encode an integer in a variable-length encoding format. Equivalent to
+   * <code>Utils#writeVLong(out, n)</code>.
+   * 
+   * @param out
+   *          output stream
+   * @param n
+   *          The integer to be encoded
+   * @throws IOException
+   * @see Utils#writeVLong(DataOutput, long)
+   */
+  public static void writeVInt(DataOutput out, int n) throws IOException {
+    writeVLong(out, n);
+  }
+
+  /**
+   * Encoding a Long integer into a variable-length encoding format.
+   * <ul>
+   * <li>if n in [-32, 128): encode in one byte with the actual value.
+   * Otherwise,
+   * <li>if n in [-20*2^8, 20*2^8): encode in two bytes: byte[0] = n/256 - 52;
+   * byte[1]=n&0xff. Otherwise,
+   * <li>if n in [-16*2^16, 16*2^16): encode in three bytes: byte[0]=n/2^16 -
+   * 88; byte[1]=(n>>8)&0xff; byte[2]=n&0xff. Otherwise,
+   * <li>if n in [-8*2^24, 8*2^24): encode in four bytes: byte[0]=n/2^24 - 112;
+   * byte[1] = (n>>16)&0xff; byte[2] = (n>>8)&0xff; byte[3]=n&0xff. Otherwise:
+   * <li>if n in [-2^31, 2^31): encode in five bytes: byte[0]=-125; byte[1] =
+   * (n>>24)&0xff; byte[2]=(n>>16)&0xff; byte[3]=(n>>8)&0xff; byte[4]=n&0xff;
+   * <li>if n in [-2^39, 2^39): encode in six bytes: byte[0]=-124; byte[1] =
+   * (n>>32)&0xff; byte[2]=(n>>24)&0xff; byte[3]=(n>>16)&0xff;
+   * byte[4]=(n>>8)&0xff; byte[5]=n&0xff
+   * <li>if n in [-2^47, 2^47): encode in seven bytes: byte[0]=-123; byte[1] =
+   * (n>>40)&0xff; byte[2]=(n>>32)&0xff; byte[3]=(n>>24)&0xff;
+   * byte[4]=(n>>16)&0xff; byte[5]=(n>>8)&0xff; byte[6]=n&0xff;
+   * <li>if n in [-2^55, 2^55): encode in eight bytes: byte[0]=-122; byte[1] =
+   * (n>>48)&0xff; byte[2] = (n>>40)&0xff; byte[3]=(n>>32)&0xff;
+   * byte[4]=(n>>24)&0xff; byte[5]=(n>>16)&0xff; byte[6]=(n>>8)&0xff;
+   * byte[7]=n&0xff;
+   * <li>if n in [-2^63, 2^63): encode in nine bytes: byte[0]=-121; byte[1] =
+   * (n>>56)&0xff; byte[2] = (n>>48)&0xff; byte[3] = (n>>40)&0xff;
+   * byte[4]=(n>>32)&0xff; byte[5]=(n>>24)&0xff; byte[6]=(n>>16)&0xff;
+   * byte[7]=(n>>8)&0xff; byte[8]=n&0xff;
+   * </ul>
+   * 
+   * @param out
+   *          output stream
+   * @param n
+   *          the integer number
+   * @throws IOException
+   */
+  @SuppressWarnings("fallthrough")
+  public static void writeVLong(DataOutput out, long n) throws IOException {
+    if ((n < 128) && (n >= -32)) {
+      out.writeByte((int) n);
+      return;
+    }
+
+    long un = (n < 0) ? ~n : n;
+    // how many bytes do we need to represent the number with sign bit?
+    int len = (Long.SIZE - Long.numberOfLeadingZeros(un)) / 8 + 1;
+    int firstByte = (int) (n >> ((len - 1) * 8));
+    switch (len) {
+      case 1:
+        // fall it through to firstByte==-1, len=2.
+        firstByte >>= 8;
+      case 2:
+        if ((firstByte < 20) && (firstByte >= -20)) {
+          out.writeByte(firstByte - 52);
+          out.writeByte((int) n);
+          return;
+        }
+        // fall it through to firstByte==0/-1, len=3.
+        firstByte >>= 8;
+      case 3:
+        if ((firstByte < 16) && (firstByte >= -16)) {
+          out.writeByte(firstByte - 88);
+          out.writeShort((int) n);
+          return;
+        }
+        // fall it through to firstByte==0/-1, len=4.
+        firstByte >>= 8;
+      case 4:
+        if ((firstByte < 8) && (firstByte >= -8)) {
+          out.writeByte(firstByte - 112);
+          out.writeShort(((int) n) >>> 8);
+          out.writeByte((int) n);
+          return;
+        }
+        out.writeByte(len - 129);
+        out.writeInt((int) n);
+        return;
+      case 5:
+        out.writeByte(len - 129);
+        out.writeInt((int) (n >>> 8));
+        out.writeByte((int) n);
+        return;
+      case 6:
+        out.writeByte(len - 129);
+        out.writeInt((int) (n >>> 16));
+        out.writeShort((int) n);
+        return;
+      case 7:
+        out.writeByte(len - 129);
+        out.writeInt((int) (n >>> 24));
+        out.writeShort((int) (n >>> 8));
+        out.writeByte((int) n);
+        return;
+      case 8:
+        out.writeByte(len - 129);
+        out.writeLong(n);
+        return;
+      default:
+        throw new RuntimeException("Internel error");
+    }
+  }
+
+  /**
+   * Decoding the variable-length integer. Synonymous to
+   * <code>(int)Utils#readVLong(in)</code>.
+   * 
+   * @param in
+   *          input stream
+   * @return the decoded integer
+   * @throws IOException
+   * 
+   * @see Utils#readVLong(DataInput)
+   */
+  public static int readVInt(DataInput in) throws IOException {
+    long ret = readVLong(in);
+    if ((ret > Integer.MAX_VALUE) || (ret < Integer.MIN_VALUE)) {
+      throw new RuntimeException(
+          "Number too large to be represented as Integer");
+    }
+    return (int) ret;
+  }
+
+  /**
+   * Decoding the variable-length integer. Suppose the value of the first byte
+   * is FB, and the following bytes are NB[*].
+   * <ul>
+   * <li>if (FB >= -32), return (long)FB;
+   * <li>if (FB in [-72, -33]), return (FB+52)<<8 + NB[0]&0xff;
+   * <li>if (FB in [-104, -73]), return (FB+88)<<16 + (NB[0]&0xff)<<8 +
+   * NB[1]&0xff;
+   * <li>if (FB in [-120, -105]), return (FB+112)<<24 + (NB[0]&0xff)<<16 +
+   * (NB[1]&0xff)<<8 + NB[2]&0xff;
+   * <li>if (FB in [-128, -121]), interpret the next (FB+129) bytes as a signed
+   * big-endian integer and return it.
+   * </ul>
+   * 
+   * @param in
+   *          input stream
+   * @return the decoded long integer.
+   * @throws IOException
+   */
+
+  public static long readVLong(DataInput in) throws IOException {
+    int firstByte = in.readByte();
+    if (firstByte >= -32) {
+      return firstByte;
+    }
+
+    switch ((firstByte + 128) / 8) {
+      case 11:
+      case 10:
+      case 9:
+      case 8:
+      case 7:
+        return ((firstByte + 52) << 8) | in.readUnsignedByte();
+      case 6:
+      case 5:
+      case 4:
+      case 3:
+        return ((firstByte + 88) << 16) | in.readUnsignedShort();
+      case 2:
+      case 1:
+        return ((firstByte + 112) << 24) | (in.readUnsignedShort() << 8)
+            | in.readUnsignedByte();
+      case 0:
+        int len = firstByte + 129;
+        switch (len) {
+          case 4:
+            return in.readInt();
+          case 5:
+            return ((long) in.readInt()) << 8 | in.readUnsignedByte();
+          case 6:
+            return ((long) in.readInt()) << 16 | in.readUnsignedShort();
+          case 7:
+            return ((long) in.readInt()) << 24 | (in.readUnsignedShort() << 8)
+                | in.readUnsignedByte();
+          case 8:
+            return in.readLong();
+          default:
+            throw new IOException("Corrupted VLong encoding");
+        }
+      default:
+        throw new RuntimeException("Internal error");
+    }
+  }
+
+  /**
+   * Write a String as a VInt n, followed by n Bytes as in Text format.
+   * 
+   * @param out
+   * @param s
+   * @throws IOException
+   */
+  public static void writeString(DataOutput out, String s) throws IOException {
+    if (s != null) {
+      Text text = new Text(s);
+      byte[] buffer = text.getBytes();
+      int len = text.getLength();
+      writeVInt(out, len);
+      out.write(buffer, 0, len);
+    } else {
+      writeVInt(out, -1);
+    }
+  }
+
+  /**
+   * Read a String as a VInt n, followed by n Bytes in Text format.
+   * 
+   * @param in
+   *          The input stream.
+   * @return The string
+   * @throws IOException
+   */
+  public static String readString(DataInput in) throws IOException {
+    int length = readVInt(in);
+    if (length == -1) return null;
+    byte[] buffer = new byte[length];
+    in.readFully(buffer);
+    return Text.decode(buffer);
+  }
+
+  /**
+   * A generic Version class. We suggest applications built on top of TFile use
+   * this class to maintain version information in their meta blocks.
+   * 
+   * A version number consists of a major version and a minor version. The
+   * suggested usage of major and minor version number is to increment major
+   * version number when the new storage format is not backward compatible, and
+   * increment the minor version otherwise.
+   */
+  public static final class Version implements Comparable<Version> {
+    private final short major;
+    private final short minor;
+
+    /**
+     * Construct the Version object by reading from the input stream.
+     * 
+     * @param in
+     *          input stream
+     * @throws IOException
+     */
+    public Version(DataInput in) throws IOException {
+      major = in.readShort();
+      minor = in.readShort();
+    }
+
+    /**
+     * Constructor.
+     * 
+     * @param major
+     *          major version.
+     * @param minor
+     *          minor version.
+     */
+    public Version(short major, short minor) {
+      this.major = major;
+      this.minor = minor;
+    }
+
+    /**
+     * Write the object to a DataOutput. The serialized format of the Version is
+     * major version followed by minor version, both as big-endian short
+     * integers.
+     * 
+     * @param out
+     *          The DataOutput object.
+     * @throws IOException
+     */
+    public void write(DataOutput out) throws IOException {
+      out.writeShort(major);
+      out.writeShort(minor);
+    }
+
+    /**
+     * Get the major version.
+     * 
+     * @return Major version.
+     */
+    public int getMajor() {
+      return major;
+    }
+
+    /**
+     * Get the minor version.
+     * 
+     * @return The minor version.
+     */
+    public int getMinor() {
+      return minor;
+    }
+
+    /**
+     * Get the size of the serialized Version object.
+     * 
+     * @return serialized size of the version object.
+     */
+    public static int size() {
+      return (Short.SIZE + Short.SIZE) / Byte.SIZE;
+    }
+
+    /**
+     * Return a string representation of the version.
+     */
+    @Override
+    public String toString() {
+      return new StringBuilder("v").append(major).append(".").append(minor)
+          .toString();
+    }
+
+    /**
+     * Test compatibility.
+     * 
+     * @param other
+     *          The Version object to test compatibility with.
+     * @return true if both versions have the same major version number; false
+     *         otherwise.
+     */
+    public boolean compatibleWith(Version other) {
+      return major == other.major;
+    }
+
+    /**
+     * Compare this version with another version.
+     */
+    @Override
+    public int compareTo(Version that) {
+      if (major != that.major) {
+        return major - that.major;
+      }
+      return minor - that.minor;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (this == other) return true;
+      if (!(other instanceof Version)) return false;
+      return compareTo((Version) other) == 0;
+    }
+
+    @Override
+    public int hashCode() {
+      return (major << 16) + minor;
+    }
+  }
+
+  /**
+   * Lower bound binary search. Find the index to the first element in the list
+   * that compares greater than or equal to key.
+   * 
+   * @param <T>
+   *          Type of the input key.
+   * @param list
+   *          The list
+   * @param key
+   *          The input key.
+   * @param cmp
+   *          Comparator for the key.
+   * @return The index to the desired element if it exists; or list.size()
+   *         otherwise.
+   */
+  public static <T> int lowerBound(List<? extends T> list, T key,
+      Comparator<? super T> cmp) {
+    int low = 0;
+    int high = list.size();
+
+    while (low < high) {
+      int mid = (low + high) >>> 1;
+      T midVal = list.get(mid);
+      int ret = cmp.compare(midVal, key);
+      if (ret < 0)
+        low = mid + 1;
+      else high = mid;
+    }
+    return low;
+  }
+
+  /**
+   * Upper bound binary search. Find the index to the first element in the list
+   * that compares greater than the input key.
+   * 
+   * @param <T>
+   *          Type of the input key.
+   * @param list
+   *          The list
+   * @param key
+   *          The input key.
+   * @param cmp
+   *          Comparator for the key.
+   * @return The index to the desired element if it exists; or list.size()
+   *         otherwise.
+   */
+  public static <T> int upperBound(List<? extends T> list, T key,
+      Comparator<? super T> cmp) {
+    int low = 0;
+    int high = list.size();
+
+    while (low < high) {
+      int mid = (low + high) >>> 1;
+      T midVal = list.get(mid);
+      int ret = cmp.compare(midVal, key);
+      if (ret <= 0)
+        low = mid + 1;
+      else high = mid;
+    }
+    return low;
+  }
+
+  /**
+   * Lower bound binary search. Find the index to the first element in the list
+   * that compares greater than or equal to key.
+   * 
+   * @param <T>
+   *          Type of the input key.
+   * @param list
+   *          The list
+   * @param key
+   *          The input key.
+   * @return The index to the desired element if it exists; or list.size()
+   *         otherwise.
+   */
+  public static <T> int lowerBound(List<? extends Comparable<? super T>> list,
+      T key) {
+    int low = 0;
+    int high = list.size();
+
+    while (low < high) {
+      int mid = (low + high) >>> 1;
+      Comparable<? super T> midVal = list.get(mid);
+      int ret = midVal.compareTo(key);
+      if (ret < 0)
+        low = mid + 1;
+      else high = mid;
+    }
+    return low;
+  }
+
+  /**
+   * Upper bound binary search. Find the index to the first element in the list
+   * that compares greater than the input key.
+   * 
+   * @param <T>
+   *          Type of the input key.
+   * @param list
+   *          The list
+   * @param key
+   *          The input key.
+   * @return The index to the desired element if it exists; or list.size()
+   *         otherwise.
+   */
+  public static <T> int upperBound(List<? extends Comparable<? super T>> list,
+      T key) {
+    int low = 0;
+    int high = list.size();
+
+    while (low < high) {
+      int mid = (low + high) >>> 1;
+      Comparable<? super T> midVal = list.get(mid);
+      int ret = midVal.compareTo(key);
+      if (ret <= 0)
+        low = mid + 1;
+      else high = mid;
+    }
+    return low;
+  }
+}
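
As a sanity check on the variable-length encoding and the search helpers above, a small round-trip sketch (the VLongExample class is illustrative only; the expected byte values follow from the writeVLong rules, e.g. 1000 falls into the two-byte range [-20*2^8, 20*2^8)):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.util.Arrays;

    import org.apache.hadoop.zebra.tfile.Utils;

    public class VLongExample {
      public static void main(String[] args) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        Utils.writeVLong(out, 1000L);    // two bytes: (1000>>8) - 52 = -49, then 1000 & 0xff
        Utils.writeString(out, "hello"); // VInt length followed by UTF-8 bytes
        out.flush();

        DataInputStream in =
            new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
        System.out.println(Utils.readVLong(in));   // 1000
        System.out.println(Utils.readString(in));  // hello

        // lowerBound: first index with element >= key; upperBound: first index with element > key
        System.out.println(Utils.lowerBound(Arrays.asList(1, 3, 3, 7), 3)); // 1
        System.out.println(Utils.upperBound(Arrays.asList(1, 3, 3, 7), 3)); // 3
      }
    }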

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/CGSchema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/CGSchema.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/CGSchema.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/CGSchema.java Tue Nov 24 19:54:19 2009
@@ -27,7 +27,7 @@
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.file.tfile.Utils.Version;
+import org.apache.hadoop.zebra.tfile.Utils.Version;
 import org.apache.hadoop.zebra.schema.Schema;
 import org.apache.hadoop.zebra.parser.ParseException;
 
@@ -50,7 +50,6 @@
    
    // tmp schema file name, used as a flag of unfinished CG
    private final static String SCHEMA_FILE = ".schema";
-   private final static String DEFAULT_COMPARATOR = "memcmp";
 	// schema version, should be same as BasicTable's most of the time
    private final static Version SCHEMA_VERSION =
      new Version((short) 1, (short) 1);
@@ -87,19 +86,18 @@
      this.version = SCHEMA_VERSION;
    }
 
-   public CGSchema(Schema schema, boolean sorted) {
+   public CGSchema(Schema schema, boolean sorted, String comparator) {
      this.sorted = sorted;
-     this.comparator = (sorted) ? DEFAULT_COMPARATOR : "";
+     this.comparator = (sorted) ? (comparator == null ? SortInfo.DEFAULT_COMPARATOR : comparator) : "";
      this.schema = schema;
      this.version = SCHEMA_VERSION;
    }
 
-   public CGSchema(Schema schema, boolean sorted, String name, String serializer, String compressor, String owner, String group, short perm) {
-  	this(schema, sorted);
+   public CGSchema(Schema schema, boolean sorted, String comparator, String name, String serializer, String compressor, String owner, String group, short perm) {
+  	this(schema, sorted, comparator);
     this.name = name;
   	this.serializer = serializer;
   	this.compressor = compressor;
-//  	this.owner = owner;
   	this.group = group;
   	this.perm  = perm;
    }

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java Tue Nov 24 19:54:19 2009
@@ -41,6 +41,9 @@
  * insertions and queries respectively.
  */
 public class Partition {
+  /**
+   * Storage split types
+   */
   public enum SplitType {
     NONE, RECORD, COLLECTION, MAP
   }
@@ -87,7 +90,7 @@
        * add map keys
        * return false if any key already exists but no rollback!
        */
-      public boolean addKeys(HashSet<String> keys)
+      public boolean addKeys(HashSet<String> keys, HashSet<String> columnKeySet)
       {
         if (keySet == null)
           keySet = new HashSet<String>();
@@ -95,6 +98,11 @@
         for (Iterator<String> it = keys.iterator(); it.hasNext(); )
         {
           key = it.next();
+          
+          // if the key is used in another CG?
+          if (!columnKeySet.add(key))
+            return false;
+          
           if (!keySet.add(key))
             return false;
         }
@@ -144,6 +152,7 @@
       private HashSet<String> mSplitColumns = new HashSet<String>();
       private ColumnMappingEntry mCGIndex = null;
       private String mCGName = null; // fully qualified name
+      private HashSet<String> keySet = null;
       private SplitType stype = SplitType.NONE;
       private boolean splitChild;
 
@@ -157,7 +166,9 @@
         mSplitMaps.add(cme);
         // multiple map splits on one MAP column is allowed!
         mSplitColumns.add(name);
-        return cme.addKeys(keys);
+        if (keySet == null)
+          keySet = new HashSet<String>();
+        return cme.addKeys(keys, keySet);
       }
 
       /**
@@ -317,7 +328,7 @@
      */
     public CGSchema generateDefaultCGSchema(String name, String compressor,
         String serializer, String owner, String group, 
-        short perm, final int defaultCGIndex) throws ParseException {
+        short perm, final int defaultCGIndex, String comparator) throws ParseException {
       Schema schema = new Schema();
       Schema.ColumnSchema fs;
       for (int i = 0; i < mSchema.getNumColumns(); i++) {
@@ -369,13 +380,13 @@
         }
       }
       CGSchema defaultSchema =
-          (schema.getNumColumns() == 0 ? null : new CGSchema(schema, false, name, serializer, compressor, owner, group, perm));
+          (schema.getNumColumns() == 0 ? null : new CGSchema(schema, false, comparator, name, serializer, compressor, owner, group, perm));
       return defaultSchema;
     }
 
     /**
      * returns "hash key-to-(sub)column" map on a (sub)column which is MAP-split
-     * aross different hash keys
+     * across different hash keys
      */
     public HashSet<PartitionInfo.ColumnMappingEntry> getSplitMap(
         Schema.ColumnSchema fs) {
@@ -661,17 +672,21 @@
   private Projection mProjection = null;
   private ArrayList<PartitionedColumn> mPCNeedTmpTuple = new ArrayList<PartitionedColumn>();
   private ArrayList<PartitionedColumn> mPCNeedMap = new ArrayList<PartitionedColumn>();
+  private String comparator;
+  private boolean mSorted;
+  private SortInfo mSortInfo;
 
   /*
    * ctor used for LOAD
    */
-  public Partition(Schema schema, Projection projection, String storage)
+  public Partition(Schema schema, Projection projection, String storage, String comparator)
       throws ParseException, IOException {
     mSchema = schema;
     TableStorageParser sparser =
-        new TableStorageParser(new StringReader(storage), this, mSchema);
+        new TableStorageParser(new StringReader(storage), this, mSchema, comparator);
     mPartitionInfo = new PartitionInfo(schema);
-    ArrayList<CGSchema> cgschemas = sparser.StorageSchema();
+    ArrayList<CGSchema> cgschemas = new ArrayList<CGSchema>();
+    sparser.StorageSchema(cgschemas);
     mCGSchemas = cgschemas.toArray(new CGSchema[cgschemas.size()]);
     mProjection = projection;
     Schema projSchema = projection.getProjectionSchema();
@@ -711,8 +726,6 @@
           cgindex = mapentry.getKey();
           if (cgindex == null)
             throw new AssertionError( "Internal Logical Error: RECORD does not have a CG index.");
-          if (mapentry.getValue() != null)
-            throw new AssertionError( "Internal Logical Error: RECORD should not have a split key map.");
           cgentry = getCGEntry(cgindex.getCGIndex());
           parCol = new PartitionedColumn(i, false);
           cgentry.addUser(parCol, name);
@@ -747,15 +760,34 @@
   /*
    * ctor used by STORE
    */
-  public Partition(final String schema, final String storage)
+  public Partition(final String schema, final String storage, String comparator, String sortColumns)
+        throws ParseException, IOException
+  {
+    TableSchemaParser parser = new TableSchemaParser(new StringReader(schema));
+    mSchema = parser.RecordSchema(null);
+    mSortInfo = SortInfo.parse(sortColumns, mSchema, comparator);
+    mSorted = (mSortInfo != null && mSortInfo.size() > 0);
+    this.comparator = (mSorted ? mSortInfo.getComparator() : "");
+    storeConst(storage);
+  }
+
+  public Partition(String schema, final String storage, String comparator)
       throws ParseException, IOException
   {
     TableSchemaParser parser = new TableSchemaParser(new StringReader(schema));
     mSchema = parser.RecordSchema(null);
+    this.comparator = comparator;
+    storeConst(storage);
+  }
+
+  private void storeConst(final String storage)
+      throws ParseException, IOException
+  {
     mPartitionInfo = new PartitionInfo(mSchema);
     TableStorageParser sparser =
-        new TableStorageParser(new StringReader(storage), this, mSchema);
-    ArrayList<CGSchema> cgschemas = sparser.StorageSchema();    
+      new TableStorageParser(new StringReader(storage), this, mSchema, this.comparator);
+    ArrayList<CGSchema> cgschemas = new ArrayList<CGSchema>();
+    sparser.StorageSchema(cgschemas);
     mCGSchemas = cgschemas.toArray(new CGSchema[cgschemas.size()]);
     int size = mSchema.getNumColumns();
     PartitionInfo.ColumnMappingEntry cgindex;
@@ -785,7 +817,7 @@
         } else {
           // this subtype is MAP-split
           // => need to add splits for all split keys
-          handleMapSplit(curCol, fs, i, cgentry);
+          handleMapSplit(curCol, fs, i, cgentry, cgindex.getFieldIndex());
         }
       }
       else {
@@ -805,6 +837,18 @@
       mPCNeedMap.get(i).createMap();
   }
 
+  public SortInfo getSortInfo() {
+    return mSortInfo;
+  }
+
+  public boolean isSorted() {
+    return mSorted;
+  }
+
+  public String getComparator() {
+    return comparator;
+  }
+
   /**
    * returns table schema
    */
@@ -856,10 +900,10 @@
     {
       if (projectedKeys != null)
       {
-        pn.mDT = ColumnType.MAP;
+        pn.setDT(ColumnType.MAP);
         map = true;
       } else {
-        pn.mDT = ColumnType.ANY;
+        pn.setDT(ColumnType.ANY);
         PartitionInfo.ColumnMappingEntry cme;
         for (Iterator<PartitionInfo.ColumnMappingEntry> it = results.iterator(); it.hasNext(); )
         {
@@ -985,7 +1029,7 @@
       pn.parseName(fs);
       Schema.ParsedName oripn = new Schema.ParsedName();
       for (int i = 0; i < schema.getNumColumns(); i++) {
-        oripn.setName(new String(pn.mName), pn.mDT);
+        oripn.setName(new String(pn.getName()), pn.getDT());
         child = schema.getColumn(i);
         if (getCGIndex(child) == null) {
           // not a CG: go one level lower
@@ -1038,7 +1082,7 @@
       PartitionedColumn parent, Schema.ColumnSchema child, int i,
       int fi, HashMap<PartitionInfo.ColumnMappingEntry, HashSet<String>> cgindices) throws IOException {
     CGEntry cgentry;
-    if (pn.mDT == ColumnType.ANY) {
+    if (pn.getDT() == ColumnType.ANY) {
       // this subtype is MAP split and the projection is on the whole MAP:
       // => need to add stitches for all split keys
 
@@ -1174,7 +1218,7 @@
           else {
             // this subfield is MAP-split
             // => need to add splits for all split keys
-            handleMapSplit(parent, child, i, cgentry);
+            handleMapSplit(parent, child, i, cgentry, cgindex.getFieldIndex());
           }
         }
       }
@@ -1189,12 +1233,13 @@
    * @throws IOException
    */
   private void handleMapSplit(PartitionedColumn parent,
-      Schema.ColumnSchema child, int i, CGEntry cgentry) throws ParseException, IOException {
+      Schema.ColumnSchema child, int i, CGEntry cgentry, int childProjIndex) throws ParseException, IOException {
     // first the map partitioned column that contain all non-key-partitioned
     // hashes
     PartitionedColumn mapParCol =
         new PartitionedColumn(i, Partition.SplitType.MAP, false);
     cgentry.addUser(mapParCol, getCGName(child));
+    mapParCol.setProjIndex(childProjIndex);
     mExecs.add(mapParCol); // not a leaf : MAP split needed
     mSplitSize++;
     parent.addChild(mapParCol);
@@ -1236,7 +1281,6 @@
     while (it.hasNext())
       it.next().getValue().read();
 
-    TypesUtils.resetTuple(t);
     // dispatch
     mExecs.get(mStitchSize - 1).setRecord(t);
 
@@ -1313,9 +1357,9 @@
 
   public CGSchema generateDefaultCGSchema(String name, String compressor, String serializer,
       String owner, String group, short perm, 
-      final int defaultCGIndex) throws ParseException {
+      final int defaultCGIndex, String comparator) throws ParseException {
     return mPartitionInfo.generateDefaultCGSchema(name, compressor, serializer,owner, group, perm,
-        defaultCGIndex);
+        defaultCGIndex, comparator);
   }
 
   public void setSplit(Schema.ColumnSchema fs, SplitType st, SplitType cst, String name, String childName, boolean splitChild) throws ParseException {
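
The net effect of these Partition changes is that sort information now flows in through the constructors rather than the storage string. A rough usage sketch for the STORE path follows (the schema and storage strings are illustrative Zebra syntax, not taken from this commit):

    import org.apache.hadoop.zebra.types.Partition;
    import org.apache.hadoop.zebra.types.SortInfo;

    public class PartitionStoreExample {
      public static void main(String[] args) throws Exception {
        // A null comparator falls back to SortInfo.DEFAULT_COMPARATOR ("memcmp").
        Partition part = new Partition("a:string, b:int, c:bytes",  // table schema (illustrative)
                                       "[a, b]; [c]",               // column group storage spec (illustrative)
                                       null,                        // comparator
                                       "a,b");                      // sort columns
        SortInfo si = part.getSortInfo();
        System.out.println(part.isSorted());      // true
        System.out.println(si.getComparator());   // memcmp
      }
    }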

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Projection.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Projection.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Projection.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Projection.java Tue Nov 24 19:54:19 2009
@@ -19,6 +19,7 @@
 
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.ArrayList;
 import org.apache.hadoop.zebra.schema.Schema;
 import org.apache.hadoop.zebra.parser.ParseException;
 
@@ -27,6 +28,7 @@
  */
 
 public class Projection {
+  public static final String source_table_vcolumn_name = "source_table";
   private Schema mProjection; // schema as needed by the projection
   private int mNumColumns;
   private String mProjStr;
@@ -52,6 +54,43 @@
   }
 
   /**
+   * if a column name is on a virtual column
+   */
+  public static boolean isVirtualColumn(String name)
+  {
+    if (name == null || name.isEmpty())
+      return false;
+    return name.trim().equalsIgnoreCase(source_table_vcolumn_name);
+  }
+
+  /**
+   * Get the indices of all virtual columns
+   */
+  public static Integer[] getVirtualColumnIndices(String projection)
+  {
+    if (projection == null)
+      return null;
+    String[] colnames = projection.trim().split(Schema.COLUMN_DELIMITER);
+    int size = colnames.length, realsize = 0;
+    ArrayList<Integer> vcol = new ArrayList<Integer>();
+    
+    for (int i = 0; i < size; i++)
+    {
+      if (Projection.isVirtualColumn(colnames[i]))
+      {
+        vcol.add(i);
+      }
+    }
+    Integer[] result = null;
+    if (!vcol.isEmpty())
+    {
+      result = new Integer[vcol.size()];
+      vcol.toArray(result);
+    }
+    return result;
+  }
+  
+  /**
    * ctor for partial projection
    */
   public Projection(Schema s, String projection) throws ParseException
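
A quick check of the new virtual-column helpers (this sketch assumes Schema.COLUMN_DELIMITER is the comma used elsewhere in Zebra; isVirtualColumn trims and ignores case, so surrounding whitespace is harmless):

    import java.util.Arrays;

    import org.apache.hadoop.zebra.types.Projection;

    public class VirtualColumnExample {
      public static void main(String[] args) {
        // "source_table" is the only virtual column; it sits at index 1 here.
        Integer[] vcols = Projection.getVirtualColumnIndices("a, source_table, b");
        System.out.println(Arrays.toString(vcols));                     // [1]
        System.out.println(Projection.isVirtualColumn("SOURCE_TABLE")); // true
        System.out.println(Projection.isVirtualColumn("a"));            // false
      }
    }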

Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SortInfo.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SortInfo.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SortInfo.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SortInfo.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.types;
+
+import java.io.IOException;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.schema.ColumnType;
+import org.apache.hadoop.zebra.tfile.TFile;
+
+/**
+ * Sortedness-related information
+ */
+public class SortInfo {
+  public static final String DEFAULT_COMPARATOR =  TFile.COMPARATOR_MEMCMP;
+  private boolean global = true;
+  private String[] columns;
+  private int[] indices = null;
+  private ColumnType[] types = null;
+  private String comparator = null;
+  private Schema schema = null;
+  private static final String SORTED_COLUMN_DELIMITER = ",";
+
+  private SortInfo(String[] columns, int[] sortIndices, ColumnType[] sortColTypes, String comparator, Schema schema)
+  {
+    this.columns = columns;
+    this.indices = sortIndices;
+    this.comparator = comparator;
+    this.schema = schema;
+    this.types = sortColTypes;
+  }
+
+  /**
+   * Get an array of the sorted column names with the first column
+   * being the primary sort key, the second column being the
+   * secondary sort key, ..., etc.
+   *
+   * @return an array of strings of sorted column names
+   */
+  public String[] getSortColumnNames() {
+    return columns;
+  }
+  
+  /**
+   * Get an array of zebra types of the sorted columns with the first column
+   * being the primary sort key, the second column being the
+   * secondary sort key, ..., etc.
+   *
+   * @return an array of Zebra column types of the sorted columns
+   */
+  public ColumnType[] getSortColumnTypes() {
+	  return types;
+  }
+
+  /**
+   * Get an array of column indices in schema of the sorted columns with the first column
+   * being the primary sort key, the second column being the
+   * secondary sort key, ..., etc.
+   *
+   * @return an array of column indices (into the schema) of the sorted columns
+   */
+  public int[] getSortIndices() {
+    return indices;
+  }
+
+  /**
+   * Get the number of sorted columns
+   *
+   * @return number of sorted columns
+   */
+  public int size() {
+    return (columns == null ? 0 : columns.length);
+  }
+
+  /**
+   * Get the comparator name 
+   *
+   * @return  comparator name
+   */
+  public String getComparator() {
+    return comparator;
+  }
+
+  /**
+   * Check if this SortInfo is compatible with the given sort columns and comparator
+   *
+   * @return true if one set of sort columns equals a leading portion of the other's and the comparators match
+   */
+  public boolean equals(String sortcolumns, String comparator) throws IOException {
+    if (sortcolumns == null || sortcolumns.trim().isEmpty())
+    {
+      return false;
+    }
+    String[] columns = sortcolumns.trim().split(SORTED_COLUMN_DELIMITER);
+    for (String column : columns)
+    {
+    	if (schema.getColumn(column) == null)
+            throw new IOException(column + " does not exist in schema");
+    }
+    if (this.columns.length <= columns.length)
+    {
+      for (int i = 0; i < this.columns.length; i++)
+     {
+       if (!this.columns[i].equals(columns[i]))
+         return false;
+     }
+    } else {
+      for (int i = 0; i < columns.length; i++)
+     {
+       if (!columns[i].equals(this.columns[i]))
+         return false;
+     }
+    }
+    if (this.comparator == null && comparator == null)
+    {
+      return true;
+    } else if (this.comparator != null && comparator != null)
+    {
+      return (this.comparator.equals(comparator));
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Build a SortInfo object from sort column names, schema, and comparator
+   *
+   * @param sortStr
+   *                     comma-separated sort column names
+   * @param schema
+   *                     schema of the Zebra table for the sort columns
+   * @param comparator
+   *                     comparator name
+   * @return             newly built SortInfo object
+   */
+  public static SortInfo parse(String sortStr, Schema schema, String comparator) throws IOException
+  {
+    if (sortStr == null || sortStr.trim().isEmpty())
+    {
+      return null;
+    }
+    String[] sortedColumns = sortStr.trim().split(SORTED_COLUMN_DELIMITER);
+    int[] sortColIndices = new int[sortedColumns.length];
+    ColumnType[] sortColTypes = new ColumnType[sortedColumns.length];
+    Schema.ColumnSchema cs;
+    for (int i = 0; i < sortedColumns.length; i++)
+    {
+      sortedColumns[i] = sortedColumns[i].trim();
+      /*
+       * sanity check the sort column's existence
+       */
+      if ((cs = schema.getColumn(sortedColumns[i])) == null)
+        throw new IOException(sortedColumns[i] + " does not exist in schema");
+      sortColIndices[i] = schema.getColumnIndex(sortedColumns[i]);
+      sortColTypes[i] = schema.getColumn(sortedColumns[i]).getType();
+    }
+    String comparatorInUse = (sortedColumns.length > 0 ?
+        (comparator == null || comparator.isEmpty() ?
+          DEFAULT_COMPARATOR : comparator) : null);
+    return new SortInfo(sortedColumns, sortColIndices, sortColTypes, comparatorInUse, schema);
+  }
+
+  /**
+   * Build a string of comma-separated sort column names from an array of sort column names
+   *
+   * @param names an array of sort column names
+   *
+   * @return a string of comma-separated sort column names
+   */
+  public static String toSortString(String[] names)
+  {
+    if (names == null || names.length == 0)
+      return null;
+
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < names.length; i++)
+    {
+      if (i > 0)
+        sb.append(SORTED_COLUMN_DELIMITER);
+      sb.append(names[i]);
+    }
+    return sb.toString();
+  }
+}
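
A minimal sketch of the string helpers above (SortInfo.parse needs a Schema, so it is only shown here for the trivial no-sort-column case; the Partition example earlier exercises the full path):

    import org.apache.hadoop.zebra.types.SortInfo;

    public class SortInfoExample {
      public static void main(String[] args) throws Exception {
        System.out.println(SortInfo.toSortString(new String[] {"a", "b"})); // a,b
        System.out.println(SortInfo.toSortString(new String[0]));           // null
        System.out.println(SortInfo.parse(null, null, null));               // null: no sort columns, no SortInfo
        System.out.println(SortInfo.DEFAULT_COMPARATOR);                    // memcmp
      }
    }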

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SubColumnExtraction.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SubColumnExtraction.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SubColumnExtraction.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SubColumnExtraction.java Tue Nov 24 19:54:19 2009
@@ -30,14 +30,14 @@
 import org.apache.hadoop.zebra.parser.ParseException;
 
 
+/**
+ * This class extracts a subfield from a column or subcolumn stored
+ * in its entirety on disk.
+ * It should be used only by readers whose serializers do not
+ * support projection
+ */
 public class SubColumnExtraction {
-	/**
-	 * This class extracts a subfield from a column or subcolumn stored
-   * in entirety on disk
-	 * It should be used only by readers whose serializers do not
-   * support projection
-	 */
-	public static class SubColumn {
+	static class SubColumn {
 		Schema physical;
 		Projection projection;
 		ArrayList<SplitColumn> exec = null;
@@ -77,16 +77,17 @@
 				pn.setName(name);
 				fs = physical.getColumnSchema(pn);
 				if (keySet != null)
-				  pn.mDT = ColumnType.MAP;
+				  pn.setDT(ColumnType.MAP);
 				if (fs == null)
 		     		continue; // skip non-existing field
 		
 				j = fs.getIndex();
-				if (pn.mDT == ColumnType.MAP || pn.mDT == ColumnType.RECORD || pn.mDT == ColumnType.COLLECTION)
+				ColumnType ct = pn.getDT();
+				if (ct == ColumnType.MAP || ct == ColumnType.RECORD || ct == ColumnType.COLLECTION)
 				{
 					// record/map subfield is expected
-					sc = new SplitColumn(j, pn.mDT);
-          if (pn.mDT == ColumnType.MAP)
+					sc = new SplitColumn(j, ct);
+          if (ct == ColumnType.MAP)
             sclist.add(sc);
 					exec.add(sc); // breadth-first
 					// (i, j) represents the mapping between projection and physical schema
@@ -105,18 +106,19 @@
      * build the split executions
      */
 		private void buildSplit(SplitColumn parent, Schema.ColumnSchema fs,
-        Schema.ParsedName pn, final int projIndex, HashSet<String> keys) throws ParseException, ExecException
+        final Schema.ParsedName pn, final int projIndex, HashSet<String> keys) throws ParseException, ExecException
 		{
 			// recursive call to get the next level schema
-			if (pn.mDT != fs.getType())
+		  ColumnType ct = pn.getDT();
+			if (ct != fs.getType())
 	      	throw new ParseException(fs.getName()+" is not of proper type.");
 	
 			String prefix;
 			int fieldIndex;
 			SplitColumn sc;
-			Partition.SplitType callerDT = (pn.mDT == ColumnType.MAP ? Partition.SplitType.MAP :
-				                               (pn.mDT == ColumnType.RECORD ? Partition.SplitType.RECORD :
-				                                 (pn.mDT == ColumnType.COLLECTION ? Partition.SplitType.COLLECTION :
+			Partition.SplitType callerDT = (ct == ColumnType.MAP ? Partition.SplitType.MAP :
+				                               (ct == ColumnType.RECORD ? Partition.SplitType.RECORD :
+				                                 (ct == ColumnType.COLLECTION ? Partition.SplitType.COLLECTION :
 				                        	         Partition.SplitType.NONE)));
 			prefix = pn.parseName(fs);
 			Schema schema = fs.getSchema();
@@ -133,11 +135,12 @@
 				fieldIndex = 0;
 			}
 	
-			if (pn.mDT != ColumnType.ANY)
+		  ct = pn.getDT();
+			if (ct != ColumnType.ANY)
 			{
 				// record subfield is expected
-			 	sc = new SplitColumn(fieldIndex, pn.mDT);
-        if (pn.mDT == ColumnType.MAP)
+			 	sc = new SplitColumn(fieldIndex, ct);
+        if (ct == ColumnType.MAP)
           sclist.add(sc);
 			 	exec.add(sc); // breadth-first
 			 	buildSplit(sc, fs, pn, projIndex, null);
@@ -314,7 +317,7 @@
 		 } else if (st == Partition.SplitType.MAP && keys != null) {
        String key;
        Iterator<String> it;
-
+       Object value;
 			 for (int i = 0; i < size; i++)
 			 {
 				 if (children.get(i).projIndex != -1) // a leaf: set projection directly
@@ -322,7 +325,10 @@
            for (it = keys.iterator(); it.hasNext(); )
            {
              key = it.next();
-			 		   ((Map<String, Object>) (((Tuple)children.get(i).leaf.field).get(children.get(i).projIndex))).put(key, ((Map<String, Object>) field).get(key));
+             value = ((Map<String, Object>) field).get(key);
+             if (value == null)
+               continue;
+			 		   ((Map<String, Object>) (((Tuple)children.get(i).leaf.field).get(children.get(i).projIndex))).put(key, value);
            }
          } else {
            for (it = keys.iterator(); it.hasNext(); )

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TableStorageParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TableStorageParser.jjt?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TableStorageParser.jjt (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TableStorageParser.jjt Tue Nov 24 19:54:19 2009
@@ -30,7 +30,7 @@
 import org.apache.hadoop.zebra.types.*;
 
 public class TableStorageParser {
-		public TableStorageParser(java.io.Reader in, Partition partition, Schema schema) { this(in); mSchema = schema; this.partition = partition;}
+		public TableStorageParser(java.io.Reader in, Partition partition, Schema schema, String comparator) { this(in); mSchema = schema; this.partition = partition; this.comparator = comparator; }
 		private Schema mSchema;
 		private int mDefaultCGIndex = -1;
 		private String mName = null;
@@ -39,6 +39,7 @@
 		private short mPerm = -1;
 		private int mCGCount = 0;
 		private Partition partition;
+    private String comparator = null;
 }
 PARSER_END(TableStorageParser)
 
@@ -54,9 +55,10 @@
 TOKEN : { <COMPRESSOR : "lzo" | "gz"> }
 TOKEN : { <SERIALIZER : ("pig" | "avro")> }
 
-TOKEN : { <ORDER	 : "order by"> }
 TOKEN : { <COMPRESS	 : "compress by"> }
 TOKEN : { <SERIALIZE : "serialize by"> }
+TOKEN : { <ASC : "ASC"> }
+TOKEN : { <DESC: "DESC"> }
 TOKEN : { <SECURE 	 : "secure by"> }
 TOKEN : { <USER		 : "user"> }
 TOKEN : { <GROUP	 : "group"> }
@@ -64,8 +66,6 @@
 TOKEN : { <AS		 : "as"> }
 
 
-
-
 TOKEN:
 {
  	<#LETTER : ["a"-"z", "A"-"Z"] >
@@ -77,62 +77,17 @@
 |   <SHORT	:	(<OCTAL>){3}	>
 }
 
-ArrayList<CGSchema> StorageSchema() throws ParseException :
+void StorageSchema(ArrayList<CGSchema> s) throws ParseException :
 {
-	ArrayList<CGSchema> s = new ArrayList();
 	CGSchema fs;
 	CGSchema defaultSchema;
 }
 {
 	try {
-		LOOKAHEAD(2) <EOF>
-		{
-			defaultSchema = partition.generateDefaultCGSchema(mName, mCompressor, mSerializer, mOwner, mGroup, mPerm, 0);
-			if (defaultSchema != null)
-				s.add(defaultSchema);
-
-      // check column group names, add system created names when necessary;
-      HashSet<String> cgNames = new HashSet<String>();
-      ArrayList<CGSchema> unnamed = new ArrayList<CGSchema>(); 	
-      for (int i = 0; i < s.size(); i++) { 
-        CGSchema cgSchema = s.get(i);
-        String str = cgSchema.getName();
-        if (str != null) {
-          if (!cgNames.add(str)) {
-            throw new ParseException("Duplicate column group names.");
-          }
-        } else {
-          unnamed.add(cgSchema);
-        }
-      }
-      
-      int digits = 1;
-      int total = unnamed.size();
-      while (total >= 10) {
-        ++digits;
-        total /= 10;
-      }
-      String formatString = "%0" + digits + "d";
-      
-      int idx = 0;
-      for (int i = 0; i < unnamed.size(); i++) { 
-        CGSchema cgSchema = unnamed.get(i);
-        String str = null;
-        while (true) {
-          str = "CG" + String.format(formatString, idx++);
-          if (!cgNames.contains(str)) {
-            break;
-          }
-        }
-        cgSchema.setName(str);
-      }
-			return s;
-		}
-	|
 		fs = FieldSchema() {mCGCount++; if (fs != null) s.add(fs);}
 		(";" fs = FieldSchema() {mCGCount++; if (fs != null) s.add(fs);})* <EOF>
 		{
-			defaultSchema = partition.generateDefaultCGSchema(mName, mCompressor, mSerializer, mOwner, mGroup, mPerm, mDefaultCGIndex == -1 ? mDefaultCGIndex = mCGCount++ : mDefaultCGIndex);
+			defaultSchema = partition.generateDefaultCGSchema(mName, mCompressor, mSerializer, mOwner, mGroup, mPerm, mDefaultCGIndex == -1 ? mDefaultCGIndex = mCGCount++ : mDefaultCGIndex, comparator);
 			if (defaultSchema != null)
 				s.add(defaultSchema);
 
@@ -171,13 +126,29 @@
         }
         cgSchema.setName(str);
       }
-			return s;
+			return;
 		}
 	} catch (TokenMgrError e) {
 		throw new ParseException(e.getMessage());
 	}
 }
 
+boolean ascdsc() throws ParseException :
+{
+  Token t1 = null, t2 = null;
+}
+{
+  (
+		t1 = <ASC>
+  | t2 = <DESC>
+  )
+  {
+    if (t2 != null)
+      return false;
+    return true;
+  }
+}
+
 CGSchema FieldSchema() throws ParseException:
 {
 	Token t1 = null, t2 = null; 
@@ -310,8 +281,9 @@
 			mOwner		= owner;
 			mGroup		= group;
 			mPerm 		= perm; 
-		} else
-			cs = new CGSchema(fs, false, name, serializer, compressor, owner, group, perm);
+		} else {
+			cs = new CGSchema(fs, false, comparator, name, serializer, compressor, owner, group, perm);
+    }
 		return cs;
 	}
 }
@@ -531,7 +503,7 @@
 		"{" keys = hashKeys() "}"
 		{
 			if(!partition.getPartitionInfo().setKeyCGIndex(schema, mCGCount, colIndex, name, keys))
-				throw new ParseException("Column "+name+" specified more than once!");
+				throw new ParseException("Column "+name+" has split keys specified more than once.");
 			fs = new Schema.ColumnSchema(name, schema.getSchema(), schema.getType());
 		}
 	)

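Besides threading the comparator through the parser and introducing the ASC/DESC tokens, the surviving branch of StorageSchema keeps the column-group auto-naming that the removed branch also had: unnamed groups get zero-padded "CG" names that never collide with user-supplied names. A standalone sketch of that naming logic, with illustrative class and method names:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CGNamingSketch {
  /** Generate zero-padded "CG" names for unnamed groups, avoiding taken names. */
  static List<String> nameUnnamed(int unnamedCount, Set<String> takenNames) {
    int digits = 1;
    for (int total = unnamedCount; total >= 10; total /= 10) {
      ++digits;                          // widen the padding as the count grows
    }
    String formatString = "%0" + digits + "d";

    List<String> generated = new ArrayList<String>();
    int idx = 0;
    for (int i = 0; i < unnamedCount; i++) {
      String name;
      do {
        name = "CG" + String.format(formatString, idx++);
      } while (takenNames.contains(name));
      takenNames.add(name);
      generated.add(name);
    }
    return generated;
  }

  public static void main(String[] args) {
    Set<String> taken = new HashSet<String>(Arrays.asList("CG1"));
    System.out.println(nameUnnamed(2, taken));   // [CG0, CG2]
  }
}
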
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java Tue Nov 24 19:54:19 2009
@@ -212,8 +212,8 @@
    * @throws IOException
    * 
    */
-  public static void formatTuple(Tuple tuple, String projection) throws IOException {
-    Tuple one = createTuple(Projection.getNumColumns(projection));
+  public static void formatTuple(Tuple tuple, int ncols) throws IOException {
+    Tuple one = createTuple(ncols);
     tuple.reference(one);
     return;
     /*

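With formatTuple now taking the column count directly, the count is computed once by the caller (e.g. via Projection.getNumColumns) instead of being re-derived from the projection string on every row. A generic sketch of that hoisting pattern, using illustrative names rather than the Zebra API:

public class HoistColumnCountSketch {
  /** Illustrative stand-in for deriving a column count from a projection string. */
  static int numColumns(String projection) {
    return projection.isEmpty() ? 0 : projection.split(",").length;
  }

  /** New-style helper: the caller passes the count, nothing is re-parsed per row. */
  static Object[] formatRow(int ncols) {
    return new Object[ncols];            // fresh row with ncols empty columns
  }

  public static void main(String[] args) {
    String projection = "a,b,c";
    int ncols = numColumns(projection);  // computed once, outside the row loop
    for (int i = 0; i < 3; i++) {
      Object[] row = formatRow(ncols);
      System.out.println("row " + i + " has " + row.length + " columns");
    }
  }
}
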
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java Tue Nov 24 19:54:19 2009
@@ -31,7 +31,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.file.tfile.RawComparable;
+import org.apache.hadoop.zebra.tfile.RawComparable;
 import org.apache.hadoop.zebra.io.BasicTable;
 import org.apache.hadoop.zebra.io.BasicTableStatus;
 import org.apache.hadoop.zebra.io.KeyDistribution;
@@ -81,14 +81,13 @@
     return String.format("%s%09d", prefix, random.nextInt(max));
   }
 
-  static int createBasicTable(int parts, int rows, String strSchema, String storage,
-      Path path, boolean properClose, boolean sorted) throws IOException {
+  public static int createBasicTable(int parts, int rows, String strSchema, String storage, String sortColumns,
+      Path path, boolean properClose) throws IOException {
     if (fs.exists(path)) {
       BasicTable.drop(path, conf);
     }
 
-    BasicTable.Writer writer = new BasicTable.Writer(path, strSchema, storage,
-        sorted, conf);
+    BasicTable.Writer writer = new BasicTable.Writer(path, strSchema, storage, sortColumns, null, conf);
     writer.finish();
 
     int total = 0;
@@ -96,6 +95,7 @@
     String colNames[] = schema.getColumns();
     Tuple tuple = TypesUtils.createTuple(schema);
 
+    boolean sorted = writer.isSorted();
     for (int i = 0; i < parts; ++i) {
       writer = new BasicTable.Writer(path, conf);
       TableInserter inserter = writer.getInserter(
@@ -126,8 +126,11 @@
     if (properClose) {
       writer = new BasicTable.Writer(path, conf);
       writer.close();
-      BasicTableStatus status = getStatus(path);
-      Assert.assertEquals(total, status.getRows());
+      /* We can only test number of rows on sorted tables.*/
+      if (sorted) {
+        BasicTableStatus status = getStatus(path);
+        Assert.assertEquals(total, status.getRows());
+      }
     }
 
     return total;
@@ -230,10 +233,10 @@
   }
 
   static void doReadWrite(Path path, int parts, int rows, String schema,
-      String storage, String projection, boolean properClose, boolean sorted)
+      String storage, String sortColumns, String projection, boolean properClose, boolean sorted)
       throws IOException, ParseException {
-    int totalRows = createBasicTable(parts, rows, schema, storage, path,
-        properClose, sorted);
+    int totalRows = createBasicTable(parts, rows, schema, storage, sortColumns, path,
+        properClose);
     if (rows == 0) {
       Assert.assertEquals(rows, 0);
     }
@@ -248,17 +251,17 @@
 
   public void testMultiCGs() throws IOException, ParseException {
     Path path = new Path(rootPath, "TestBasicTableMultiCGs");
-    doReadWrite(path, 2, 100, "SF_a,SF_b,SF_c,SF_d,SF_e", "[SF_a,SF_b,SF_c];[SF_d,SF_e]", "SF_f,SF_a,SF_c,SF_d", true, false);
+    doReadWrite(path, 2, 100, "SF_a,SF_b,SF_c,SF_d,SF_e", "[SF_a,SF_b,SF_c];[SF_d,SF_e]", null, "SF_f,SF_a,SF_c,SF_d", true, false);
   }
 
   public void testCornerCases() throws IOException, ParseException {
     Path path = new Path(rootPath, "TestBasicTableCornerCases");
-    doReadWrite(path, 0, 0, "a, b, c", "", "a, d, c, f", false, false);
-    doReadWrite(path, 0, 0, "a, b, c", "", "a, d, c, f", true, false);
-    doReadWrite(path, 0, 0, "a, b, c", "", "a, d, c, f", true, true);
-    doReadWrite(path, 2, 0, "a, b, c", "", "a, d, c, f", false, false);
-    doReadWrite(path, 2, 0, "a, b, c", "", "a, d, c, f", true, false);
-    doReadWrite(path, 2, 0, "a, b, c", "", "a, d, c, f", true, true);
+    doReadWrite(path, 0, 0, "a, b, c", "", null, "a, d, c, f", false, false);
+    doReadWrite(path, 0, 0, "a, b, c", "", null, "a, d, c, f", true, false);
+    doReadWrite(path, 0, 0, "a, b, c", "", "a", "a, d, c, f", true, true);
+    doReadWrite(path, 2, 0, "a, b, c", "", null, "a, d, c, f", false, false);
+    doReadWrite(path, 2, 0, "a, b, c", "", null, "a, d, c, f", true, false);
+    doReadWrite(path, 2, 0, "a, b, c", "", "a", "a, d, c, f", true, true);
   }
 
   static int doReadOnly(TableScanner scanner) throws IOException, ParseException {
@@ -289,7 +292,7 @@
   @Test
   public void testNullSplits() throws IOException, ParseException {
     Path path = new Path(rootPath, "TestBasicTableNullSplits");
-    int totalRows = createBasicTable(2, 250, "a, b, c", "", path, true, true);
+    int totalRows = createBasicTable(2, 250, "a, b, c", "", "a", path, true);
     BasicTable.Reader reader = new BasicTable.Reader(path, conf);
     reader.setProjection("a,d,c,f");
     Assert.assertEquals(totalRows, doReadOnly(reader.getScanner(null, false)));
@@ -301,14 +304,14 @@
   @Test
   public void testNegativeSplits() throws IOException, ParseException {
     Path path = new Path(rootPath, "TestNegativeSplits");
-    int totalRows = createBasicTable(2, 250, "a, b, c", "", path, true, true);
+    int totalRows = createBasicTable(2, 250, "a, b, c", "", "", path, true);
     rangeSplitBasicTable(-1, totalRows, "a,d,c,f", path);
   }
 
   @Test
   public void testMetaBlocks() throws IOException, ParseException {
     Path path = new Path(rootPath, "TestBasicTableMetaBlocks");
-    createBasicTable(3, 100, "a, b, c", "", path, false, false);
+    createBasicTable(3, 100, "a, b, c", "", null, path, false);
     BasicTable.Writer writer = new BasicTable.Writer(path, conf);
     BytesWritable meta1 = makeKey(1234);
     BytesWritable meta2 = makeKey(9876);
@@ -350,8 +353,8 @@
   @Test
   public void testNormalCases() throws IOException, ParseException {
     Path path = new Path(rootPath, "TestBasicTableNormal");
-    doReadWrite(path, 2, 250, "a, b, c", "", "a, d, c, f", false, false);
-    doReadWrite(path, 2, 250, "a, b, c", "", "a, d, c, f", true, false);
-    doReadWrite(path, 2, 250, "a, b, c", "", "a, d, c, f", true, true);
+    doReadWrite(path, 2, 250, "a, b, c", "", null, "a, d, c, f", false, false);
+    doReadWrite(path, 2, 250, "a, b, c", "", null, "a, d, c, f", true, false);
+    doReadWrite(path, 2, 250, "a, b, c", "", "a", "a, d, c, f", true, true);
   }
 }

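The test helper now expresses sortedness through a sort-column list and reads it back from the writer, matching the new BasicTable.Writer constructor used above. A usage sketch based only on the calls visible in this hunk; the fifth constructor argument, passed as null here, is assumed to be the comparator:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.zebra.io.BasicTable;

public class SortedWriterSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path path = new Path("/tmp/SortedWriterSketch");

    // Sortedness comes from the sort-column list ("a") rather than a boolean
    // flag; the null argument is assumed to be the comparator, mirroring the
    // createBasicTable change above.
    BasicTable.Writer writer =
        new BasicTable.Writer(path, "a, b, c", "", "a", null, conf);
    boolean sorted = writer.isSorted();  // the tests now read this back
    writer.finish();
    System.out.println("sorted = " + sorted);
  }
}

The four-argument Writer constructor used in the test files below presumably covers the unsorted case, which is why the explicit boolean flag disappears from those call sites.
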
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableMapSplits.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableMapSplits.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableMapSplits.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableMapSplits.java Tue Nov 24 19:54:19 2009
@@ -72,7 +72,7 @@
     BasicTable.drop(path, conf);
 
     BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
-        STR_STORAGE, false, conf);
+        STR_STORAGE, conf);
     writer.finish();
 
     Schema schema = writer.getSchema();

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableProjections.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableProjections.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableProjections.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableProjections.java Tue Nov 24 19:54:19 2009
@@ -62,7 +62,7 @@
     BasicTable.drop(path, conf);
 
     BasicTable.Writer writer = new BasicTable.Writer(path, "a,b,c,d,e,f,g",
-        "[a,b,c];[d,e,f,g]", false, conf);
+        "[a,b,c];[d,e,f,g]", conf);
     writer.finish();
 
     Schema schema = writer.getSchema();

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableSplits.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableSplits.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableSplits.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableSplits.java Tue Nov 24 19:54:19 2009
@@ -72,7 +72,7 @@
     BasicTable.drop(path, conf);
 
     BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
-        STR_STORAGE, false, conf);
+        STR_STORAGE, conf);
     writer.finish();
 
     Schema schema = writer.getSchema();

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCollection.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCollection.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCollection.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCollection.java Tue Nov 24 19:54:19 2009
@@ -80,7 +80,7 @@
     // drop any previous tables
     BasicTable.drop(path, conf);
     BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
-        STR_STORAGE, false, conf);
+        STR_STORAGE, conf);
 
     writer.finish();
 
@@ -987,7 +987,7 @@
     BasicTable.drop(path, conf);
     try {
       BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
-          STR_STORAGE, false, conf);
+          STR_STORAGE, conf);
       Assert.fail("should throw exception");
     } catch (Exception e) {
       System.out.println(e);
@@ -1012,10 +1012,10 @@
     BasicTable.drop(path, conf);
     try {
       BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
-          STR_STORAGE, false, conf);
+          STR_STORAGE, conf);
       Assert.fail("should throw exception");
     } catch (Exception e) {
       System.out.println(e);
     }
   }
-}
\ No newline at end of file
+}

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java Tue Nov 24 19:54:19 2009
@@ -30,7 +30,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.file.tfile.RawComparable;
+import org.apache.hadoop.zebra.tfile.RawComparable;
 import org.apache.hadoop.zebra.io.BasicTableStatus;
 import org.apache.hadoop.zebra.io.ColumnGroup;
 import org.apache.hadoop.zebra.io.KeyDistribution;
@@ -149,8 +149,11 @@
     if (properClose) {
       writer = new ColumnGroup.Writer(path, conf);
       writer.close();
-      BasicTableStatus status = getStatus(path);
-      Assert.assertEquals(total, status.getRows());
+      /* We can only test number of rows on sorted tables.*/
+      if (sorted) {
+        BasicTableStatus status = getStatus(path);
+        Assert.assertEquals(total, status.getRows());
+      }
     }
 
     return total;

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName1.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName1.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName1.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName1.java Tue Nov 24 19:54:19 2009
@@ -67,7 +67,7 @@
     // drop any previous tables
     BasicTable.drop(path, conf);
     BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
-        STR_STORAGE, false, conf);
+        STR_STORAGE, conf);
     writer.finish();
     Schema schema = writer.getSchema();
     Tuple tuple = TypesUtils.createTuple(schema);
@@ -170,4 +170,4 @@
 
     reader.close();
   }
-}
\ No newline at end of file
+}

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName2.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName2.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName2.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName2.java Tue Nov 24 19:54:19 2009
@@ -62,7 +62,7 @@
     // drop any previous tables
     BasicTable.drop(path, conf);
     BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
-        STR_STORAGE, false, conf);
+        STR_STORAGE, conf);
     writer.finish();
     Schema schema = writer.getSchema();
     Tuple tuple = TypesUtils.createTuple(schema);
@@ -110,4 +110,4 @@
   public void test1() throws IOException, Exception {
     BasicTable.dumpInfo(path.toString(), System.out, conf);
   }
-}
\ No newline at end of file
+}

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName3.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName3.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName3.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName3.java Tue Nov 24 19:54:19 2009
@@ -62,7 +62,7 @@
     // drop any previous tables
     BasicTable.drop(path, conf);
     BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
-        STR_STORAGE, false, conf);
+        STR_STORAGE, conf);
     writer.finish();
     Schema schema = writer.getSchema();
     Tuple tuple = TypesUtils.createTuple(schema);
@@ -110,4 +110,4 @@
   public void test1() throws IOException, Exception {
     BasicTable.dumpInfo(path.toString(), System.out, conf);
   }
-}
\ No newline at end of file
+}

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName4.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName4.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName4.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName4.java Tue Nov 24 19:54:19 2009
@@ -62,7 +62,7 @@
     // drop any previous tables
     BasicTable.drop(path, conf);
     BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
-        STR_STORAGE, false, conf);
+        STR_STORAGE, conf);
     writer.finish();
     Schema schema = writer.getSchema();
     Tuple tuple = TypesUtils.createTuple(schema);
@@ -110,4 +110,4 @@
   public void test1() throws IOException, Exception {
     BasicTable.dumpInfo(path.toString(), System.out, conf);
   }
-}
\ No newline at end of file
+}

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName5.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName5.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName5.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName5.java Tue Nov 24 19:54:19 2009
@@ -62,7 +62,7 @@
     // drop any previous tables
     BasicTable.drop(path, conf);
     BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
-        STR_STORAGE, false, conf);
+        STR_STORAGE, conf);
     writer.finish();
     Schema schema = writer.getSchema();
     Tuple tuple = TypesUtils.createTuple(schema);

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName6.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName6.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName6.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName6.java Tue Nov 24 19:54:19 2009
@@ -62,7 +62,7 @@
     // drop any previous tables
     BasicTable.drop(path, conf);
     BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
-        STR_STORAGE, false, conf);
+        STR_STORAGE, conf);
     writer.finish();
     Schema schema = writer.getSchema();
     Tuple tuple = TypesUtils.createTuple(schema);

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupProjections.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupProjections.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupProjections.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupProjections.java Tue Nov 24 19:54:19 2009
@@ -59,6 +59,10 @@
     fs = new LocalFileSystem(rawLFS);
     path = new Path(fs.getWorkingDirectory(), outputFile);
     System.out.println("output file: " + path);
+    
+    if (fs.exists(path)) {
+        ColumnGroup.drop(path, conf);
+    }
 
     schema = new Schema("a,b,c,d,e,f,g");
 

Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupSplits.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupSplits.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupSplits.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupSplits.java Tue Nov 24 19:54:19 2009
@@ -70,6 +70,10 @@
     fs = new LocalFileSystem(rawLFS);
     path = new Path(fs.getWorkingDirectory(), outputFile);
     System.out.println("output file: " + path);
+    
+    if (fs.exists(path)) {
+        ColumnGroup.drop(path, conf);
+    }
 
     schema = new Schema(STR_SCHEMA);