Posted to commits@pig.apache.org by pr...@apache.org on 2009/11/24 20:54:34 UTC
svn commit: r883836 [7/23] - in /hadoop/pig/branches/load-store-redesign: ./
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/
contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/
contrib/zebra/ contrib/zebra...
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/TFileDumper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/TFileDumper.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/TFileDumper.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/TFileDumper.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.tfile;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.zebra.tfile.BCFile.BlockRegion;
+import org.apache.hadoop.zebra.tfile.BCFile.MetaIndexEntry;
+import org.apache.hadoop.zebra.tfile.TFile.TFileIndexEntry;
+import org.apache.hadoop.zebra.tfile.Utils.Version;
+
+/**
+ * Dumps information about a TFile.
+ */
+class TFileDumper {
+ static final Log LOG = LogFactory.getLog(TFileDumper.class);
+
+ private TFileDumper() {
+ // namespace-style class; not constructable.
+ }
+
+ private enum Align {
+ LEFT, CENTER, RIGHT, ZERO_PADDED;
+ static String format(String s, int width, Align align) {
+ if (s.length() >= width) return s;
+ int room = width - s.length();
+ Align alignAdjusted = align;
+ if (room == 1) {
+ alignAdjusted = LEFT;
+ }
+ if (alignAdjusted == LEFT) {
+ return s + String.format("%" + room + "s", "");
+ }
+ if (alignAdjusted == RIGHT) {
+ return String.format("%" + room + "s", "") + s;
+ }
+ if (alignAdjusted == CENTER) {
+ int half = room / 2;
+ return String.format("%" + half + "s", "") + s
+ + String.format("%" + (room - half) + "s", "");
+ }
+ throw new IllegalArgumentException("Unsupported alignment");
+ }
+
+ static String format(long l, int width, Align align) {
+ if (align == ZERO_PADDED) {
+ return String.format("%0" + width + "d", l);
+ }
+ return format(Long.toString(l), width, align);
+ }
+
+ static int calculateWidth(String caption, long max) {
+ return Math.max(caption.length(), Long.toString(max).length());
+ }
+ }
+
+ /**
+ * Dump information about a TFile.
+ *
+ * @param file
+ * Path string of the TFile
+ * @param out
+ * PrintStream to output the information.
+ * @param conf
+ * The configuration object.
+ * @throws IOException
+ */
+ public static void dumpInfo(String file, PrintStream out, Configuration conf)
+ throws IOException {
+ final int maxKeySampleLen = 16;
+ Path path = new Path(file);
+ FileSystem fs = path.getFileSystem(conf);
+ long length = fs.getFileStatus(path).getLen();
+ FSDataInputStream fsdis = fs.open(path);
+ TFile.Reader reader = new TFile.Reader(fsdis, length, conf);
+ try {
+ LinkedHashMap<String, String> properties =
+ new LinkedHashMap<String, String>();
+ int blockCnt = reader.readerBCF.getBlockCount();
+ int metaBlkCnt = reader.readerBCF.metaIndex.index.size();
+ properties.put("BCFile Version", reader.readerBCF.version.toString());
+ properties.put("TFile Version", reader.tfileMeta.version.toString());
+ properties.put("File Length", Long.toString(length));
+ properties.put("Data Compression", reader.readerBCF
+ .getDefaultCompressionName());
+ properties.put("Record Count", Long.toString(reader.getEntryCount()));
+ properties.put("Sorted", Boolean.toString(reader.isSorted()));
+ if (reader.isSorted()) {
+ properties.put("Comparator", reader.getComparatorName());
+ }
+ properties.put("Data Block Count", Integer.toString(blockCnt));
+ long dataSize = 0, dataSizeUncompressed = 0;
+ if (blockCnt > 0) {
+ for (int i = 0; i < blockCnt; ++i) {
+ BlockRegion region =
+ reader.readerBCF.dataIndex.getBlockRegionList().get(i);
+ dataSize += region.getCompressedSize();
+ dataSizeUncompressed += region.getRawSize();
+ }
+ properties.put("Data Block Bytes", Long.toString(dataSize));
+ if (!"none".equals(reader.readerBCF.getDefaultCompressionName())) {
+ properties.put("Data Block Uncompressed Bytes", Long
+ .toString(dataSizeUncompressed));
+ properties.put("Data Block Compression Ratio", String.format(
+ "1:%.1f", (double) dataSizeUncompressed / dataSize));
+ }
+ }
+
+ properties.put("Meta Block Count", Integer.toString(metaBlkCnt));
+ long metaSize = 0, metaSizeUncompressed = 0;
+ if (metaBlkCnt > 0) {
+ Collection<MetaIndexEntry> metaBlks =
+ reader.readerBCF.metaIndex.index.values();
+ boolean calculateCompression = false;
+ for (Iterator<MetaIndexEntry> it = metaBlks.iterator(); it.hasNext();) {
+ MetaIndexEntry e = it.next();
+ metaSize += e.getRegion().getCompressedSize();
+ metaSizeUncompressed += e.getRegion().getRawSize();
+ if (e.getCompressionAlgorithm() != Compression.Algorithm.NONE) {
+ calculateCompression = true;
+ }
+ }
+ properties.put("Meta Block Bytes", Long.toString(metaSize));
+ if (calculateCompression) {
+ properties.put("Meta Block Uncompressed Bytes", Long
+ .toString(metaSizeUncompressed));
+ properties.put("Meta Block Compression Ratio", String.format(
+ "1:%.1f", (double) metaSizeUncompressed / metaSize));
+ }
+ }
+ properties.put("Meta-Data Size Ratio", String.format("1:%.1f",
+ (double) dataSize / metaSize));
+ long leftOverBytes = length - dataSize - metaSize;
+ long miscSize =
+ BCFile.Magic.size() * 2 + Long.SIZE / Byte.SIZE + Version.size();
+ long metaIndexSize = leftOverBytes - miscSize;
+ properties.put("Meta Block Index Bytes", Long.toString(metaIndexSize));
+ properties.put("Headers Etc Bytes", Long.toString(miscSize));
+ // Now output the properties table.
+ int maxKeyLength = 0;
+ Set<Map.Entry<String, String>> entrySet = properties.entrySet();
+ for (Iterator<Map.Entry<String, String>> it = entrySet.iterator(); it
+ .hasNext();) {
+ Map.Entry<String, String> e = it.next();
+ if (e.getKey().length() > maxKeyLength) {
+ maxKeyLength = e.getKey().length();
+ }
+ }
+ for (Iterator<Map.Entry<String, String>> it = entrySet.iterator(); it
+ .hasNext();) {
+ Map.Entry<String, String> e = it.next();
+ out.printf("%s : %s\n", Align.format(e.getKey(), maxKeyLength,
+ Align.LEFT), e.getValue());
+ }
+ out.println();
+ reader.checkTFileDataIndex();
+ if (blockCnt > 0) {
+ String blkID = "Data-Block";
+ int blkIDWidth = Align.calculateWidth(blkID, blockCnt);
+ int blkIDWidth2 = Align.calculateWidth("", blockCnt);
+ String offset = "Offset";
+ int offsetWidth = Align.calculateWidth(offset, length);
+ String blkLen = "Length";
+ int blkLenWidth =
+ Align.calculateWidth(blkLen, dataSize / blockCnt * 10);
+ String rawSize = "Raw-Size";
+ int rawSizeWidth =
+ Align.calculateWidth(rawSize, dataSizeUncompressed / blockCnt * 10);
+ String records = "Records";
+ int recordsWidth =
+ Align.calculateWidth(records, reader.getEntryCount() / blockCnt
+ * 10);
+ String endKey = "End-Key";
+ int endKeyWidth = Math.max(endKey.length(), maxKeySampleLen * 2 + 5);
+
+ out.printf("%s %s %s %s %s %s\n", Align.format(blkID, blkIDWidth,
+ Align.CENTER), Align.format(offset, offsetWidth, Align.CENTER),
+ Align.format(blkLen, blkLenWidth, Align.CENTER), Align.format(
+ rawSize, rawSizeWidth, Align.CENTER), Align.format(records,
+ recordsWidth, Align.CENTER), Align.format(endKey, endKeyWidth,
+ Align.LEFT));
+
+ for (int i = 0; i < blockCnt; ++i) {
+ BlockRegion region =
+ reader.readerBCF.dataIndex.getBlockRegionList().get(i);
+ TFileIndexEntry indexEntry = reader.tfileIndex.getEntry(i);
+ out.printf("%s %s %s %s %s ", Align.format(Align.format(i,
+ blkIDWidth2, Align.ZERO_PADDED), blkIDWidth, Align.LEFT), Align
+ .format(region.getOffset(), offsetWidth, Align.LEFT), Align
+ .format(region.getCompressedSize(), blkLenWidth, Align.LEFT),
+ Align.format(region.getRawSize(), rawSizeWidth, Align.LEFT),
+ Align.format(indexEntry.kvEntries, recordsWidth, Align.LEFT));
+ byte[] key = indexEntry.key;
+ boolean asAscii = true;
+ int sampleLen = Math.min(maxKeySampleLen, key.length);
+ for (int j = 0; j < sampleLen; ++j) {
+ byte b = key[j];
+ if ((b < 32 && b != 9) || (b == 127)) {
+ asAscii = false;
+ }
+ }
+ if (!asAscii) {
+ out.print("0X");
+ for (int j = 0; j < sampleLen; ++j) {
+ byte b = key[j];
+ out.printf("%X", b);
+ }
+ } else {
+ out.print(new String(key, 0, sampleLen));
+ }
+ if (sampleLen < key.length) {
+ out.print("...");
+ }
+ out.println();
+ }
+ }
+
+ out.println();
+ if (metaBlkCnt > 0) {
+ String name = "Meta-Block";
+ int maxNameLen = 0;
+ Set<Map.Entry<String, MetaIndexEntry>> metaBlkEntrySet =
+ reader.readerBCF.metaIndex.index.entrySet();
+ for (Iterator<Map.Entry<String, MetaIndexEntry>> it =
+ metaBlkEntrySet.iterator(); it.hasNext();) {
+ Map.Entry<String, MetaIndexEntry> e = it.next();
+ if (e.getKey().length() > maxNameLen) {
+ maxNameLen = e.getKey().length();
+ }
+ }
+ int nameWidth = Math.max(name.length(), maxNameLen);
+ String offset = "Offset";
+ int offsetWidth = Align.calculateWidth(offset, length);
+ String blkLen = "Length";
+ int blkLenWidth =
+ Align.calculateWidth(blkLen, metaSize / metaBlkCnt * 10);
+ String rawSize = "Raw-Size";
+ int rawSizeWidth =
+ Align.calculateWidth(rawSize, metaSizeUncompressed / metaBlkCnt
+ * 10);
+ String compression = "Compression";
+ int compressionWidth = compression.length();
+ out.printf("%s %s %s %s %s\n", Align.format(name, nameWidth,
+ Align.CENTER), Align.format(offset, offsetWidth, Align.CENTER),
+ Align.format(blkLen, blkLenWidth, Align.CENTER), Align.format(
+ rawSize, rawSizeWidth, Align.CENTER), Align.format(compression,
+ compressionWidth, Align.LEFT));
+
+ for (Iterator<Map.Entry<String, MetaIndexEntry>> it =
+ metaBlkEntrySet.iterator(); it.hasNext();) {
+ Map.Entry<String, MetaIndexEntry> e = it.next();
+ String blkName = e.getValue().getMetaName();
+ BlockRegion region = e.getValue().getRegion();
+ String blkCompression =
+ e.getValue().getCompressionAlgorithm().getName();
+ out.printf("%s %s %s %s %s\n", Align.format(blkName, nameWidth,
+ Align.LEFT), Align.format(region.getOffset(), offsetWidth,
+ Align.LEFT), Align.format(region.getCompressedSize(),
+ blkLenWidth, Align.LEFT), Align.format(region.getRawSize(),
+ rawSizeWidth, Align.LEFT), Align.format(blkCompression,
+ compressionWidth, Align.LEFT));
+ }
+ }
+ } finally {
+ IOUtils.cleanup(LOG, reader, fsdis);
+ }
+ }
+}
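
For illustration, a minimal sketch of driving the dumper added above. The class
name and file path are hypothetical; since TFileDumper is package-private, the
caller must live in org.apache.hadoop.zebra.tfile:

    package org.apache.hadoop.zebra.tfile;

    import org.apache.hadoop.conf.Configuration;

    public class DumpExample {
      public static void main(String[] args) throws Exception {
        // dumpInfo prints the property table plus the data-block and
        // meta-block listings of the given TFile to the PrintStream.
        Configuration conf = new Configuration();
        TFileDumper.dumpInfo("/tmp/example.tfile", System.out, conf);
      }
    }
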
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/Utils.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/Utils.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/Utils.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/Utils.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,517 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.zebra.tfile;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * Supporting Utility classes used by TFile, and shared by users of TFile.
+ */
+public final class Utils {
+
+ /**
+ * Prevent the instantiation of Utils.
+ */
+ private Utils() {
+ // nothing
+ }
+
+ /**
+ * Encodes an integer in a variable-length format. Equivalent to
+ * <code>Utils#writeVLong(out, n)</code>.
+ *
+ * @param out
+ * output stream
+ * @param n
+ * The integer to be encoded
+ * @throws IOException
+ * @see Utils#writeVLong(DataOutput, long)
+ */
+ public static void writeVInt(DataOutput out, int n) throws IOException {
+ writeVLong(out, n);
+ }
+
+ /**
+ * Encodes a long integer in a variable-length format.
+ * <ul>
+ * <li>if n in [-32, 128): encode in one byte with the actual value.
+ * Otherwise,
+ * <li>if n in [-20*2^8, 20*2^8): encode in two bytes: byte[0] = n/256 - 52;
+ * byte[1]=n&0xff. Otherwise,
+ * <li>if n in [-16*2^16, 16*2^16): encode in three bytes: byte[0]=n/2^16 -
+ * 88; byte[1]=(n>>8)&0xff; byte[2]=n&0xff. Otherwise,
+ * <li>if n in [-8*2^24, 8*2^24): encode in four bytes: byte[0]=n/2^24 - 112;
+ * byte[1] = (n>>16)&0xff; byte[2] = (n>>8)&0xff; byte[3]=n&0xff. Otherwise:
+ * <li>if n in [-2^31, 2^31): encode in five bytes: byte[0]=-125; byte[1] =
+ * (n>>24)&0xff; byte[2]=(n>>16)&0xff; byte[3]=(n>>8)&0xff; byte[4]=n&0xff;
+ * <li>if n in [-2^39, 2^39): encode in six bytes: byte[0]=-124; byte[1] =
+ * (n>>32)&0xff; byte[2]=(n>>24)&0xff; byte[3]=(n>>16)&0xff;
+ * byte[4]=(n>>8)&0xff; byte[5]=n&0xff
+ * <li>if n in [-2^47, 2^47): encode in seven bytes: byte[0]=-123; byte[1] =
+ * (n>>40)&0xff; byte[2]=(n>>32)&0xff; byte[3]=(n>>24)&0xff;
+ * byte[4]=(n>>16)&0xff; byte[5]=(n>>8)&0xff; byte[6]=n&0xff;
+ * <li>if n in [-2^55, 2^55): encode in eight bytes: byte[0]=-122; byte[1] =
+ * (n>>48)&0xff; byte[2] = (n>>40)&0xff; byte[3]=(n>>32)&0xff;
+ * byte[4]=(n>>24)&0xff; byte[5]=(n>>16)&0xff; byte[6]=(n>>8)&0xff;
+ * byte[7]=n&0xff;
+ * <li>if n in [-2^63, 2^63): encode in nine bytes: byte[0]=-121; byte[1] =
+ * (n>>56)&0xff; byte[2] = (n>>48)&0xff; byte[3] = (n>>40)&0xff;
+ * byte[4]=(n>>32)&0xff; byte[5]=(n>>24)&0xff; byte[6]=(n>>16)&0xff;
+ * byte[7]=(n>>8)&0xff; byte[8]=n&0xff;
+ * </ul>
+ *
+ * @param out
+ * output stream
+ * @param n
+ * the integer number
+ * @throws IOException
+ */
+ @SuppressWarnings("fallthrough")
+ public static void writeVLong(DataOutput out, long n) throws IOException {
+ if ((n < 128) && (n >= -32)) {
+ out.writeByte((int) n);
+ return;
+ }
+
+ long un = (n < 0) ? ~n : n;
+ // how many bytes do we need to represent the number with sign bit?
+ int len = (Long.SIZE - Long.numberOfLeadingZeros(un)) / 8 + 1;
+ int firstByte = (int) (n >> ((len - 1) * 8));
+ switch (len) {
+ case 1:
+ // fall through to firstByte==-1, len=2.
+ firstByte >>= 8;
+ case 2:
+ if ((firstByte < 20) && (firstByte >= -20)) {
+ out.writeByte(firstByte - 52);
+ out.writeByte((int) n);
+ return;
+ }
+ // fall through to firstByte==0/-1, len=3.
+ firstByte >>= 8;
+ case 3:
+ if ((firstByte < 16) && (firstByte >= -16)) {
+ out.writeByte(firstByte - 88);
+ out.writeShort((int) n);
+ return;
+ }
+ // fall through to firstByte==0/-1, len=4.
+ firstByte >>= 8;
+ case 4:
+ if ((firstByte < 8) && (firstByte >= -8)) {
+ out.writeByte(firstByte - 112);
+ out.writeShort(((int) n) >>> 8);
+ out.writeByte((int) n);
+ return;
+ }
+ out.writeByte(len - 129);
+ out.writeInt((int) n);
+ return;
+ case 5:
+ out.writeByte(len - 129);
+ out.writeInt((int) (n >>> 8));
+ out.writeByte((int) n);
+ return;
+ case 6:
+ out.writeByte(len - 129);
+ out.writeInt((int) (n >>> 16));
+ out.writeShort((int) n);
+ return;
+ case 7:
+ out.writeByte(len - 129);
+ out.writeInt((int) (n >>> 24));
+ out.writeShort((int) (n >>> 8));
+ out.writeByte((int) n);
+ return;
+ case 8:
+ out.writeByte(len - 129);
+ out.writeLong(n);
+ return;
+ default:
+ throw new RuntimeException("Internel error");
+ }
+ }
+
+ /**
+ * Decodes a variable-length integer. Equivalent to
+ * <code>(int)Utils#readVLong(in)</code>.
+ *
+ * @param in
+ * input stream
+ * @return the decoded integer
+ * @throws IOException
+ *
+ * @see Utils#readVLong(DataInput)
+ */
+ public static int readVInt(DataInput in) throws IOException {
+ long ret = readVLong(in);
+ if ((ret > Integer.MAX_VALUE) || (ret < Integer.MIN_VALUE)) {
+ throw new RuntimeException(
+ "Number too large to be represented as Integer");
+ }
+ return (int) ret;
+ }
+
+ /**
+ * Decodes a variable-length integer. Suppose the value of the first byte
+ * is FB, and the following bytes are NB[*].
+ * <ul>
+ * <li>if (FB >= -32), return (long)FB;
+ * <li>if (FB in [-72, -33]), return (FB+52)<<8 + NB[0]&0xff;
+ * <li>if (FB in [-104, -73]), return (FB+88)<<16 + (NB[0]&0xff)<<8 +
+ * NB[1]&0xff;
+ * <li>if (FB in [-120, -105]), return (FB+112)<<24 + (NB[0]&0xff)<<16 +
+ * (NB[1]&0xff)<<8 + NB[2]&0xff;
+ * <li>if (FB in [-128, -121]), interpret the next (FB + 129) bytes as a
+ * signed big-endian integer and return it.
+ * </ul>
+ *
+ * @param in
+ * input stream
+ * @return the decoded long integer.
+ * @throws IOException
+ */
+
+ public static long readVLong(DataInput in) throws IOException {
+ int firstByte = in.readByte();
+ if (firstByte >= -32) {
+ return firstByte;
+ }
+
+ switch ((firstByte + 128) / 8) {
+ case 11:
+ case 10:
+ case 9:
+ case 8:
+ case 7:
+ return ((firstByte + 52) << 8) | in.readUnsignedByte();
+ case 6:
+ case 5:
+ case 4:
+ case 3:
+ return ((firstByte + 88) << 16) | in.readUnsignedShort();
+ case 2:
+ case 1:
+ return ((firstByte + 112) << 24) | (in.readUnsignedShort() << 8)
+ | in.readUnsignedByte();
+ case 0:
+ int len = firstByte + 129;
+ switch (len) {
+ case 4:
+ return in.readInt();
+ case 5:
+ return ((long) in.readInt()) << 8 | in.readUnsignedByte();
+ case 6:
+ return ((long) in.readInt()) << 16 | in.readUnsignedShort();
+ case 7:
+ return ((long) in.readInt()) << 24 | (in.readUnsignedShort() << 8)
+ | in.readUnsignedByte();
+ case 8:
+ return in.readLong();
+ default:
+ throw new IOException("Corrupted VLong encoding");
+ }
+ default:
+ throw new RuntimeException("Internal error");
+ }
+ }
+
+ /**
+ * Write a String as a VInt n, followed by n Bytes as in Text format.
+ *
+ * @param out
+ * output stream
+ * @param s
+ * the string; a null is encoded as a VInt of -1
+ * @throws IOException
+ */
+ public static void writeString(DataOutput out, String s) throws IOException {
+ if (s != null) {
+ Text text = new Text(s);
+ byte[] buffer = text.getBytes();
+ int len = text.getLength();
+ writeVInt(out, len);
+ out.write(buffer, 0, len);
+ } else {
+ writeVInt(out, -1);
+ }
+ }
+
+ /**
+ * Read a String as a VInt n, followed by n Bytes in Text format.
+ *
+ * @param in
+ * The input stream.
+ * @return The string
+ * @throws IOException
+ */
+ public static String readString(DataInput in) throws IOException {
+ int length = readVInt(in);
+ if (length == -1) return null;
+ byte[] buffer = new byte[length];
+ in.readFully(buffer);
+ return Text.decode(buffer);
+ }
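
To make the VInt/VLong and string encodings above concrete, a minimal
round-trip sketch against this Utils class (class name and values are
arbitrary):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    import org.apache.hadoop.zebra.tfile.Utils;

    public class VLongRoundTrip {
      public static void main(String[] args) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        Utils.writeVLong(out, 100L);     // in [-32, 128): one byte
        Utils.writeVLong(out, 100000L);  // in [-16*2^16, 16*2^16): three bytes
        Utils.writeString(out, "hello"); // VInt length 5, then 5 bytes of text
        Utils.writeString(out, null);    // encoded as a VInt of -1

        DataInputStream in =
            new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
        long a = Utils.readVLong(in);    // 100
        long b = Utils.readVLong(in);    // 100000
        String s = Utils.readString(in); // "hello"
        String t = Utils.readString(in); // null
      }
    }
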
+
+ /**
+ * A generic Version class. We suggest applications built on top of TFile use
+ * this class to maintain version information in their meta blocks.
+ *
+ * A version number consists of a major version and a minor version. The
+ * suggested usage of major and minor version number is to increment major
+ * version number when the new storage format is not backward compatible, and
+ * increment the minor version otherwise.
+ */
+ public static final class Version implements Comparable<Version> {
+ private final short major;
+ private final short minor;
+
+ /**
+ * Construct the Version object by reading from the input stream.
+ *
+ * @param in
+ * input stream
+ * @throws IOException
+ */
+ public Version(DataInput in) throws IOException {
+ major = in.readShort();
+ minor = in.readShort();
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param major
+ * major version.
+ * @param minor
+ * minor version.
+ */
+ public Version(short major, short minor) {
+ this.major = major;
+ this.minor = minor;
+ }
+
+ /**
+ * Write the object to a DataOutput. The serialized format of the Version is
+ * major version followed by minor version, both as big-endian short
+ * integers.
+ *
+ * @param out
+ * The DataOutput object.
+ * @throws IOException
+ */
+ public void write(DataOutput out) throws IOException {
+ out.writeShort(major);
+ out.writeShort(minor);
+ }
+
+ /**
+ * Get the major version.
+ *
+ * @return Major version.
+ */
+ public int getMajor() {
+ return major;
+ }
+
+ /**
+ * Get the minor version.
+ *
+ * @return The minor version.
+ */
+ public int getMinor() {
+ return minor;
+ }
+
+ /**
+ * Get the size of the serialized Version object.
+ *
+ * @return serialized size of the version object.
+ */
+ public static int size() {
+ return (Short.SIZE + Short.SIZE) / Byte.SIZE;
+ }
+
+ /**
+ * Return a string representation of the version.
+ */
+ @Override
+ public String toString() {
+ return new StringBuilder("v").append(major).append(".").append(minor)
+ .toString();
+ }
+
+ /**
+ * Test compatibility.
+ *
+ * @param other
+ * The Version object to test compatibility with.
+ * @return true if both versions have the same major version number; false
+ * otherwise.
+ */
+ public boolean compatibleWith(Version other) {
+ return major == other.major;
+ }
+
+ /**
+ * Compare this version with another version.
+ */
+ @Override
+ public int compareTo(Version that) {
+ if (major != that.major) {
+ return major - that.major;
+ }
+ return minor - that.minor;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) return true;
+ if (!(other instanceof Version)) return false;
+ return compareTo((Version) other) == 0;
+ }
+
+ @Override
+ public int hashCode() {
+ return (major << 16) + minor;
+ }
+ }
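
A small sketch of the suggested Version usage (arbitrary version numbers):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    import org.apache.hadoop.zebra.tfile.Utils.Version;

    public class VersionExample {
      public static void main(String[] args) throws Exception {
        Version v11 = new Version((short) 1, (short) 1);
        Version v12 = new Version((short) 1, (short) 2);

        boolean compat = v11.compatibleWith(v12); // true: same major version
        int cmp = v11.compareTo(v12);             // negative: v1.1 < v1.2

        // Round-trip through the serialized form: two big-endian shorts.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        v12.write(new DataOutputStream(bos));
        Version copy = new Version(
            new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
        // copy.equals(v12) is true; Version.size() == 4
      }
    }
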
+
+ /**
+ * Lower bound binary search. Find the index to the first element in the list
+ * that compares greater than or equal to key.
+ *
+ * @param <T>
+ * Type of the input key.
+ * @param list
+ * The list
+ * @param key
+ * The input key.
+ * @param cmp
+ * Comparator for the key.
+ * @return The index to the desired element if it exists; or list.size()
+ * otherwise.
+ */
+ public static <T> int lowerBound(List<? extends T> list, T key,
+ Comparator<? super T> cmp) {
+ int low = 0;
+ int high = list.size();
+
+ while (low < high) {
+ int mid = (low + high) >>> 1;
+ T midVal = list.get(mid);
+ int ret = cmp.compare(midVal, key);
+ if (ret < 0)
+ low = mid + 1;
+ else high = mid;
+ }
+ return low;
+ }
+
+ /**
+ * Upper bound binary search. Find the index to the first element in the list
+ * that compares greater than the input key.
+ *
+ * @param <T>
+ * Type of the input key.
+ * @param list
+ * The list
+ * @param key
+ * The input key.
+ * @param cmp
+ * Comparator for the key.
+ * @return The index to the desired element if it exists; or list.size()
+ * otherwise.
+ */
+ public static <T> int upperBound(List<? extends T> list, T key,
+ Comparator<? super T> cmp) {
+ int low = 0;
+ int high = list.size();
+
+ while (low < high) {
+ int mid = (low + high) >>> 1;
+ T midVal = list.get(mid);
+ int ret = cmp.compare(midVal, key);
+ if (ret <= 0)
+ low = mid + 1;
+ else high = mid;
+ }
+ return low;
+ }
+
+ /**
+ * Lower bound binary search. Find the index to the first element in the list
+ * that compares greater than or equal to key.
+ *
+ * @param <T>
+ * Type of the input key.
+ * @param list
+ * The list
+ * @param key
+ * The input key.
+ * @return The index to the desired element if it exists; or list.size()
+ * otherwise.
+ */
+ public static <T> int lowerBound(List<? extends Comparable<? super T>> list,
+ T key) {
+ int low = 0;
+ int high = list.size();
+
+ while (low < high) {
+ int mid = (low + high) >>> 1;
+ Comparable<? super T> midVal = list.get(mid);
+ int ret = midVal.compareTo(key);
+ if (ret < 0)
+ low = mid + 1;
+ else high = mid;
+ }
+ return low;
+ }
+
+ /**
+ * Upper bound binary search. Find the index to the first element in the list
+ * that compares greater than the input key.
+ *
+ * @param <T>
+ * Type of the input key.
+ * @param list
+ * The list
+ * @param key
+ * The input key.
+ * @return The index to the desired element if it exists; or list.size()
+ * otherwise.
+ */
+ public static <T> int upperBound(List<? extends Comparable<? super T>> list,
+ T key) {
+ int low = 0;
+ int high = list.size();
+
+ while (low < high) {
+ int mid = (low + high) >>> 1;
+ Comparable<? super T> midVal = list.get(mid);
+ int ret = midVal.compareTo(key);
+ if (ret <= 0)
+ low = mid + 1;
+ else high = mid;
+ }
+ return low;
+ }
+}
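
The lower/upper-bound semantics above are easiest to see on a sorted list with
duplicate keys; a minimal sketch:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.zebra.tfile.Utils;

    public class BoundsExample {
      public static void main(String[] args) {
        List<Integer> sorted = Arrays.asList(1, 3, 3, 3, 7);

        int lo = Utils.lowerBound(sorted, 3); // 1: first element >= 3
        int hi = Utils.upperBound(sorted, 3); // 4: first element > 3
        // hi - lo == 3, the number of occurrences of the key.

        // For an absent key, both methods return the insertion point.
        int miss = Utils.lowerBound(sorted, 5); // 4
      }
    }
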
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/CGSchema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/CGSchema.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/CGSchema.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/CGSchema.java Tue Nov 24 19:54:19 2009
@@ -27,7 +27,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.file.tfile.Utils.Version;
+import org.apache.hadoop.zebra.tfile.Utils.Version;
import org.apache.hadoop.zebra.schema.Schema;
import org.apache.hadoop.zebra.parser.ParseException;
@@ -50,7 +50,6 @@
// tmp schema file name, used as a flag of unfinished CG
private final static String SCHEMA_FILE = ".schema";
- private final static String DEFAULT_COMPARATOR = "memcmp";
// schema version, should be same as BasicTable's most of the time
private final static Version SCHEMA_VERSION =
new Version((short) 1, (short) 1);
@@ -87,19 +86,18 @@
this.version = SCHEMA_VERSION;
}
- public CGSchema(Schema schema, boolean sorted) {
+ public CGSchema(Schema schema, boolean sorted, String comparator) {
this.sorted = sorted;
- this.comparator = (sorted) ? DEFAULT_COMPARATOR : "";
+ this.comparator = (sorted) ? (comparator == null ? SortInfo.DEFAULT_COMPARATOR : comparator) : "";
this.schema = schema;
this.version = SCHEMA_VERSION;
}
- public CGSchema(Schema schema, boolean sorted, String name, String serializer, String compressor, String owner, String group, short perm) {
- this(schema, sorted);
+ public CGSchema(Schema schema, boolean sorted, String comparator, String name, String serializer, String compressor, String owner, String group, short perm) {
+ this(schema, sorted, comparator);
this.name = name;
this.serializer = serializer;
this.compressor = compressor;
-// this.owner = owner;
this.group = group;
this.perm = perm;
}
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Partition.java Tue Nov 24 19:54:19 2009
@@ -41,6 +41,9 @@
* insertions and queries respectively.
*/
public class Partition {
+ /**
+ * Storage split types
+ */
public enum SplitType {
NONE, RECORD, COLLECTION, MAP
}
@@ -87,7 +90,7 @@
* add map keys
* return false if any key already exists but no rollback!
*/
- public boolean addKeys(HashSet<String> keys)
+ public boolean addKeys(HashSet<String> keys, HashSet<String> columnKeySet)
{
if (keySet == null)
keySet = new HashSet<String>();
@@ -95,6 +98,11 @@
for (Iterator<String> it = keys.iterator(); it.hasNext(); )
{
key = it.next();
+
+ // if the key is used in another CG?
+ if (!columnKeySet.add(key))
+ return false;
+
if (!keySet.add(key))
return false;
}
@@ -144,6 +152,7 @@
private HashSet<String> mSplitColumns = new HashSet<String>();
private ColumnMappingEntry mCGIndex = null;
private String mCGName = null; // fully qualified name
+ private HashSet<String> keySet = null;
private SplitType stype = SplitType.NONE;
private boolean splitChild;
@@ -157,7 +166,9 @@
mSplitMaps.add(cme);
// multiple map splits on one MAP column is allowed!
mSplitColumns.add(name);
- return cme.addKeys(keys);
+ if (keySet == null)
+ keySet = new HashSet<String>();
+ return cme.addKeys(keys, keySet);
}
/**
@@ -317,7 +328,7 @@
*/
public CGSchema generateDefaultCGSchema(String name, String compressor,
String serializer, String owner, String group,
- short perm, final int defaultCGIndex) throws ParseException {
+ short perm, final int defaultCGIndex, String comparator) throws ParseException {
Schema schema = new Schema();
Schema.ColumnSchema fs;
for (int i = 0; i < mSchema.getNumColumns(); i++) {
@@ -369,13 +380,13 @@
}
}
CGSchema defaultSchema =
- (schema.getNumColumns() == 0 ? null : new CGSchema(schema, false, name, serializer, compressor, owner, group, perm));
+ (schema.getNumColumns() == 0 ? null : new CGSchema(schema, false, comparator, name, serializer, compressor, owner, group, perm));
return defaultSchema;
}
/**
* returns "hash key-to-(sub)column" map on a (sub)column which is MAP-split
- * aross different hash keys
+ * across different hash keys
*/
public HashSet<PartitionInfo.ColumnMappingEntry> getSplitMap(
Schema.ColumnSchema fs) {
@@ -661,17 +672,21 @@
private Projection mProjection = null;
private ArrayList<PartitionedColumn> mPCNeedTmpTuple = new ArrayList<PartitionedColumn>();
private ArrayList<PartitionedColumn> mPCNeedMap = new ArrayList<PartitionedColumn>();
+ private String comparator;
+ private boolean mSorted;
+ private SortInfo mSortInfo;
/*
* ctor used for LOAD
*/
- public Partition(Schema schema, Projection projection, String storage)
+ public Partition(Schema schema, Projection projection, String storage, String comparator)
throws ParseException, IOException {
mSchema = schema;
TableStorageParser sparser =
- new TableStorageParser(new StringReader(storage), this, mSchema);
+ new TableStorageParser(new StringReader(storage), this, mSchema, comparator);
mPartitionInfo = new PartitionInfo(schema);
- ArrayList<CGSchema> cgschemas = sparser.StorageSchema();
+ ArrayList<CGSchema> cgschemas = new ArrayList<CGSchema>();
+ sparser.StorageSchema(cgschemas);
mCGSchemas = cgschemas.toArray(new CGSchema[cgschemas.size()]);
mProjection = projection;
Schema projSchema = projection.getProjectionSchema();
@@ -711,8 +726,6 @@
cgindex = mapentry.getKey();
if (cgindex == null)
throw new AssertionError( "Internal Logical Error: RECORD does not have a CG index.");
- if (mapentry.getValue() != null)
- throw new AssertionError( "Internal Logical Error: RECORD should not have a split key map.");
cgentry = getCGEntry(cgindex.getCGIndex());
parCol = new PartitionedColumn(i, false);
cgentry.addUser(parCol, name);
@@ -747,15 +760,34 @@
/*
* ctor used by STORE
*/
- public Partition(final String schema, final String storage)
+ public Partition(final String schema, final String storage, String comparator, String sortColumns)
+ throws ParseException, IOException
+ {
+ TableSchemaParser parser = new TableSchemaParser(new StringReader(schema));
+ mSchema = parser.RecordSchema(null);
+ mSortInfo = SortInfo.parse(sortColumns, mSchema, comparator);
+ mSorted = (mSortInfo != null && mSortInfo.size() > 0);
+ this.comparator = (mSorted ? mSortInfo.getComparator() : "");
+ storeConst(storage);
+ }
+
+ public Partition(String schema, final String storage, String comparator)
throws ParseException, IOException
{
TableSchemaParser parser = new TableSchemaParser(new StringReader(schema));
mSchema = parser.RecordSchema(null);
+ this.comparator = comparator;
+ storeConst(storage);
+ }
+
+ private void storeConst(final String storage)
+ throws ParseException, IOException
+ {
mPartitionInfo = new PartitionInfo(mSchema);
TableStorageParser sparser =
- new TableStorageParser(new StringReader(storage), this, mSchema);
- ArrayList<CGSchema> cgschemas = sparser.StorageSchema();
+ new TableStorageParser(new StringReader(storage), this, mSchema, this.comparator);
+ ArrayList<CGSchema> cgschemas = new ArrayList<CGSchema>();
+ sparser.StorageSchema(cgschemas);
mCGSchemas = cgschemas.toArray(new CGSchema[cgschemas.size()]);
int size = mSchema.getNumColumns();
PartitionInfo.ColumnMappingEntry cgindex;
@@ -785,7 +817,7 @@
} else {
// this subtype is MAP-split
// => need to add splits for all split keys
- handleMapSplit(curCol, fs, i, cgentry);
+ handleMapSplit(curCol, fs, i, cgentry, cgindex.getFieldIndex());
}
}
else {
@@ -805,6 +837,18 @@
mPCNeedMap.get(i).createMap();
}
+ public SortInfo getSortInfo() {
+ return mSortInfo;
+ }
+
+ public boolean isSorted() {
+ return mSorted;
+ }
+
+ public String getComparator() {
+ return comparator;
+ }
+
/**
* returns table schema
*/
@@ -856,10 +900,10 @@
{
if (projectedKeys != null)
{
- pn.mDT = ColumnType.MAP;
+ pn.setDT(ColumnType.MAP);
map = true;
} else {
- pn.mDT = ColumnType.ANY;
+ pn.setDT(ColumnType.ANY);
PartitionInfo.ColumnMappingEntry cme;
for (Iterator<PartitionInfo.ColumnMappingEntry> it = results.iterator(); it.hasNext(); )
{
@@ -985,7 +1029,7 @@
pn.parseName(fs);
Schema.ParsedName oripn = new Schema.ParsedName();
for (int i = 0; i < schema.getNumColumns(); i++) {
- oripn.setName(new String(pn.mName), pn.mDT);
+ oripn.setName(new String(pn.getName()), pn.getDT());
child = schema.getColumn(i);
if (getCGIndex(child) == null) {
// not a CG: go one level lower
@@ -1038,7 +1082,7 @@
PartitionedColumn parent, Schema.ColumnSchema child, int i,
int fi, HashMap<PartitionInfo.ColumnMappingEntry, HashSet<String>> cgindices) throws IOException {
CGEntry cgentry;
- if (pn.mDT == ColumnType.ANY) {
+ if (pn.getDT() == ColumnType.ANY) {
// this subtype is MAP split and the projection is on the whole MAP:
// => need to add stitches for all split keys
@@ -1174,7 +1218,7 @@
else {
// this subfield is MAP-split
// => need to add splits for all split keys
- handleMapSplit(parent, child, i, cgentry);
+ handleMapSplit(parent, child, i, cgentry, cgindex.getFieldIndex());
}
}
}
@@ -1189,12 +1233,13 @@
* @throws IOException
*/
private void handleMapSplit(PartitionedColumn parent,
- Schema.ColumnSchema child, int i, CGEntry cgentry) throws ParseException, IOException {
+ Schema.ColumnSchema child, int i, CGEntry cgentry, int childProjIndex) throws ParseException, IOException {
// first the map partitioned column that contain all non-key-partitioned
// hashes
PartitionedColumn mapParCol =
new PartitionedColumn(i, Partition.SplitType.MAP, false);
cgentry.addUser(mapParCol, getCGName(child));
+ mapParCol.setProjIndex(childProjIndex);
mExecs.add(mapParCol); // not a leaf : MAP split needed
mSplitSize++;
parent.addChild(mapParCol);
@@ -1236,7 +1281,6 @@
while (it.hasNext())
it.next().getValue().read();
- TypesUtils.resetTuple(t);
// dispatch
mExecs.get(mStitchSize - 1).setRecord(t);
@@ -1313,9 +1357,9 @@
public CGSchema generateDefaultCGSchema(String name, String compressor, String serializer,
String owner, String group, short perm,
- final int defaultCGIndex) throws ParseException {
+ final int defaultCGIndex, String comparator) throws ParseException {
return mPartitionInfo.generateDefaultCGSchema(name, compressor, serializer,owner, group, perm,
- defaultCGIndex);
+ defaultCGIndex, comparator);
}
public void setSplit(Schema.ColumnSchema fs, SplitType st, SplitType cst, String name, String childName, boolean splitChild) throws ParseException {
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Projection.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Projection.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Projection.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/Projection.java Tue Nov 24 19:54:19 2009
@@ -19,6 +19,7 @@
import java.util.HashMap;
import java.util.HashSet;
+import java.util.ArrayList;
import org.apache.hadoop.zebra.schema.Schema;
import org.apache.hadoop.zebra.parser.ParseException;
@@ -27,6 +28,7 @@
*/
public class Projection {
+ public static final String source_table_vcolumn_name = "source_table";
private Schema mProjection; // schema as needed by the projection
private int mNumColumns;
private String mProjStr;
@@ -52,6 +54,43 @@
}
/**
+ * Check whether a column name refers to a virtual column.
+ */
+ public static boolean isVirtualColumn(String name)
+ {
+ if (name == null || name.isEmpty())
+ return false;
+ return name.trim().equalsIgnoreCase(source_table_vcolumn_name);
+ }
+
+ /**
+ * Get the indices of all virtual columns
+ */
+ public static Integer[] getVirtualColumnIndices(String projection)
+ {
+ if (projection == null)
+ return null;
+ String[] colnames = projection.trim().split(Schema.COLUMN_DELIMITER);
+ int size = colnames.length, realsize = 0;
+ ArrayList<Integer> vcol = new ArrayList<Integer>();
+
+ for (int i = 0; i < size; i++)
+ {
+ if (Projection.isVirtualColumn(colnames[i]))
+ {
+ vcol.add(i);
+ }
+ }
+ Integer[] result = null;
+ if (!vcol.isEmpty())
+ {
+ result = new Integer[vcol.size()];
+ vcol.toArray(result);
+ }
+ return result;
+ }
+
+ /**
* ctor for partial projection
*/
public Projection(Schema s, String projection) throws ParseException
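
For illustration, a minimal sketch of the virtual-column helpers added above
(assumption: Schema.COLUMN_DELIMITER is a comma, matching the usual Zebra
projection syntax; the class name is hypothetical):

    import org.apache.hadoop.zebra.types.Projection;

    public class VirtualColumnExample {
      public static void main(String[] args) {
        // "source_table" is the virtual column; it sits at index 1 here.
        Integer[] idx = Projection.getVirtualColumnIndices("a,source_table,b");
        // idx is {1}

        // With no virtual columns the method returns null, not an empty array.
        Integer[] none = Projection.getVirtualColumnIndices("a,b");
      }
    }
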
Added: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SortInfo.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SortInfo.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SortInfo.java (added)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SortInfo.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.types;
+
+import java.io.IOException;
+import org.apache.hadoop.zebra.schema.Schema;
+import org.apache.hadoop.zebra.schema.ColumnType;
+import org.apache.hadoop.zebra.tfile.TFile;
+
+/**
+ * Sortedness-related information.
+ */
+public class SortInfo {
+ public static final String DEFAULT_COMPARATOR = TFile.COMPARATOR_MEMCMP;
+ private boolean global = true;
+ private String[] columns;
+ private int[] indices = null;
+ private ColumnType[] types = null;
+ private String comparator = null;
+ private Schema schema = null;
+ private static final String SORTED_COLUMN_DELIMITER = ",";
+
+ private SortInfo(String[] columns, int[] sortIndices, ColumnType[] sortColTypes, String comparator, Schema schema)
+ {
+ this.columns = columns;
+ this.indices = sortIndices;
+ this.comparator = comparator;
+ this.schema = schema;
+ this.types = sortColTypes;
+ }
+
+ /**
+ * Get an array of the sorted column names with the first column
+ * being the primary sort key, the second column being the
+ * secondary sort key, ..., etc.
+ *
+ * @return an array of strings of sorted column names
+ */
+ public String[] getSortColumnNames() {
+ return columns;
+ }
+
+ /**
+ * Get an array of zebra types of the sorted columns with the first column
+ * being the primary sort key, the second column being the
+ * secondary sort key, ..., etc.
+ *
+ * @return an array of column types of the sorted columns
+ */
+ public ColumnType[] getSortColumnTypes() {
+ return types;
+ }
+
+ /**
+ * Get an array of column indices in schema of the sorted columns with the first column
+ * being the primary sort key, the second column being the
+ * secondary sort key, ..., etc.
+ *
+ * @return an array of schema column indices of the sorted columns
+ */
+ public int[] getSortIndices() {
+ return indices;
+ }
+
+ /**
+ * Get the number of sorted columns
+ *
+ * @return number of sorted columns
+ */
+ public int size() {
+ return (columns == null ? 0 : columns.length);
+ }
+
+ /**
+ * Get the comparator name
+ *
+ * @return comparator name
+ */
+ public String getComparator() {
+ return comparator;
+ }
+
+ /**
+ * Check if this SortInfo matches the given sort columns and comparator
+ *
+ * @return true if one's sort columns equal a leading portion of the other's
+ * and the comparator names match; false otherwise
+ */
+ public boolean equals(String sortcolumns, String comparator) throws IOException {
+ if (sortcolumns == null || sortcolumns.trim().isEmpty())
+ {
+ return false;
+ }
+ String[] columns = sortcolumns.trim().split(SORTED_COLUMN_DELIMITER);
+ for (String column : columns)
+ {
+ if (schema.getColumn(column) == null)
+ throw new IOException(column + " does not exist in schema");
+ }
+ if (this.columns.length <= columns.length)
+ {
+ for (int i = 0; i < this.columns.length; i++)
+ {
+ if (!this.columns[i].equals(columns[i]))
+ return false;
+ }
+ } else {
+ for (int i = 0; i < columns.length; i++)
+ {
+ if (!columns[i].equals(this.columns[i]))
+ return false;
+ }
+ }
+ if (this.comparator == null && comparator == null)
+ {
+ return true;
+ } else if (this.comparator != null && comparator != null)
+ {
+ return (this.comparator.equals(comparator));
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * Build a SortInfo object from sort column names, schema, and comparator
+ *
+ * @param sortStr
+ * comma-separated sort column names
+ * @param schema
+ * schema of the Zebra table for the sort columns
+ * @param comparator
+ * comparator name
+ * @return newly built SortInfo object
+ */
+ public static SortInfo parse(String sortStr, Schema schema, String comparator) throws IOException
+ {
+ if (sortStr == null || sortStr.trim().isEmpty())
+ {
+ return null;
+ }
+ String[] sortedColumns = sortStr.trim().split(SORTED_COLUMN_DELIMITER);
+ int[] sortColIndices = new int[sortedColumns.length];
+ ColumnType[] sortColTypes = new ColumnType[sortedColumns.length];
+ Schema.ColumnSchema cs;
+ for (int i = 0; i < sortedColumns.length; i++)
+ {
+ sortedColumns[i] = sortedColumns[i].trim();
+ /*
+ * sanity check the sort column's existence
+ */
+ if ((cs = schema.getColumn(sortedColumns[i])) == null)
+ throw new IOException(sortedColumns[i] + " does not exist in schema");
+ sortColIndices[i] = schema.getColumnIndex(sortedColumns[i]);
+ sortColTypes[i] = schema.getColumn(sortedColumns[i]).getType();
+ }
+ String comparatorInUse = (sortedColumns.length > 0 ?
+ (comparator == null || comparator.isEmpty() ?
+ DEFAULT_COMPARATOR : comparator) : null);
+ return new SortInfo(sortedColumns, sortColIndices, sortColTypes, comparatorInUse, schema);
+ }
+
+ /**
+ * Build a string of comma-separated sort column names from an array of sort column names
+ *
+ * @param names an array of sort column names
+ *
+ * @return a string of comma-separated sort column names
+ */
+ public static String toSortString(String[] names)
+ {
+ if (names == null || names.length == 0)
+ return null;
+
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < names.length; i++)
+ {
+ if (i > 0)
+ sb.append(SORTED_COLUMN_DELIMITER);
+ sb.append(names[i]);
+ }
+ return sb.toString();
+ }
+}
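
A minimal sketch of building a SortInfo from the new API (assumptions: Schema
exposes a constructor taking a comma-separated column string, and
TFile.COMPARATOR_MEMCMP is the "memcmp" comparator name):

    import org.apache.hadoop.zebra.schema.Schema;
    import org.apache.hadoop.zebra.types.SortInfo;

    public class SortInfoExample {
      public static void main(String[] args) throws Exception {
        Schema schema = new Schema("a, b, c"); // assumed Schema(String) ctor
        // A null or empty comparator falls back to SortInfo.DEFAULT_COMPARATOR.
        SortInfo info = SortInfo.parse("b, a", schema, null);
        // info.size() == 2
        // info.getSortColumnNames() is {"b", "a"}
        // info.getSortIndices() is {1, 0}
        // info.getComparator() is "memcmp"
      }
    }
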
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SubColumnExtraction.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SubColumnExtraction.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SubColumnExtraction.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/SubColumnExtraction.java Tue Nov 24 19:54:19 2009
@@ -30,14 +30,14 @@
import org.apache.hadoop.zebra.parser.ParseException;
+/**
+ * This class extracts a subfield from a column or subcolumn stored
+ * in its entirety on disk.
+ * It should be used only by readers whose serializers do not
+ * support projection.
+ */
public class SubColumnExtraction {
- /**
- * This class extracts a subfield from a column or subcolumn stored
- * in entirety on disk
- * It should be used only by readers whose serializers do not
- * support projection
- */
- public static class SubColumn {
+ static class SubColumn {
Schema physical;
Projection projection;
ArrayList<SplitColumn> exec = null;
@@ -77,16 +77,17 @@
pn.setName(name);
fs = physical.getColumnSchema(pn);
if (keySet != null)
- pn.mDT = ColumnType.MAP;
+ pn.setDT(ColumnType.MAP);
if (fs == null)
continue; // skip non-existing field
j = fs.getIndex();
- if (pn.mDT == ColumnType.MAP || pn.mDT == ColumnType.RECORD || pn.mDT == ColumnType.COLLECTION)
+ ColumnType ct = pn.getDT();
+ if (ct == ColumnType.MAP || ct == ColumnType.RECORD || ct == ColumnType.COLLECTION)
{
// record/map subfield is expected
- sc = new SplitColumn(j, pn.mDT);
- if (pn.mDT == ColumnType.MAP)
+ sc = new SplitColumn(j, ct);
+ if (ct == ColumnType.MAP)
sclist.add(sc);
exec.add(sc); // breadth-first
// (i, j) represents the mapping between projection and physical schema
@@ -105,18 +106,19 @@
* build the split executions
*/
private void buildSplit(SplitColumn parent, Schema.ColumnSchema fs,
- Schema.ParsedName pn, final int projIndex, HashSet<String> keys) throws ParseException, ExecException
+ final Schema.ParsedName pn, final int projIndex, HashSet<String> keys) throws ParseException, ExecException
{
// recursive call to get the next level schema
- if (pn.mDT != fs.getType())
+ ColumnType ct = pn.getDT();
+ if (ct != fs.getType())
throw new ParseException(fs.getName()+" is not of proper type.");
String prefix;
int fieldIndex;
SplitColumn sc;
- Partition.SplitType callerDT = (pn.mDT == ColumnType.MAP ? Partition.SplitType.MAP :
- (pn.mDT == ColumnType.RECORD ? Partition.SplitType.RECORD :
- (pn.mDT == ColumnType.COLLECTION ? Partition.SplitType.COLLECTION :
+ Partition.SplitType callerDT = (ct == ColumnType.MAP ? Partition.SplitType.MAP :
+ (ct == ColumnType.RECORD ? Partition.SplitType.RECORD :
+ (ct == ColumnType.COLLECTION ? Partition.SplitType.COLLECTION :
Partition.SplitType.NONE)));
prefix = pn.parseName(fs);
Schema schema = fs.getSchema();
@@ -133,11 +135,12 @@
fieldIndex = 0;
}
- if (pn.mDT != ColumnType.ANY)
+ ct = pn.getDT();
+ if (ct != ColumnType.ANY)
{
// record subfield is expected
- sc = new SplitColumn(fieldIndex, pn.mDT);
- if (pn.mDT == ColumnType.MAP)
+ sc = new SplitColumn(fieldIndex, ct);
+ if (ct == ColumnType.MAP)
sclist.add(sc);
exec.add(sc); // breadth-first
buildSplit(sc, fs, pn, projIndex, null);
@@ -314,7 +317,7 @@
} else if (st == Partition.SplitType.MAP && keys != null) {
String key;
Iterator<String> it;
-
+ Object value;
for (int i = 0; i < size; i++)
{
if (children.get(i).projIndex != -1) // a leaf: set projection directly
@@ -322,7 +325,10 @@
for (it = keys.iterator(); it.hasNext(); )
{
key = it.next();
- ((Map<String, Object>) (((Tuple)children.get(i).leaf.field).get(children.get(i).projIndex))).put(key, ((Map<String, Object>) field).get(key));
+ value = ((Map<String, Object>) field).get(key);
+ if (value == null)
+ continue;
+ ((Map<String, Object>) (((Tuple)children.get(i).leaf.field).get(children.get(i).projIndex))).put(key, value);
}
} else {
for (it = keys.iterator(); it.hasNext(); )
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TableStorageParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TableStorageParser.jjt?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TableStorageParser.jjt (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TableStorageParser.jjt Tue Nov 24 19:54:19 2009
@@ -30,7 +30,7 @@
import org.apache.hadoop.zebra.types.*;
public class TableStorageParser {
- public TableStorageParser(java.io.Reader in, Partition partition, Schema schema) { this(in); mSchema = schema; this.partition = partition;}
+ public TableStorageParser(java.io.Reader in, Partition partition, Schema schema, String comparator) { this(in); mSchema = schema; this.partition = partition; this.comparator = comparator; }
private Schema mSchema;
private int mDefaultCGIndex = -1;
private String mName = null;
@@ -39,6 +39,7 @@
private short mPerm = -1;
private int mCGCount = 0;
private Partition partition;
+ private String comparator = null;
}
PARSER_END(TableStorageParser)
@@ -54,9 +55,10 @@
TOKEN : { <COMPRESSOR : "lzo" | "gz"> }
TOKEN : { <SERIALIZER : ("pig" | "avro")> }
-TOKEN : { <ORDER : "order by"> }
TOKEN : { <COMPRESS : "compress by"> }
TOKEN : { <SERIALIZE : "serialize by"> }
+TOKEN : { <ASC : "ASC"> }
+TOKEN : { <DESC: "DESC"> }
TOKEN : { <SECURE : "secure by"> }
TOKEN : { <USER : "user"> }
TOKEN : { <GROUP : "group"> }
@@ -64,8 +66,6 @@
TOKEN : { <AS : "as"> }
-
-
TOKEN:
{
<#LETTER : ["a"-"z", "A"-"Z"] >
@@ -77,62 +77,17 @@
| <SHORT : (<OCTAL>){3} >
}
-ArrayList<CGSchema> StorageSchema() throws ParseException :
+void StorageSchema(ArrayList<CGSchema> s) throws ParseException :
{
- ArrayList<CGSchema> s = new ArrayList();
CGSchema fs;
CGSchema defaultSchema;
}
{
try {
- LOOKAHEAD(2) <EOF>
- {
- defaultSchema = partition.generateDefaultCGSchema(mName, mCompressor, mSerializer, mOwner, mGroup, mPerm, 0);
- if (defaultSchema != null)
- s.add(defaultSchema);
-
- // check column group names, add system created names when necessary;
- HashSet<String> cgNames = new HashSet<String>();
- ArrayList<CGSchema> unnamed = new ArrayList<CGSchema>();
- for (int i = 0; i < s.size(); i++) {
- CGSchema cgSchema = s.get(i);
- String str = cgSchema.getName();
- if (str != null) {
- if (!cgNames.add(str)) {
- throw new ParseException("Duplicate column group names.");
- }
- } else {
- unnamed.add(cgSchema);
- }
- }
-
- int digits = 1;
- int total = unnamed.size();
- while (total >= 10) {
- ++digits;
- total /= 10;
- }
- String formatString = "%0" + digits + "d";
-
- int idx = 0;
- for (int i = 0; i < unnamed.size(); i++) {
- CGSchema cgSchema = unnamed.get(i);
- String str = null;
- while (true) {
- str = "CG" + String.format(formatString, idx++);
- if (!cgNames.contains(str)) {
- break;
- }
- }
- cgSchema.setName(str);
- }
- return s;
- }
- |
fs = FieldSchema() {mCGCount++; if (fs != null) s.add(fs);}
(";" fs = FieldSchema() {mCGCount++; if (fs != null) s.add(fs);})* <EOF>
{
- defaultSchema = partition.generateDefaultCGSchema(mName, mCompressor, mSerializer, mOwner, mGroup, mPerm, mDefaultCGIndex == -1 ? mDefaultCGIndex = mCGCount++ : mDefaultCGIndex);
+ defaultSchema = partition.generateDefaultCGSchema(mName, mCompressor, mSerializer, mOwner, mGroup, mPerm, mDefaultCGIndex == -1 ? mDefaultCGIndex = mCGCount++ : mDefaultCGIndex, comparator);
if (defaultSchema != null)
s.add(defaultSchema);
@@ -171,13 +126,29 @@
}
cgSchema.setName(str);
}
- return s;
+ return;
}
} catch (TokenMgrError e) {
throw new ParseException(e.getMessage());
}
}
+boolean ascdsc() throws ParseException :
+{
+ Token t1 = null, t2 = null;
+}
+{
+ (
+ t1 = <ASC>
+ | t2 = <DESC>
+ )
+ {
+ if (t2 != null)
+ return false;
+ return true;
+ }
+}
+
CGSchema FieldSchema() throws ParseException:
{
Token t1 = null, t2 = null;
@@ -310,8 +281,9 @@
mOwner = owner;
mGroup = group;
mPerm = perm;
- } else
- cs = new CGSchema(fs, false, name, serializer, compressor, owner, group, perm);
+ } else {
+ cs = new CGSchema(fs, false, comparator, name, serializer, compressor, owner, group, perm);
+ }
return cs;
}
}
@@ -531,7 +503,7 @@
"{" keys = hashKeys() "}"
{
if(!partition.getPartitionInfo().setKeyCGIndex(schema, mCGCount, colIndex, name, keys))
- throw new ParseException("Column "+name+" specified more than once!");
+ throw new ParseException("Column "+name+" has split keys specified more than once.");
fs = new Schema.ColumnSchema(name, schema.getSchema(), schema.getType());
}
)
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/types/TypesUtils.java Tue Nov 24 19:54:19 2009
@@ -212,8 +212,8 @@
* @throws IOException
*
*/
- public static void formatTuple(Tuple tuple, String projection) throws IOException {
- Tuple one = createTuple(Projection.getNumColumns(projection));
+ public static void formatTuple(Tuple tuple, int ncols) throws IOException {
+ Tuple one = createTuple(ncols);
tuple.reference(one);
return;
/*
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java Tue Nov 24 19:54:19 2009
@@ -31,7 +31,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.file.tfile.RawComparable;
+import org.apache.hadoop.zebra.tfile.RawComparable;
import org.apache.hadoop.zebra.io.BasicTable;
import org.apache.hadoop.zebra.io.BasicTableStatus;
import org.apache.hadoop.zebra.io.KeyDistribution;
@@ -81,14 +81,13 @@
return String.format("%s%09d", prefix, random.nextInt(max));
}
- static int createBasicTable(int parts, int rows, String strSchema, String storage,
- Path path, boolean properClose, boolean sorted) throws IOException {
+ public static int createBasicTable(int parts, int rows, String strSchema, String storage, String sortColumns,
+ Path path, boolean properClose) throws IOException {
if (fs.exists(path)) {
BasicTable.drop(path, conf);
}
- BasicTable.Writer writer = new BasicTable.Writer(path, strSchema, storage,
- sorted, conf);
+ BasicTable.Writer writer = new BasicTable.Writer(path, strSchema, storage, sortColumns, null, conf);
writer.finish();
int total = 0;
@@ -96,6 +95,7 @@
String colNames[] = schema.getColumns();
Tuple tuple = TypesUtils.createTuple(schema);
+ boolean sorted = writer.isSorted();
for (int i = 0; i < parts; ++i) {
writer = new BasicTable.Writer(path, conf);
TableInserter inserter = writer.getInserter(
@@ -126,8 +126,11 @@
if (properClose) {
writer = new BasicTable.Writer(path, conf);
writer.close();
- BasicTableStatus status = getStatus(path);
- Assert.assertEquals(total, status.getRows());
+ /* We can only test the number of rows on sorted tables. */
+ if (sorted) {
+ BasicTableStatus status = getStatus(path);
+ Assert.assertEquals(total, status.getRows());
+ }
}
return total;
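For reference, sortedness is no longer a flag to createBasicTable(); it follows
from the new sortColumns argument and is read back via writer.isSorted(). Two
call shapes taken from later hunks in this same patch:

    // Sorted table, sort column "a" (from testNullSplits):
    int totalRows = createBasicTable(2, 250, "a, b, c", "", "a", path, true);
    // Unsorted table, null sort columns (from testMetaBlocks):
    createBasicTable(3, 100, "a, b, c", "", null, path, false);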
@@ -230,10 +233,10 @@
}
static void doReadWrite(Path path, int parts, int rows, String schema,
- String storage, String projection, boolean properClose, boolean sorted)
+ String storage, String sortColumns, String projection, boolean properClose, boolean sorted)
throws IOException, ParseException {
- int totalRows = createBasicTable(parts, rows, schema, storage, path,
- properClose, sorted);
+ int totalRows = createBasicTable(parts, rows, schema, storage, sortColumns, path,
+ properClose);
if (rows == 0) {
Assert.assertEquals(rows, 0);
}
@@ -248,17 +251,17 @@
public void testMultiCGs() throws IOException, ParseException {
Path path = new Path(rootPath, "TestBasicTableMultiCGs");
- doReadWrite(path, 2, 100, "SF_a,SF_b,SF_c,SF_d,SF_e", "[SF_a,SF_b,SF_c];[SF_d,SF_e]", "SF_f,SF_a,SF_c,SF_d", true, false);
+ doReadWrite(path, 2, 100, "SF_a,SF_b,SF_c,SF_d,SF_e", "[SF_a,SF_b,SF_c];[SF_d,SF_e]", null, "SF_f,SF_a,SF_c,SF_d", true, false);
}
public void testCornerCases() throws IOException, ParseException {
Path path = new Path(rootPath, "TestBasicTableCornerCases");
- doReadWrite(path, 0, 0, "a, b, c", "", "a, d, c, f", false, false);
- doReadWrite(path, 0, 0, "a, b, c", "", "a, d, c, f", true, false);
- doReadWrite(path, 0, 0, "a, b, c", "", "a, d, c, f", true, true);
- doReadWrite(path, 2, 0, "a, b, c", "", "a, d, c, f", false, false);
- doReadWrite(path, 2, 0, "a, b, c", "", "a, d, c, f", true, false);
- doReadWrite(path, 2, 0, "a, b, c", "", "a, d, c, f", true, true);
+ doReadWrite(path, 0, 0, "a, b, c", "", null, "a, d, c, f", false, false);
+ doReadWrite(path, 0, 0, "a, b, c", "", null, "a, d, c, f", true, false);
+ doReadWrite(path, 0, 0, "a, b, c", "", "a", "a, d, c, f", true, true);
+ doReadWrite(path, 2, 0, "a, b, c", "", null, "a, d, c, f", false, false);
+ doReadWrite(path, 2, 0, "a, b, c", "", null, "a, d, c, f", true, false);
+ doReadWrite(path, 2, 0, "a, b, c", "", "a", "a, d, c, f", true, true);
}
static int doReadOnly(TableScanner scanner) throws IOException, ParseException {
@@ -289,7 +292,7 @@
@Test
public void testNullSplits() throws IOException, ParseException {
Path path = new Path(rootPath, "TestBasicTableNullSplits");
- int totalRows = createBasicTable(2, 250, "a, b, c", "", path, true, true);
+ int totalRows = createBasicTable(2, 250, "a, b, c", "", "a", path, true);
BasicTable.Reader reader = new BasicTable.Reader(path, conf);
reader.setProjection("a,d,c,f");
Assert.assertEquals(totalRows, doReadOnly(reader.getScanner(null, false)));
@@ -301,14 +304,14 @@
@Test
public void testNegativeSplits() throws IOException, ParseException {
Path path = new Path(rootPath, "TestNegativeSplits");
- int totalRows = createBasicTable(2, 250, "a, b, c", "", path, true, true);
+ int totalRows = createBasicTable(2, 250, "a, b, c", "", "", path, true);
rangeSplitBasicTable(-1, totalRows, "a,d,c,f", path);
}
@Test
public void testMetaBlocks() throws IOException, ParseException {
Path path = new Path(rootPath, "TestBasicTableMetaBlocks");
- createBasicTable(3, 100, "a, b, c", "", path, false, false);
+ createBasicTable(3, 100, "a, b, c", "", null, path, false);
BasicTable.Writer writer = new BasicTable.Writer(path, conf);
BytesWritable meta1 = makeKey(1234);
BytesWritable meta2 = makeKey(9876);
@@ -350,8 +353,8 @@
@Test
public void testNormalCases() throws IOException, ParseException {
Path path = new Path(rootPath, "TestBasicTableNormal");
- doReadWrite(path, 2, 250, "a, b, c", "", "a, d, c, f", false, false);
- doReadWrite(path, 2, 250, "a, b, c", "", "a, d, c, f", true, false);
- doReadWrite(path, 2, 250, "a, b, c", "", "a, d, c, f", true, true);
+ doReadWrite(path, 2, 250, "a, b, c", "", null, "a, d, c, f", false, false);
+ doReadWrite(path, 2, 250, "a, b, c", "", null, "a, d, c, f", true, false);
+ doReadWrite(path, 2, 250, "a, b, c", "", "a", "a, d, c, f", true, true);
}
}
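For reference, the matching BasicTable.Writer change replaces the boolean
sorted flag with a sortColumns string plus one extra argument, passed as null
throughout these tests. A before/after sketch from the hunk above; reading the
extra argument as the comparator introduced by the grammar changes earlier in
this patch is an assumption, not something the diff states:

    // Removed constructor call:
    //   new BasicTable.Writer(path, strSchema, storage, sorted, conf);
    // Added constructor call (null is assumed to be a comparator):
    BasicTable.Writer writer =
        new BasicTable.Writer(path, strSchema, storage, sortColumns, null, conf);
    writer.finish();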
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableMapSplits.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableMapSplits.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableMapSplits.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableMapSplits.java Tue Nov 24 19:54:19 2009
@@ -72,7 +72,7 @@
BasicTable.drop(path, conf);
BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
- STR_STORAGE, false, conf);
+ STR_STORAGE, conf);
writer.finish();
Schema schema = writer.getSchema();
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableProjections.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableProjections.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableProjections.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableProjections.java Tue Nov 24 19:54:19 2009
@@ -62,7 +62,7 @@
BasicTable.drop(path, conf);
BasicTable.Writer writer = new BasicTable.Writer(path, "a,b,c,d,e,f,g",
- "[a,b,c];[d,e,f,g]", false, conf);
+ "[a,b,c];[d,e,f,g]", conf);
writer.finish();
Schema schema = writer.getSchema();
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableSplits.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableSplits.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableSplits.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTableSplits.java Tue Nov 24 19:54:19 2009
@@ -72,7 +72,7 @@
BasicTable.drop(path, conf);
BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
- STR_STORAGE, false, conf);
+ STR_STORAGE, conf);
writer.finish();
Schema schema = writer.getSchema();
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCollection.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCollection.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCollection.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestCollection.java Tue Nov 24 19:54:19 2009
@@ -80,7 +80,7 @@
// drop any previous tables
BasicTable.drop(path, conf);
BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
- STR_STORAGE, false, conf);
+ STR_STORAGE, conf);
writer.finish();
@@ -987,7 +987,7 @@
BasicTable.drop(path, conf);
try {
BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
- STR_STORAGE, false, conf);
+ STR_STORAGE, conf);
Assert.fail("should throw exception");
} catch (Exception e) {
System.out.println(e);
@@ -1012,10 +1012,10 @@
BasicTable.drop(path, conf);
try {
BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
- STR_STORAGE, false, conf);
+ STR_STORAGE, conf);
Assert.fail("should throw exception");
} catch (Exception e) {
System.out.println(e);
}
}
-}
\ No newline at end of file
+}
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java Tue Nov 24 19:54:19 2009
@@ -30,7 +30,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.file.tfile.RawComparable;
+import org.apache.hadoop.zebra.tfile.RawComparable;
import org.apache.hadoop.zebra.io.BasicTableStatus;
import org.apache.hadoop.zebra.io.ColumnGroup;
import org.apache.hadoop.zebra.io.KeyDistribution;
@@ -149,8 +149,11 @@
if (properClose) {
writer = new ColumnGroup.Writer(path, conf);
writer.close();
- BasicTableStatus status = getStatus(path);
- Assert.assertEquals(total, status.getRows());
+ /* We can only test the number of rows on sorted tables. */
+ if (sorted) {
+ BasicTableStatus status = getStatus(path);
+ Assert.assertEquals(total, status.getRows());
+ }
}
return total;
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName1.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName1.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName1.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName1.java Tue Nov 24 19:54:19 2009
@@ -67,7 +67,7 @@
// drop any previous tables
BasicTable.drop(path, conf);
BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
- STR_STORAGE, false, conf);
+ STR_STORAGE, conf);
writer.finish();
Schema schema = writer.getSchema();
Tuple tuple = TypesUtils.createTuple(schema);
@@ -170,4 +170,4 @@
reader.close();
}
-}
\ No newline at end of file
+}
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName2.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName2.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName2.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName2.java Tue Nov 24 19:54:19 2009
@@ -62,7 +62,7 @@
// drop any previous tables
BasicTable.drop(path, conf);
BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
- STR_STORAGE, false, conf);
+ STR_STORAGE, conf);
writer.finish();
Schema schema = writer.getSchema();
Tuple tuple = TypesUtils.createTuple(schema);
@@ -110,4 +110,4 @@
public void test1() throws IOException, Exception {
BasicTable.dumpInfo(path.toString(), System.out, conf);
}
-}
\ No newline at end of file
+}
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName3.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName3.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName3.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName3.java Tue Nov 24 19:54:19 2009
@@ -62,7 +62,7 @@
// drop any previous tables
BasicTable.drop(path, conf);
BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
- STR_STORAGE, false, conf);
+ STR_STORAGE, conf);
writer.finish();
Schema schema = writer.getSchema();
Tuple tuple = TypesUtils.createTuple(schema);
@@ -110,4 +110,4 @@
public void test1() throws IOException, Exception {
BasicTable.dumpInfo(path.toString(), System.out, conf);
}
-}
\ No newline at end of file
+}
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName4.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName4.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName4.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName4.java Tue Nov 24 19:54:19 2009
@@ -62,7 +62,7 @@
// drop any previous tables
BasicTable.drop(path, conf);
BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
- STR_STORAGE, false, conf);
+ STR_STORAGE, conf);
writer.finish();
Schema schema = writer.getSchema();
Tuple tuple = TypesUtils.createTuple(schema);
@@ -110,4 +110,4 @@
public void test1() throws IOException, Exception {
BasicTable.dumpInfo(path.toString(), System.out, conf);
}
-}
\ No newline at end of file
+}
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName5.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName5.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName5.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName5.java Tue Nov 24 19:54:19 2009
@@ -62,7 +62,7 @@
// drop any previous tables
BasicTable.drop(path, conf);
BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
- STR_STORAGE, false, conf);
+ STR_STORAGE, conf);
writer.finish();
Schema schema = writer.getSchema();
Tuple tuple = TypesUtils.createTuple(schema);
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName6.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName6.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName6.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupName6.java Tue Nov 24 19:54:19 2009
@@ -62,7 +62,7 @@
// drop any previous tables
BasicTable.drop(path, conf);
BasicTable.Writer writer = new BasicTable.Writer(path, STR_SCHEMA,
- STR_STORAGE, false, conf);
+ STR_STORAGE, conf);
writer.finish();
Schema schema = writer.getSchema();
Tuple tuple = TypesUtils.createTuple(schema);
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupProjections.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupProjections.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupProjections.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupProjections.java Tue Nov 24 19:54:19 2009
@@ -59,6 +59,10 @@
fs = new LocalFileSystem(rawLFS);
path = new Path(fs.getWorkingDirectory(), outputFile);
System.out.println("output file: " + path);
+
+ if (fs.exists(path)) {
+ ColumnGroup.drop(path, conf);
+ }
schema = new Schema("a,b,c,d,e,f,g");
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupSplits.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupSplits.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupSplits.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupSplits.java Tue Nov 24 19:54:19 2009
@@ -70,6 +70,10 @@
fs = new LocalFileSystem(rawLFS);
path = new Path(fs.getWorkingDirectory(), outputFile);
System.out.println("output file: " + path);
+
+ if (fs.exists(path)) {
+ ColumnGroup.drop(path, conf);
+ }
schema = new Schema(STR_SCHEMA);