Posted to commits@pig.apache.org by ga...@apache.org on 2009/08/12 00:27:47 UTC
svn commit: r803312 [2/16] - in /hadoop/pig/trunk: ./ contrib/zebra/
contrib/zebra/docs/ contrib/zebra/src/ contrib/zebra/src/java/
contrib/zebra/src/java/org/ contrib/zebra/src/java/org/apache/
contrib/zebra/src/java/org/apache/hadoop/ contrib/zebra/s...
Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,1760 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.zebra.io;
+
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.file.tfile.TFile;
+import org.apache.hadoop.io.file.tfile.Utils;
+import org.apache.hadoop.io.file.tfile.ByteArray;
+import org.apache.hadoop.io.file.tfile.RawComparable;
+import org.apache.hadoop.zebra.types.CGSchema;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.apache.hadoop.zebra.types.Partition;
+import org.apache.hadoop.zebra.types.Projection;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.hadoop.zebra.types.TypesUtils.TupleReader;
+import org.apache.hadoop.zebra.types.TypesUtils.TupleWriter;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+
+/**
+ * ColumnGroup is the basic unit of a persistent table. The following
+ * Configuration parameters can customize the behavior of ColumnGroup.
+ * <ul>
+ * <li><b>table.tfile.minblock.size</b> (int) Minimum compression block
+ * size for the underlying TFile (defaults to 1024*1024).
+ * <li><b>table.output.tfile.compression</b> (String) Compression method (one
+ * of "none", "lzo", "gz"; defaults to "gz"). See
+ * {@link TFile#getSupportedCompressionAlgorithms()}.
+ * <li><b>table.input.split.minSize</b> (int) Minimum split size (defaults
+ * to 64*1024).
+ * </ul>
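+ *
+ * <p>For instance, a caller might set these before creating a Writer; this
+ * is only an illustrative sketch, and the values shown are the defaults:
+ *
+ * <pre>
+ * Configuration conf = new Configuration();
+ * conf.setInt("table.tfile.minblock.size", 1024 * 1024);
+ * conf.set("table.output.tfile.compression", "gz");
+ * conf.setInt("table.input.split.minSize", 64 * 1024);
+ * </pre>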
+ */
+class ColumnGroup {
+ private final static String CONF_COMPRESS = "table.output.tfile.compression";
+ private final static String DEFAULT_COMPRESS = "gz";
+ private final static String CONF_MIN_BLOCK_SIZE = "table.tfile.minblock.size";
+ private final static int DEFAULT_MIN_BLOCK_SIZE = 1024 * 1024;
+
+ private final static String CONF_MIN_SPLIT_SIZE = "table.input.split.minSize";
+ private final static int DEFAULT_MIN_SPLIT_SIZE = 64 * 1024;
+
+ private static final double SPLIT_SLOP = 1.1; // 10% slop
+
+ // exclude files that start with the following prefix; may change to a regex
+ private final static String CONF_NON_DATAFILE_PREFIX =
+ "table.cg.nondatafile.prefix";
+ private final static String SPECIAL_FILE_PREFIX = ".";
+
+ // tmp schema file name, used as a flag of unfinished CG
+ private final static String SCHEMA_FILE = ".schema";
+ // meta data TFile for entire CG, used as a flag of closed CG
+ final static String META_FILE = ".meta";
+
+ static final String BLOCK_NAME_INDEX = "ColumnGroup.index";
+
+ static Path makeMetaFilePath(Path parent) {
+ return new Path(parent, META_FILE);
+ }
+
+ static String getCompression(Configuration conf) {
+ return conf.get(CONF_COMPRESS, DEFAULT_COMPRESS);
+ }
+
+ static int getMinBlockSize(Configuration conf) {
+ return conf.getInt(CONF_MIN_BLOCK_SIZE, DEFAULT_MIN_BLOCK_SIZE);
+ }
+
+ static String getNonDataFilePrefix(Configuration conf) {
+ return conf.get(CONF_NON_DATAFILE_PREFIX, SPECIAL_FILE_PREFIX);
+ }
+
+ static int getMinSplitSize(Configuration conf) {
+ return conf.getInt(CONF_MIN_SPLIT_SIZE, DEFAULT_MIN_SPLIT_SIZE);
+ }
+
+ /**
+ * Drop a Column Group. This maps to deleting all files belonging to this
+ * Column Group on the FileSystem.
+ *
+ * @param path
+ * the path to the ColumnGroup.
+ * @param conf
+ * The configuration object.
+ */
+ public static void drop(Path path, Configuration conf) throws IOException {
+ FileSystem fs = path.getFileSystem(conf);
+ fs.delete(path, true);
+ // TODO:
+ // fs.close();
+ }
+
+ /**
+ * Scan the file system, looking for TFiles, and build an in-memory index of a
+ * column group.
+ *
+ * @param fs
+ * The file system
+ * @param path
+ * The base path of the column group.
+ * @param dirty
+ * Whether to build a dirty index. A dirty index is built by looking
+ * only at file-level status, without opening individual TFiles.
+ * The flag may only be set for unsorted ColumnGroups.
+ * @param conf
+ * The configuration object.
+ * @return The in-memory index object.
+ * @throws IOException
+ */
+ static CGIndex buildIndex(FileSystem fs, Path path, boolean dirty,
+ Configuration conf) throws IOException {
+ CGIndex ret = new CGIndex();
+ FileStatus[] files = fs.listStatus(path, new CGPathFilter(conf));
+ Comparator<RawComparable> comparator = null;
+ for (FileStatus f : files) {
+ if (dirty) {
+ ret.add(f.getLen(), f.getPath().getName());
+ }
+ else {
+ FSDataInputStream dis = null;
+ TFile.Reader tr = null;
+ try {
+ dis = fs.open(f.getPath());
+ tr = new TFile.Reader(dis, f.getLen(), conf);
+ if (comparator == null) {
+ comparator = tr.getComparator();
+ }
+ if (tr.getEntryCount() > 0) {
+ CGIndexEntry range =
+ new CGIndexEntry(f.getPath().getName(), tr.getEntryCount(), tr
+ .getFirstKey(), tr.getLastKey());
+ ret.add(f.getLen(), tr.getEntryCount(), range);
+ }
+ }
+ catch (IOException e) {
+ // TODO: log the error, ignore incorrect TFiles.
+ e.printStackTrace(System.err);
+ }
+ finally {
+ if (tr != null) {
+ tr.close();
+ }
+ if (dis != null) {
+ dis.close();
+ }
+ }
+ }
+ }
+
+ ret.sort(comparator);
+ return ret;
+ }
+
+ /**
+ * ColumnGroup reader.
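+ *
+ * <p>A typical read sequence, as an illustrative sketch only: the path and
+ * projection are hypothetical, and TypesUtils.createTuple is assumed to
+ * allocate a tuple matching the given schema.
+ *
+ * <pre>
+ * ColumnGroup.Reader reader =
+ * new ColumnGroup.Reader(new Path("/table/cg0"), conf);
+ * reader.setProjection("FirstName, LastName");
+ * TableScanner scanner = reader.getScanner(null, true);
+ * BytesWritable key = new BytesWritable();
+ * Tuple row = TypesUtils.createTuple(reader.getSchema());
+ * while (!scanner.atEnd()) {
+ * scanner.getKey(key);
+ * scanner.getValue(row);
+ * scanner.advance();
+ * }
+ * scanner.close();
+ * </pre>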
+ */
+ public static class Reader implements Closeable {
+ Path path;
+ Configuration conf;
+ FileSystem fs;
+ CGSchema cgschema;
+ Comparator<RawComparable> comparator;
+ Projection projection;
+ CGIndex cgindex;
+ ArrayList<SplitColumn> exec;
+ SplitColumn top; // directly associated with logical schema
+ SplitColumn leaf; // corresponding to projection
+ boolean closed;
+
+ /**
+ * Get the Column Group physical schema without loading the full CG index.
+ *
+ * @param path
+ * The path to the ColumnGroup.
+ * @param conf
+ * The configuration object.
+ * @return The ColumnGroup schema.
+ * @throws IOException
+ */
+
+ public static Schema getSchema(Path path, Configuration conf)
+ throws IOException, ParseException {
+ FileSystem fs = path.getFileSystem(conf);
+ CGSchema cgschema = CGSchema.load(fs, path);
+ return cgschema.getSchema();
+ }
+
+ /**
+ * Create a ColumnGroup reader.
+ *
+ * @param path
+ * The directory path to the column group.
+ * @param conf
+ * Optional configuration parameters.
+ * @throws IOException
+ */
+ public Reader(Path path, Configuration conf) throws IOException,
+ ParseException {
+ this(path, true, conf);
+ }
+
+ Reader(Path path, boolean dirty, Configuration conf) throws IOException,
+ ParseException {
+ this.path = path;
+ this.conf = conf;
+
+ fs = path.getFileSystem(conf);
+ // check existence of path
+ if (!fs.exists(path)) {
+ throw new IOException("Path doesn't exist: " + path);
+ }
+
+ if (!fs.getFileStatus(path).isDir()) {
+ throw new IOException("Path exists but not a directory: " + path);
+ }
+
+ cgschema = CGSchema.load(fs, path);
+ if (cgschema.isSorted()) {
+ comparator = TFile.makeComparator(cgschema.getComparator());
+ }
+ projection = new Projection(cgschema.getSchema()); // default projection to CG schema.
+ Path metaFilePath = makeMetaFilePath(path);
+ if (!fs.exists(metaFilePath)) {
+ // special case for unsorted CG that did not create index properly.
+ if (cgschema.isSorted()) {
+ throw new FileNotFoundException(
+ "Missing Meta File for sorted Column Group");
+ }
+ cgindex = buildIndex(fs, path, dirty, conf);
+ }
+ else {
+ MetaFile.Reader metaFile = MetaFile.createReader(metaFilePath, conf);
+ try {
+ cgindex = new CGIndex();
+ DataInputStream dis = metaFile.getMetaBlock(BLOCK_NAME_INDEX);
+ try {
+ cgindex.readFields(dis);
+ }
+ finally {
+ dis.close();
+ }
+ }
+ finally {
+ metaFile.close();
+ }
+ }
+ }
+
+ /**
+ * Set the projection for the reader. This will affect calls to
+ * getScanner(), getStatus(), and getColumnNames().
+ *
+ * @param projection
+ * The projection on the column group for subsequent read
+ * operations. To select all columns, pass null.
+ */
+ public synchronized void setProjection(String projection) throws ParseException {
+ if (projection == null) {
+ this.projection = new Projection(cgschema.getSchema());
+ }
+ else {
+ this.projection = new Projection(cgschema.getSchema(), projection);
+ }
+ }
+
+ /**
+ * Get the schema of columns of the table (possibly through projection).
+ *
+ * @return Schema of the columns of the table (possibly through projection).
+ */
+ public Schema getSchema() throws ParseException {
+ return projection.getSchema();
+ }
+
+ /**
+ * Get the projection
+ * @return Projection of this Reader
+ */
+ public Projection getProjection() {
+ return projection;
+ }
+
+ public String getSerializer() {
+ return cgschema.getSerializer();
+ }
+
+ public String getCompressor() {
+ return cgschema.getCompressor();
+ }
+
+ public CGSchema getCGSchema() {
+ return cgschema;
+ }
+
+ /**
+ * Get a scanner that reads all rows whose row keys fall in a specific
+ * range.
+ *
+ * @param beginKey
+ * The begin key of the scan range.
+ * @param endKey
+ * The end key of the scan range.
+ * @param closeReader
+ * Whether to close the underlying Reader when the scanner is
+ * closed. Should be set to true if there is only one scanner on
+ * top of the reader, so that resources are released as soon as
+ * the scanner is closed.
+ * @return A scanner object.
+ * @throws IOException
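+ *
+ * <p>For example (the keys here are hypothetical):
+ *
+ * <pre>
+ * BytesWritable begin = new BytesWritable("key100".getBytes());
+ * BytesWritable end = new BytesWritable("key200".getBytes());
+ * TableScanner scanner = reader.getScanner(begin, end, true);
+ * </pre>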
+ */
+ public synchronized TableScanner getScanner(BytesWritable beginKey,
+ BytesWritable endKey, boolean closeReader) throws IOException,
+ ParseException {
+ if (closed) {
+ throw new EOFException("Reader already closed");
+ }
+ if (!isSorted()) {
+ throw new IOException(
+ "Cannot get key-bounded scanner for unsorted table");
+ }
+ RawComparable begin =
+ (beginKey != null) ? new ByteArray(beginKey.get(), 0, beginKey
+ .getSize()) : null;
+ RawComparable end =
+ (endKey != null) ? new ByteArray(endKey.get(), 0, endKey.getSize())
+ : null;
+ if (begin != null && end != null) {
+ if (comparator.compare(begin, end) >= 0) {
+ throw new IOException("Zero-key-range split");
+ }
+ }
+
+ return new CGScanner(begin, end, closeReader);
+ }
+
+ /**
+ * Get a scanner that reads a consecutive range of rows as defined in the
+ * CGRangeSplit object, which should be obtained from a previous call to
+ * rangeSplit().
+ *
+ * @param split
+ * The split range. If null, get a scanner to read the complete
+ * column group.
+ * @param closeReader
+ * Whether to close the underlying Reader when the scanner is
+ * closed. Should be set to true if there is only one scanner on
+ * top of the reader, so that resources are released as soon as
+ * the scanner is closed.
+ * @return A scanner object.
+ * @throws IOException
+ */
+ public synchronized TableScanner getScanner(CGRangeSplit split,
+ boolean closeReader) throws IOException, ParseException {
+ if (closed) {
+ throw new EOFException("Reader already closed");
+ }
+
+ if (split == null) {
+ return getScanner(new CGRangeSplit(0, cgindex.size()), closeReader);
+ }
+ if (split.len < 0) {
+ throw new IllegalArgumentException("Illegal range split");
+ }
+
+ if (split.len == 0) {
+ throw new IOException("Zero-length range split");
+ }
+
+ return new CGScanner(split, closeReader);
+ }
+
+ /**
+ * Given a split range, calculate how the file data that fall into the range
+ * are distributed among hosts.
+ *
+ * @param split
+ * The range-based split. If null, return all blocks.
+ * @return a map from host name to the amount of data (in bytes) the host
+ * owns that fall roughly into the key range.
+ */
+ public BlockDistribution getBlockDistribution(CGRangeSplit split)
+ throws IOException {
+ if (split == null) {
+ return getBlockDistribution(new CGRangeSplit(0, cgindex.size()));
+ }
+
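+ // The bitwise OR is negative iff split.start < 0, split.len < 0, or
+ // split.start + split.len > cgindex.size().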
+ if ((split.start | split.len | (cgindex.size() - split.start - split.len)) < 0) {
+ throw new IndexOutOfBoundsException("Bad split");
+ }
+
+ BlockDistribution ret = new BlockDistribution();
+ for (int i = split.start; i < split.start + split.len; ++i) {
+ CGIndexEntry dfkr = cgindex.get(i);
+ Path tfilePath = new Path(path, dfkr.getName());
+ FileStatus tfileStatus = fs.getFileStatus(tfilePath);
+ BlockLocation[] locations =
+ fs.getFileBlockLocations(tfileStatus, 0, tfileStatus.getLen());
+ for (BlockLocation l : locations) {
+ ret.add(l);
+ }
+ }
+
+ return ret;
+ }
+
+ /**
+ * Get a sampling of keys and calculate how data are distributed among
+ * key-partitioned buckets. The implementation attempts to calculate all
+ * information in one shot to avoid reading TFile index multiple times.
+ * Special care is also taken so that the memory requirement is not
+ * linear in the total data size of the column group.
+ *
+ * @param n
+ * Targeted size of the sampling.
+ * @return KeyDistribution object.
+ * @throws IOException
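+ *
+ * <p>For example, with 1GB of total data and n = 16, each sample
+ * targets roughly 64MB of data, subject to the minimum split size.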
+ */
+ public KeyDistribution getKeyDistribution(int n) throws IOException {
+ // TODO: any need for a similar capability for unsorted CGs?
+ if (!isSorted()) {
+ throw new IOException("Cannot get key distribution for unsorted table");
+ }
+ KeyDistribution ret = new KeyDistribution(comparator);
+ Path[] paths = new Path[cgindex.size()];
+ FileStatus[] tfileStatus = new FileStatus[paths.length];
+ long totalBytes = 0;
+ for (int i = 0; i < paths.length; ++i) {
+ paths[i] = cgindex.getPath(i, path);
+ tfileStatus[i] = fs.getFileStatus(paths[i]);
+ totalBytes += tfileStatus[i].getLen();
+ }
+
+ final long EPSILON = (long) (getMinSplitSize(conf) * (SPLIT_SLOP - 1));
+ long goalSize = totalBytes / n;
+ goalSize = Math.max(getMinSplitSize(conf), goalSize);
+ for (int i = 0; i < paths.length; ++i) {
+ FileStatus fstatus = tfileStatus[i];
+ long blkSize = fstatus.getBlockSize();
+ long fileLen = fstatus.getLen();
+ long stepSize =
+ (goalSize > blkSize) ? goalSize / blkSize * blkSize : blkSize
+ / (blkSize / goalSize);
+ FSDataInputStream fsdis = null;
+ TFile.Reader reader = null;
+ long remainLen = fileLen;
+ boolean done = false;
+ try {
+ fsdis = fs.open(paths[i]);
+ reader = new TFile.Reader(fsdis, tfileStatus[i].getLen(), conf);
+ while ((remainLen > 0) && !done) {
+ long splitBytes =
+ (remainLen > stepSize * SPLIT_SLOP) ? stepSize : remainLen;
+ long offsetBegin = fileLen - remainLen;
+ long offsetEnd = offsetBegin + splitBytes;
+ BlockLocation[] locations =
+ fs.getFileBlockLocations(fstatus, offsetBegin, splitBytes);
+ if (locations.length == 0) {
+ throw new AssertionError(
+ "getFileBlockLocations returns 0 location");
+ }
+
+ Arrays.sort(locations, new Comparator<BlockLocation>() {
+ @Override
+ public int compare(BlockLocation o1, BlockLocation o2) {
+ long diff = o1.getOffset() - o2.getOffset();
+ if (diff < 0) return -1;
+ if (diff > 0) return 1;
+ return 0;
+ }
+ });
+ BlockLocation firstBlock = locations[0];
+ BlockLocation lastBlock = locations[locations.length - 1];
+ long lastBlockOffsetBegin = lastBlock.getOffset();
+ long lastBlockOffsetEnd =
+ lastBlockOffsetBegin + lastBlock.getLength();
+ if ((firstBlock.getOffset() > offsetBegin)
+ || (lastBlockOffsetEnd < offsetEnd)) {
+ throw new AssertionError(
+ "Block locations returned by getFileBlockLocations do not cover requested range");
+ }
+
+ // Adjust offsets
+ if ((offsetEnd > lastBlockOffsetBegin)
+ && (offsetEnd - lastBlockOffsetBegin < EPSILON)) {
+ // the split includes a bit of the next block, remove it.
+ offsetEnd = lastBlockOffsetBegin;
+ splitBytes = offsetEnd - offsetBegin;
+ }
+ else if ((lastBlockOffsetEnd > offsetEnd)
+ && (lastBlockOffsetEnd - offsetEnd < EPSILON)) {
+ // the split includes almost the whole block, fill it.
+ offsetEnd = lastBlockOffsetEnd;
+ splitBytes = offsetEnd - offsetBegin;
+ }
+
+ RawComparable key = reader.getKeyNear(offsetEnd);
+ if (key == null) {
+ offsetEnd = fileLen;
+ splitBytes = offsetEnd - offsetBegin;
+ key = reader.getLastKey();
+ done = true; // TFile index too large? Is it necessary now?
+ }
+ remainLen -= splitBytes;
+
+ BlockDistribution bd = new BlockDistribution();
+ for (BlockLocation l : locations) {
+ long blkBeginOffset = l.getOffset();
+ long blkEndOffset = blkBeginOffset + l.getLength();
+ if (blkBeginOffset < offsetBegin) blkBeginOffset = offsetBegin;
+ if (blkEndOffset > offsetEnd) blkEndOffset = offsetEnd;
+ if (blkEndOffset > blkBeginOffset) {
+ bd.add(l, blkEndOffset - blkBeginOffset);
+ }
+ }
+ ret.add(key, bd);
+ }
+ }
+ finally {
+ if (reader != null) {
+ try {
+ reader.close();
+ }
+ catch (Exception e) {
+ // no-op;
+ }
+ }
+ if (fsdis != null) {
+ try {
+ fsdis.close();
+ }
+ catch (Exception e) {
+ // no-op
+ }
+ }
+ }
+ }
+
+ return ret;
+ }
+
+ /**
+ * Get the status of the ColumnGroup.
+ */
+ public BasicTableStatus getStatus() {
+ return cgindex.status;
+ }
+
+ /**
+ * Split the ColumnGroup by file order.
+ *
+ * @param n
+ * Targeted number of partitions.
+ * @return A list of range-based splits, whose size may be less than or
+ * equal to n.
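+ *
+ * <p>For example, with 10 files and n = 3, the returned splits cover
+ * the file index ranges [0, 3), [3, 6), and [6, 10).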
+ */
+ public List<CGRangeSplit> rangeSplit(int n) {
+ // The output of this method must be only dependent on the cgindex and
+ // input parameter n - so that horizontally stitched column groups will
+ // get aligned splits.
+ int numFiles = cgindex.size();
+ if ((numFiles < n) || (n < 0)) {
+ return rangeSplit(numFiles);
+ }
+ List<CGRangeSplit> lst = new ArrayList<CGRangeSplit>();
+ int beginIndex = 0;
+ for (int i = 0; i < n; ++i) {
+ int endIndex = (int) ((long) (i + 1) * numFiles / n);
+ lst.add(new CGRangeSplit(beginIndex, endIndex - beginIndex));
+ beginIndex = endIndex;
+ }
+
+ return lst;
+ }
+
+ /**
+ * Is the ColumnGroup sorted?
+ *
+ * @return Whether the ColumnGroup is sorted.
+ */
+ public boolean isSorted() {
+ return cgschema.isSorted();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!closed) {
+ closed = true;
+ }
+ }
+
+ /**
+ * A simple wrapper class over TFile.Reader.Scanner to simplify the creation
+ * and resource management.
+ */
+ static class TFileScanner implements Closeable {
+ boolean closed = true;
+ FSDataInputStream ins;
+ TFile.Reader reader;
+ TFile.Reader.Scanner scanner;
+ TupleReader tupleReader;
+
+ TFileScanner(FileSystem fs, Path path, RawComparable begin,
+ RawComparable end, CGSchema cgschema, Projection projection,
+ Configuration conf) throws IOException, ParseException {
+ try {
+ ins = fs.open(path);
+ /*
+ * compressor is inside cgschema
+ */
+ reader = new TFile.Reader(ins, fs.getFileStatus(path).getLen(), conf);
+ scanner = reader.createScanner(begin, end);
+ /*
+ * serializer is inside cgschema: different serializer will require
+ * different Reader: for pig, it's TupleReader
+ */
+ tupleReader = new TupleReader(cgschema.getSchema(), projection);
+ closed = false;
+ }
+ finally {
+ if (closed) { // failed to instantiate the object.
+ if (scanner != null) {
+ try {
+ scanner.close();
+ }
+ catch (Exception e) {
+ // no-op
+ }
+ }
+
+ if (reader != null) {
+ try {
+ reader.close();
+ }
+ catch (Exception e) {
+ // no op
+ }
+ }
+
+ if (ins != null) {
+ try {
+ ins.close();
+ }
+ catch (Exception e) {
+ // no op
+ }
+ }
+ }
+ }
+ }
+
+ void rewind() throws IOException {
+ scanner.rewind();
+ }
+
+ void getKey(BytesWritable key) throws IOException {
+ scanner.entry().getKey(key);
+ }
+
+ void getValue(Tuple val) throws IOException, ParseException {
+ DataInputStream dis = scanner.entry().getValueStream();
+ try {
+ tupleReader.get(dis, val);
+ }
+ finally {
+ dis.close();
+ }
+ }
+
+ boolean seekTo(BytesWritable key) throws IOException {
+ return scanner.seekTo(key.get(), 0, key.getSize());
+ }
+
+ boolean advance() throws IOException {
+ return scanner.advance();
+ }
+
+ boolean atEnd() {
+ return scanner.atEnd();
+ }
+
+ void seekToEnd() throws IOException {
+ scanner.seekToEnd();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!closed) {
+ closed = true;
+ try {
+ scanner.close();
+ }
+ catch (Exception e) {
+ // no op
+ }
+
+ try {
+ reader.close();
+ }
+ catch (Exception e) {
+ // no op
+ }
+
+ try {
+ ins.close();
+ }
+ catch (Exception e) {
+ // no op
+ }
+ }
+ }
+ }
+
+ /**
+ * ColumnGroup scanner
+ */
+ class CGScanner implements TableScanner {
+ private Projection logicalSchema = null;
+ private TFileScanner[] scanners;
+ private boolean closeReader;
+ private int beginIndex, endIndex;
+ private int current; // current scanner
+ private boolean scannerClosed = true;
+
+ CGScanner(CGRangeSplit split, boolean closeReader) throws IOException,
+ ParseException {
+ if (split == null) {
+ beginIndex = 0;
+ endIndex = cgindex.size();
+ }
+ else {
+ beginIndex = split.start;
+ endIndex = split.start + split.len;
+ }
+ init(null, null, closeReader);
+ }
+
+ CGScanner(RawComparable beginKey, RawComparable endKey,
+ boolean closeReader) throws IOException, ParseException {
+ beginIndex = 0;
+ endIndex = cgindex.size();
+ if (beginKey != null) {
+ beginIndex = cgindex.lowerBound(beginKey, comparator);
+ }
+ if (endKey != null) {
+ endIndex = cgindex.lowerBound(endKey, comparator);
+ if (endIndex < cgindex.size()) {
+ ++endIndex;
+ }
+ }
+ init(beginKey, endKey, closeReader);
+ }
+
+ private void init(RawComparable beginKey, RawComparable endKey,
+ boolean doClose) throws IOException, ParseException {
+ if (beginIndex > endIndex) {
+ throw new IllegalArgumentException("beginIndex > endIndex");
+ }
+ logicalSchema = ColumnGroup.Reader.this.getProjection();
+ List<TFileScanner> tmpScanners =
+ new ArrayList<TFileScanner>(endIndex - beginIndex);
+ try {
+ for (int i = beginIndex; i < endIndex; ++i) {
+ RawComparable begin = (i == beginIndex) ? beginKey : null;
+ RawComparable end = (i == endIndex - 1) ? endKey : null;
+ TFileScanner scanner =
+ new TFileScanner(fs, cgindex.getPath(i, path), begin, end,
+ cgschema, logicalSchema, conf);
+ // skip empty scanners.
+ if (!scanner.atEnd()) {
+ tmpScanners.add(scanner);
+ }
+ else {
+ scanner.close();
+ }
+ }
+ scanners = tmpScanners.toArray(new TFileScanner[tmpScanners.size()]);
+ this.closeReader = doClose;
+ scannerClosed = false;
+ }
+ finally {
+ if (scannerClosed) { // failed to initialize the object.
+ for (int i = 0; i < tmpScanners.size(); ++i) {
+ try {
+ tmpScanners.get(i).close();
+ }
+ catch (Exception e) {
+ // no op
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void getKey(BytesWritable key) throws IOException {
+ if (atEnd()) {
+ throw new EOFException("No more key-value to read");
+ }
+ scanners[current].getKey(key);
+ }
+
+ @Override
+ public void getValue(Tuple row) throws IOException {
+ if (atEnd()) {
+ throw new EOFException("No more key-value to read");
+ }
+ try {
+ scanners[current].getValue(row);
+ } catch (ParseException e) {
+ throw new IOException("Invalid Projection: "+e.getMessage());
+ }
+ }
+
+ @Override
+ public String getProjection() {
+ return logicalSchema.toString();
+ }
+
+ public Schema getSchema() {
+ return logicalSchema.getSchema();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ if (atEnd()) {
+ return false;
+ }
+ scanners[current].advance();
+ if (scanners[current].atEnd()) {
+ ++current;
+ if (!atEnd()) {
+ scanners[current].rewind();
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public boolean atEnd() throws IOException {
+ return (current >= scanners.length);
+ }
+
+ @Override
+ public boolean seekTo(BytesWritable key) throws IOException {
+ if (!isSorted()) {
+ throw new IOException("Cannot seek in unsorted Column Group");
+ }
+ int index =
+ cgindex.lowerBound(new ByteArray(key.get(), 0, key.getSize()),
+ comparator);
+ if (index > endIndex) {
+ seekToEnd();
+ return false;
+ }
+
+ if ((index < beginIndex)) {
+ // move to the beginning
+ index = beginIndex;
+ }
+
+ current = index - beginIndex;
+ return scanners[current].seekTo(key);
+ }
+
+ @Override
+ public void seekToEnd() throws IOException {
+ current = scanners.length;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!scannerClosed) {
+ scannerClosed = true;
+ for (int i = 0; i < scanners.length; ++i) {
+ try {
+ scanners[i].close();
+ }
+ catch (Exception e) {
+ // no-op
+ }
+ }
+ if (closeReader) {
+ Reader.this.close();
+ }
+ }
+ }
+ }
+
+ public static class CGRangeSplit implements Writable {
+ int start = 0; // starting index in the list
+ int len = 0;
+
+ CGRangeSplit(int start, int len) {
+ this.start = start;
+ this.len = len;
+ }
+
+ public CGRangeSplit() {
+ // no-op;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ start = Utils.readVInt(in);
+ len = Utils.readVInt(in);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ Utils.writeVInt(out, start);
+ Utils.writeVInt(out, len);
+ }
+ }
+
+ private static class SplitColumn {
+ SplitColumn(Partition.SplitType st) {
+ this.st = st;
+ }
+
+ SplitColumn(int fieldIndex, Partition.SplitType st) {
+ this.fieldIndex = fieldIndex;
+ this.st = st;
+ }
+
+ SplitColumn(int fieldIndex, String key, Partition.SplitType st) {
+ this.fieldIndex = fieldIndex;
+ this.key = key;
+ this.st = st;
+ }
+
+ SplitColumn(int fieldIndex, int projIndex, SplitColumn leaf, String key,
+ Partition.SplitType st) {
+ this(fieldIndex, key, st);
+ this.projIndex = projIndex;
+ this.leaf = leaf;
+ }
+
+ int fieldIndex = -1; // field index to parent
+ int projIndex = -1; // index in projection: only used by leaves
+ SplitColumn leaf = null;
+ String key = null; // MAP key to parent
+ ArrayList<SplitColumn> children = null;
+ int index = -1; // index in the logical schema
+ Object field = null;
+ Partition.SplitType st = Partition.SplitType.NONE;
+
+ void dispatch(Object field) {
+ this.field = field;
+ }
+
+ @SuppressWarnings("unchecked")
+ void split() throws ExecException {
+ int size = children.size();
+ if (st == Partition.SplitType.RECORD) {
+ for (int i = 0; i < size; i++) {
+ if (children.get(i).projIndex != -1) // a leaf: set projection
+ // directly
+ ((Tuple) (leaf.field)).set(projIndex, ((Tuple) field).get(children
+ .get(i).fieldIndex));
+ else children.get(i).field =
+ ((Tuple) field).get(children.get(i).fieldIndex);
+ }
+ }
+ else if (st == Partition.SplitType.MAP) {
+ for (int i = 0; i < size; i++) {
+ if (children.get(i).projIndex != -1) // a leaf: set projection
+ // directly
+ ((Tuple) (leaf.field)).set(projIndex, ((Map<String, Object>) field)
+ .get(children.get(i).key));
+ else children.get(i).field =
+ ((Map<String, Object>) field).get(children.get(i).key);
+ }
+ }
+ }
+
+ void addChild(SplitColumn child) {
+ if (children == null) children = new ArrayList<SplitColumn>();
+ children.add(child);
+ }
+ }
+ }
+
+ /**
+ * Column Group writer.
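+ *
+ * <p>A typical write sequence, as an illustrative sketch only: the path,
+ * schema, serializer ("pig"), and row values are hypothetical, and
+ * TypesUtils.createTuple is assumed to allocate a tuple matching the
+ * given schema.
+ *
+ * <pre>
+ * ColumnGroup.Writer writer =
+ * new ColumnGroup.Writer(new Path("/table/cg0"), "FirstName, LastName",
+ * false, "pig", "gz", false, conf);
+ * TableInserter inserter = writer.getInserter("part-00000", true);
+ * Tuple row = TypesUtils.createTuple(writer.getSchema());
+ * row.set(0, "John");
+ * row.set(1, "Doe");
+ * inserter.insert(new BytesWritable("key0".getBytes()), row);
+ * inserter.close();
+ * writer.close();
+ * </pre>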
+ */
+ public static class Writer implements Closeable {
+ Path path;
+ Configuration conf;
+ FileSystem fs;
+ CGSchema cgschema;
+ private boolean finished;
+
+ /**
+ * Create a ColumnGroup writer. The semantics are as follows:
+ * <ol>
+ * <li>If path does not exist:
+ * <ul>
+ * <li>create the path directory
+ * <li>write out the meta data file.
+ * </ul>
+ * <li>If path exists and the directory is empty: write out the meta data
+ * file.
+ * <li>If path exists and contains what looks like a complete Column Group,
+ * a ColumnGroupExists exception will be thrown.
+ * <li>If path exists and overwrite is true, remove all files under the
+ * directory and resume as in Step 2.
+ * <li>If path exists, the directory is not empty, and overwrite is false,
+ * ColumnGroupExists will be thrown.
+ * </ol>
+ * This constructor never removes a valid/complete ColumnGroup.
+ *
+ * @param path
+ * The path to the Column Group, either not existent or must be a
+ * directory.
+ * @param schema
+ * The schema of the ColumnGroup. For this version of
+ * implementation, the schema of a table is a comma separated list
+ * of column names, such as "FirstName, LastName, Sex, Department".
+ * @param sorted
+ * Whether the column group to be created is sorted or not. If set
+ * to true, we expect the rows inserted by every inserter created
+ * from this Writer must be sorted. Additionally, there exists an
+ * ordering of the inserters Ins-1, Ins-2, ... such that the rows
+ * created by Ins-1, followed by rows created by Ins-2, ... form a
+ * total order.
+ * @param overwrite
+ * Should we overwrite the path if it already exists?
+ * @param conf
+ * The optional configuration objects.
+ * @throws IOException
+ */
+ public Writer(Path path, String schema, boolean sorted, String serializer,
+ String compressor, boolean overwrite, Configuration conf)
+ throws IOException, ParseException {
+ this(path, new Schema(schema), sorted, serializer, compressor, overwrite,
+ conf);
+ }
+
+ public Writer(Path path, Schema schema, boolean sorted, String serializer,
+ String compressor, boolean overwrite, Configuration conf)
+ throws IOException, ParseException {
+ this.path = path;
+ this.conf = conf;
+
+ fs = path.getFileSystem(conf);
+
+ // If meta file already exists, that means the ColumnGroup is complete and
+ // valid, we will not proceed.
+ checkMetaFile(path);
+
+ // if overwriting, remove everything
+ if (overwrite) {
+ fs.delete(path, true);
+ }
+
+ checkPath(path, true);
+
+ cgschema = new CGSchema(schema, sorted, serializer, compressor);
+ CGSchema sfNew = CGSchema.load(fs, path);
+ if (sfNew != null) {
+ // compare input with on-disk schema.
+ if (!sfNew.equals(cgschema)) {
+ throw new IOException("Schemas are different.");
+ }
+ }
+ else {
+ // create the schema file in FS
+ cgschema.create(fs, path);
+ }
+ }
+
+ /**
+ * Reopen an already created ColumnGroup for writing. RuntimeException will
+ * be thrown if the table is already closed, or if createMetaBlock() is
+ * called by some other process.
+ */
+ public Writer(Path path, Configuration conf) throws IOException,
+ ParseException {
+ this.path = path;
+ this.conf = conf;
+ fs = path.getFileSystem(conf);
+ checkPath(path, false);
+ checkMetaFile(path);
+ // read the schema file
+ cgschema = CGSchema.load(fs, path);
+ }
+
+ /**
+ * Release resources used by the object. Unlike close(), finish() does not
+ * make the table immutable. However, if a user has already added some
+ * meta data into the CG, then finish() will close the column group.
+ */
+ public void finish() {
+ if (!finished) {
+ finished = true;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!finished) {
+ finished = true;
+ createIndex();
+ }
+ }
+
+ public Schema getSchema() {
+ return cgschema.getSchema();
+ }
+
+ /**
+ * Get an inserter with a given name.
+ *
+ * @param name
+ * the name of the inserter.
+ * @param finishWriter
+ * Whether to finish the underlying Writer object when the
+ * Inserter is closed. Should be set to true if there is only one
+ * inserter operating on the table, so that finish() is called
+ * once the Inserter is closed.
+ *
+ * @return A table inserter object.
+ * @throws IOException
+ */
+ public TableInserter getInserter(String name, boolean finishWriter)
+ throws IOException {
+ if (finished) {
+ throw new IOException("ColumnGroup has been closed for insertion.");
+ }
+ return new CGInserter(name, finishWriter);
+ }
+
+ private void createIndex() throws IOException {
+ MetaFile.Writer metaFile =
+ MetaFile.createWriter(makeMetaFilePath(path), conf);
+ CGIndex index = buildIndex(fs, path, false, conf);
+ DataOutputStream dos = metaFile.createMetaBlock(BLOCK_NAME_INDEX);
+ try {
+ index.write(dos);
+ }
+ finally {
+ dos.close();
+ }
+ metaFile.close();
+ }
+
+ private void checkPath(Path p, boolean createNew) throws IOException {
+ // check existence of path
+ if (!fs.exists(p)) {
+ if (createNew) {
+ fs.mkdirs(p);
+ }
+ else {
+ throw new IOException("Path doesn't exist for appending: " + p);
+ }
+ }
+ if (!fs.getFileStatus(p).isDir()) {
+ throw new IOException("Path exists but not a directory: " + p);
+ }
+ }
+
+ private void checkMetaFile(Path p) throws IOException {
+ Path pathMeta = new Path(p, META_FILE);
+ if (fs.exists(pathMeta)) {
+ throw new IOException("Index meta file already exists: " + pathMeta);
+ }
+ }
+
+ /**
+ * Inserter for ColumnGroup
+ */
+ class CGInserter implements TableInserter {
+ String name;
+ String tmpName;
+ boolean finishWriter;
+ FSDataOutputStream out;
+ TFile.Writer tfileWriter;
+ TupleWriter tupleWriter;
+ boolean closed = true;
+
+ private void createTempFile() throws IOException {
+ int maxTrial = 10;
+ String prefix = ".tmp." + name + ".";
+ Random random = new Random();
+
+ while (true) {
+ /**
+ * Try to set a real random seed by throwing all the runtime
+ * ingredients into it.
+ */
+ random.setSeed(System.nanoTime() * Thread.currentThread().getId()
+ * Runtime.getRuntime().freeMemory());
+ try {
+ tmpName = prefix + String.format("%08X", random.nextInt());
+ Path tmpPath = new Path(path, tmpName);
+ out = fs.create(tmpPath, false);
+ return;
+ }
+ catch (IOException e) {
+ --maxTrial;
+ if (maxTrial == 0) {
+ throw e;
+ }
+ Thread.yield();
+ }
+ }
+ }
+
+ CGInserter(String name, boolean finishWriter) throws IOException {
+ this.name = name;
+ this.finishWriter = finishWriter;
+ this.tupleWriter = new TupleWriter(getSchema());
+ try {
+ createTempFile();
+ tfileWriter =
+ new TFile.Writer(out, getMinBlockSize(conf),
+ getCompression(conf), cgschema.getComparator(), conf);
+ closed = false;
+ }
+ finally {
+ if (closed) {
+ if (tfileWriter != null) {
+ try {
+ tfileWriter.close();
+ }
+ catch (Exception e) {
+ // no-op
+ }
+ }
+ if (out != null) {
+ try {
+ out.close();
+ }
+ catch (Exception e) {
+ // no-op
+ }
+ }
+ if (tmpName != null) {
+ try {
+ fs.delete(new Path(path, tmpName), false);
+ }
+ catch (Exception e) {
+ // no-op
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public Schema getSchema() {
+ return ColumnGroup.Writer.this.getSchema();
+ }
+
+ @Override
+ public void insert(BytesWritable key, Tuple row) throws IOException {
+ TypesUtils.checkCompatible(row, getSchema());
+ DataOutputStream outKey = tfileWriter.prepareAppendKey(key.getSize());
+ try {
+ outKey.write(key.get(), 0, key.getSize());
+ }
+ finally {
+ outKey.close();
+ }
+
+ DataOutputStream outValue = tfileWriter.prepareAppendValue(-1);
+ try {
+ tupleWriter.put(outValue, row);
+ }
+ finally {
+ outValue.close();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (closed) {
+ return;
+ }
+ closed = true;
+
+ try {
+ // TODO: add schema to each TFile as a meta block?
+
+ tfileWriter.close();
+ tfileWriter = null;
+ out.close();
+ out = null;
+ // do renaming only if all the above is successful.
+ fs.rename(new Path(path, tmpName), new Path(path, name));
+ tmpName = null;
+ if (finishWriter) {
+ finish();
+ }
+ }
+ finally {
+ if (tfileWriter != null) {
+ try {
+ tfileWriter.close();
+ }
+ catch (Exception e) {
+ // no-op
+ }
+ }
+ if (out != null) {
+ try {
+ out.close();
+ }
+ catch (Exception e) {
+ // no-op
+ }
+ }
+ if (tmpName != null) {
+ try {
+ fs.delete(new Path(path, tmpName), false);
+ }
+ catch (Exception e) {
+ // no-op
+ }
+ }
+ if (finishWriter) {
+ try {
+ finish();
+ }
+ catch (Exception e) {
+ // no-op
+ }
+ }
+ }
+ }
+ }
+
+ }
+
+ /**
+ * name, first and last key (inclusive) of a data file
+ */
+ static class CGIndexEntry implements RawComparable, Writable {
+ String name;
+ long rows;
+ RawComparable firstKey;
+ RawComparable lastKey;
+
+ // for reading
+ CGIndexEntry() {
+ // no-op
+ }
+
+ // for writing
+ CGIndexEntry(String name, long rows, RawComparable firstKey,
+ RawComparable lastKey) {
+ this.name = name;
+ this.rows = rows;
+ this.firstKey = firstKey;
+ this.lastKey = lastKey;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public long getRows() {
+ return rows;
+ }
+
+ public RawComparable getFirstKey() {
+ return firstKey;
+ }
+
+ public RawComparable getLastKey() {
+ return lastKey;
+ }
+
+ @Override
+ public byte[] buffer() {
+ return (lastKey != null) ? lastKey.buffer() : null;
+ }
+
+ @Override
+ public int offset() {
+ return (lastKey != null) ? lastKey.offset() : 0;
+ }
+
+ @Override
+ public int size() {
+ return (lastKey != null) ? lastKey.size() : 0;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ name = Utils.readString(in);
+ rows = Utils.readVLong(in);
+ if (rows == 0) {
+ firstKey = null;
+ lastKey = null;
+ }
+ else {
+ int firstKeyLen = Utils.readVInt(in);
+ byte[] firstKeyBuffer = new byte[firstKeyLen];
+ in.readFully(firstKeyBuffer);
+ int lastKeyLen = Utils.readVInt(in);
+ byte[] lastKeyBuffer = new byte[lastKeyLen];
+ in.readFully(lastKeyBuffer);
+ firstKey = new ByteArray(firstKeyBuffer);
+ lastKey = new ByteArray(lastKeyBuffer);
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ Utils.writeString(out, name);
+ Utils.writeVLong(out, rows);
+ if (rows > 0) {
+ if ((firstKey == null) && (lastKey == null)) {
+ throw new IOException("In-memory only entry");
+ }
+ Utils.writeVInt(out, firstKey.size());
+ out.write(firstKey.buffer(), firstKey.offset(), firstKey.size());
+ Utils.writeVInt(out, lastKey.size());
+ out.write(lastKey.buffer(), lastKey.offset(), lastKey.size());
+ }
+ }
+ }
+
+ static class CGIndex implements Writable {
+ boolean dirty = false;
+ boolean sorted = true;
+ BasicTableStatus status;
+ ArrayList<CGIndexEntry> index;
+
+ CGIndex() {
+ status = new BasicTableStatus();
+ index = new ArrayList<CGIndexEntry>();
+ }
+
+ int size() {
+ return index.size();
+ }
+
+ CGIndexEntry get(int i) {
+ return index.get(i);
+ }
+
+ List<CGIndexEntry> getIndex() {
+ return index;
+ }
+
+ Path getPath(int i, Path parent) {
+ return new Path(parent, index.get(i).getName());
+ }
+
+ void sort(final Comparator<RawComparable> comparator) throws IOException {
+ if (dirty && comparator != null) {
+ throw new IOException("Cannot sort dirty index");
+ }
+
+ if (comparator != null) {
+ // sort by keys. For empty TFiles, they are always sorted before
+ // non-empty TFiles, and they themselves are sorted by their names.
+ Collections.sort(index, new Comparator<CGIndexEntry>() {
+
+ @Override
+ public int compare(CGIndexEntry o1, CGIndexEntry o2) {
+ if ((o1.getRows() == 0) && (o2.getRows() == 0)) {
+ return o1.getName().compareTo(o2.getName());
+ }
+ if (o1.getRows() == 0) return -1;
+ if (o2.getRows() == 0) return 1;
+ int cmprv = comparator.compare(o1.lastKey, o2.lastKey);
+ if (cmprv == 0) {
+ cmprv = comparator.compare(o1.firstKey, o2.firstKey);
+ if (cmprv == 0) {
+ cmprv = o1.getName().compareTo(o2.getName());
+ }
+ }
+ return cmprv;
+ }
+ });
+
+ for (int i = 0; i < index.size() - 1; ++i) {
+ RawComparable prevLastKey = index.get(i).lastKey;
+ RawComparable nextFirstKey = index.get(i + 1).firstKey;
+ if (nextFirstKey == null) {
+ continue;
+ }
+ if (comparator.compare(prevLastKey, nextFirstKey) > 0) {
+ throw new IOException("Overlapping key ranges");
+ }
+ }
+ }
+ else {
+ // sort by name
+ Collections.sort(index, new Comparator<CGIndexEntry>() {
+
+ @Override
+ public int compare(CGIndexEntry o1, CGIndexEntry o2) {
+ return o1.name.compareTo(o2.name);
+ }
+ });
+ }
+
+ // update status
+ if ((!dirty) && (index.size() > 0)) {
+ RawComparable keyFirst = index.get(0).getFirstKey();
+ status.beginKey = new BytesWritable();
+ status.beginKey.set(keyFirst.buffer(), keyFirst.offset(), keyFirst
+ .size());
+ RawComparable keyLast = index.get(index.size() - 1).getLastKey();
+ status.endKey = new BytesWritable();
+ status.endKey.set(keyLast.buffer(), keyLast.offset(), keyLast.size());
+ }
+ sorted = true;
+ }
+
+ // building full index.
+ void add(long bytes, long rows, CGIndexEntry range) {
+ status.size += bytes;
+ status.rows += rows;
+ index.add(range);
+ sorted = false;
+ }
+
+ // building dirty index
+ void add(long bytes, String name) {
+ dirty = true;
+ status.rows = -1; // reset rows to -1.
+ status.size += bytes;
+ CGIndexEntry next = new CGIndexEntry();
+ next.name = name;
+ index.add(next);
+ sorted = false;
+ }
+
+ int lowerBound(RawComparable key, final Comparator<RawComparable> comparator)
+ throws IOException {
+ if ((key == null) || (comparator == null)) {
+ throw new IllegalArgumentException("CGIndex.lowerBound");
+ }
+
+ if (!sorted) {
+ sort(comparator);
+ }
+
+ // Treat null keys as the least key.
+ return Utils.lowerBound(index, key, new Comparator<RawComparable>() {
+ @Override
+ public int compare(RawComparable o1, RawComparable o2) {
+ if ((o1.buffer() == null) && (o2.buffer() == null)) {
+ return 0;
+ }
+ if (o1.buffer() == null) return -1;
+ if (o2.buffer() == null) return 1;
+ return comparator.compare(o1, o2);
+ }
+ });
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ int n = Utils.readVInt(in);
+ index.clear();
+ index.ensureCapacity(n);
+ for (int i = 0; i < n; ++i) {
+ CGIndexEntry range = new CGIndexEntry();
+ range.readFields(in);
+ index.add(range);
+ }
+ status.readFields(in);
+ dirty = false;
+ sorted = true;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ if (dirty) {
+ throw new IOException("Cannot write dirty index");
+ }
+ if (!sorted) {
+ throw new IOException("Please sort index before calling write");
+ }
+ Utils.writeVInt(out, index.size());
+ for (int i = 0; i < index.size(); ++i) {
+ index.get(i).write(out);
+ }
+ status.write(out);
+ }
+ }
+
+ static class CGPathFilter implements PathFilter {
+ private final Configuration conf;
+
+ CGPathFilter(Configuration conf) {
+ this.conf = conf;
+ }
+
+ public boolean accept(Path p) {
+ String name = p.getName();
+ return !(name.equals(META_FILE) || name.equals(SCHEMA_FILE)
+ || name.startsWith(".tmp.")
+ || name.startsWith(getNonDataFilePrefix(conf)));
+ }
+ }
+
+ /**
+ * Dump information about CG.
+ *
+ * @param file
+ * Path string of the CG
+ * @param out
+ * PrintStream to output the information.
+ * @param conf
+ * The configuration object.
+ * @throws IOException
+ */
+ static public void dumpInfo(String file, PrintStream out, Configuration conf)
+ throws IOException, Exception {
+ // final int maxKeySampleLen = 16;
+ dumpInfo(new Path(file), out, conf);
+ }
+
+ static public void dumpInfo(Path path, PrintStream out, Configuration conf)
+ throws IOException, Exception {
+ dumpInfo(path, out, conf, 0);
+ }
+
+ static public void dumpInfo(Path path, PrintStream out, Configuration conf, int indent)
+ throws IOException, Exception {
+ // final int maxKeySampleLen = 16;
+ IOutils.indent(out, indent);
+ out.println();
+ IOutils.indent(out, indent);
+ out.println("Column Group : " + path);
+ ColumnGroup.Reader reader = new ColumnGroup.Reader(path, false, conf);
+ try {
+ LinkedHashMap<String, String> properties =
+ new LinkedHashMap<String, String>();
+ IOutils.indent(out, indent);
+ out.println("Serializer: " + reader.getSerializer());
+ IOutils.indent(out, indent);
+ out.println("Compressor: " + reader.getCompressor());
+ properties.put("Schema", reader.getSchema().toString());
+ // Now output the properties table.
+ int maxKeyLength = 0;
+ Set<Map.Entry<String, String>> entrySet = properties.entrySet();
+ for (Iterator<Map.Entry<String, String>> it = entrySet.iterator(); it
+ .hasNext();) {
+ Map.Entry<String, String> e = it.next();
+ if (e.getKey().length() > maxKeyLength) {
+ maxKeyLength = e.getKey().length();
+ }
+ }
+ for (Iterator<Map.Entry<String, String>> it = entrySet.iterator(); it
+ .hasNext();) {
+ Map.Entry<String, String> e = it.next();
+ IOutils.indent(out, indent);
+ out.printf("%s : %s\n", e.getKey(), e.getValue());
+ }
+ out.println("TFiles within the Column Group :");
+ for (CGIndexEntry entry : reader.cgindex.index) {
+ IOutils.indent(out, indent);
+ out.printf(" *Name : %s\n", entry.name);
+ IOutils.indent(out, indent);
+ out.printf(" Rows : %d\n", entry.rows);
+ if (entry.firstKey != null) {
+ IOutils.indent(out, indent);
+ out.printf(" First Key : %s\n", headToString(entry.firstKey));
+ }
+ if (entry.lastKey != null) {
+ IOutils.indent(out, indent);
+ out.printf(" Last Key : %s\n", headToString(entry.lastKey));
+ }
+ // dump TFile info
+ // Path pathTFile = new Path(path, entry.name);
+ // TFile.dumpInfo(pathTFile.toString(), out, conf);
+ }
+ }
+ finally {
+ try {
+ reader.close();
+ }
+ catch (Exception e) {
+ // no-op
+ }
+ }
+ }
+
+ private static String headToString(RawComparable raw) {
+ return new String(raw.buffer(), raw.offset(), raw.size() > 70 ? 70 : raw
+ .size());
+ }
+
+ /**
+ * Dumping the CG information.
+ *
+ * @param args
+ * A list of CG paths.
+ */
+ public static void main(String[] args) throws Exception {
+ System.out.printf("ColumnGroup Dumper\n");
+ if (args.length == 0) {
+ System.out
+ .println("Usage: java ... org.apache.hadoop.zebra.io.ColumnGroup cg-path [cg-path ...]");
+ System.exit(0);
+ }
+ Configuration conf = new Configuration();
+ for (String file : args) {
+ try {
+ dumpInfo(file, System.out, conf);
+ }
+ catch (IOException e) {
+ e.printStackTrace(System.err);
+ }
+ }
+ }
+}
Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/IOutils.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/IOutils.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/IOutils.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/IOutils.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.zebra.io;
+
+import java.io.PrintStream;
+
+public class IOutils {
+ public static void indent(PrintStream os, int amount)
+ {
+ for (int i = 0; i < amount; i++)
+ os.print(' ');
+ }
+}
Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.io;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.file.tfile.ByteArray;
+import org.apache.hadoop.io.file.tfile.RawComparable;
+
+/**
+ * Class used to convey the information of how on-disk data are distributed
+ * among key-partitioned buckets. This class is used by the MapReduce layer to
+ * calculate intelligent splits.
+ */
+public class KeyDistribution {
+ private long uniqueBytes;
+ private SortedMap<RawComparable, BlockDistribution> data;
+
+ KeyDistribution(Comparator<? super RawComparable> comparator) {
+ data = new TreeMap<RawComparable, BlockDistribution>(comparator);
+ }
+
+ void add(RawComparable key, BlockDistribution bucket) {
+ uniqueBytes += bucket.getLength();
+ data.put(key, BlockDistribution.sum(data.get(key), bucket));
+ }
+
+ void add(KeyDistribution other) {
+ this.uniqueBytes += other.uniqueBytes;
+ reduceKeyDistri(this.data, other.data);
+ }
+
+ static void reduceKeyDistri(SortedMap<RawComparable, BlockDistribution> lv,
+ SortedMap<RawComparable, BlockDistribution> rv) {
+ for (Iterator<Map.Entry<RawComparable, BlockDistribution>> it =
+ rv.entrySet().iterator(); it.hasNext();) {
+ Map.Entry<RawComparable, BlockDistribution> e = it.next();
+ RawComparable key = e.getKey();
+ BlockDistribution sum = lv.get(key);
+ BlockDistribution delta = e.getValue();
+ lv.put(key, BlockDistribution.sum(sum, delta));
+ }
+ }
+
+ /**
+ * Aggregate two key distributions.
+ *
+ * @param a
+ * first key distribution (can be null)
+ * @param b
+ * second key distribution (can be null)
+ * @return the aggregated key distribution.
+ */
+ public static KeyDistribution sum(KeyDistribution a, KeyDistribution b) {
+ if (a == null) return b;
+ if (b == null) return a;
+ a.add(b);
+ return a;
+ }
+
+ /**
+ * Get the size of the key sampling.
+ *
+ * @return Number of key samples.
+ */
+ public int size() {
+ return data.size();
+ }
+
+ /**
+ * Get the total unique bytes contained in the key-partitioned buckets.
+ *
+ * @return The total number of bytes contained in the key-partitioned buckets.
+ */
+ public long length() {
+ return uniqueBytes;
+ }
+
+ /**
+ * Resize the key samples by merging adjacent buckets until roughly n remain.
+ *
+ * @param n
+ * targeted sampling size
+ * @return the actual size after the resize().
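+ *
+ * <p>For example, with 6 samples totaling 60 unique bytes and n = 3,
+ * consecutive buckets are merged until the cumulative size reaches the
+ * markers at 20, 40, and 60 bytes.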
+ */
+ public int resize(int n) {
+ Iterator<Map.Entry<RawComparable, BlockDistribution>> it =
+ data.entrySet().iterator();
+ KeyDistribution adjusted = new KeyDistribution(data.comparator());
+ for (int i = 0; i < n; ++i) {
+ long targetMarker = (i + 1) * uniqueBytes / n;
+ if (adjusted.uniqueBytes >= targetMarker) {
+ continue;
+ }
+ RawComparable key = null;
+ do {
+ Map.Entry<RawComparable, BlockDistribution> e = it.next();
+ if (key == null) {
+ key = e.getKey();
+ }
+ adjusted.add(key, e.getValue());
+ }
+ while (adjusted.uniqueBytes < targetMarker);
+ }
+
+ swap(adjusted);
+ return data.size();
+ }
+
+ void swap(KeyDistribution other) {
+ long tmp = uniqueBytes;
+ uniqueBytes = other.uniqueBytes;
+ other.uniqueBytes = tmp;
+ SortedMap<RawComparable, BlockDistribution> tmp2 = data;
+ data = other.data;
+ other.data = tmp2;
+ }
+
+ /**
+ * Get the list of sampling keys.
+ *
+ * @return A list of sampling keys.
+ */
+ public RawComparable[] getKeys() {
+ RawComparable[] ret = new RawComparable[data.size()];
+ return data.keySet().toArray(ret);
+
+ }
+
+ /**
+ * Get the block distribution of all data that maps to the key bucket.
+ */
+ public BlockDistribution getBlockDistribution(BytesWritable key) {
+ // Wrap the BytesWritable in a ByteArray (a RawComparable) so that the
+ // map's comparator can be applied to it.
+ BlockDistribution bInfo =
+ data.get(new ByteArray(key.get(), 0, key.getSize()));
+ if (bInfo == null) {
+ throw new IllegalArgumentException("Invalid key");
+ }
+ return bInfo;
+ }
+}
Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/MetaFile.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/MetaFile.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/MetaFile.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/MetaFile.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.io;
+
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.file.tfile.TFile;
+import org.apache.hadoop.io.file.tfile.MetaBlockAlreadyExists;
+import org.apache.hadoop.io.file.tfile.MetaBlockDoesNotExist;
+
+
+/**
+ * simple named meta block management.
+ *
+ * This implementation uses TFile as the storage layer.
+ *
+ * TODO: Use an alternative design to allow writing of meta blocks from
+ * multiple processes with DFS append feature.
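+ *
+ * A minimal round-trip sketch (illustrative only; the path and block name
+ * are hypothetical):
+ *
+ * <pre>
+ * MetaFile.Writer writer = MetaFile.createWriter(path, conf);
+ * DataOutputStream dos = writer.createMetaBlock("my.meta.block");
+ * try {
+ * dos.writeUTF("hello");
+ * } finally {
+ * dos.close();
+ * }
+ * writer.close();
+ *
+ * MetaFile.Reader reader = MetaFile.createReader(path, conf);
+ * DataInputStream dis = reader.getMetaBlock("my.meta.block");
+ * try {
+ * String s = dis.readUTF();
+ * } finally {
+ * dis.close();
+ * }
+ * reader.close();
+ * </pre>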
+ */
+class MetaFile {
+ // MetaFile is just a namespace object.
+ private MetaFile() {
+ // no-op
+ }
+
+ /**
+ * Create a MetaFile writer.
+ *
+ * @param path
+ * path to the meta file.
+ * @param conf
+ * Configuration object
+ * @return a MetaFile.Writer object
+ * @throws IOException
+ * upon error
+ *
+ * The implementation does not always hold a reference to the
+ * underlying file. In fact, the file is not created until the first
+ * time a meta block is added. For this implementation, we use TFile
+ * as storage backend, and only one process will ever succeed in
+ * creating the file and adding meta blocks. So it is possible that
+ * we successfully open the Writer initially, but receive a
+ * file-already-exist error later.
+ */
+ static Writer createWriter(Path path, Configuration conf)
+ throws IOException {
+ return new Writer(path, conf);
+ }
+
+ /**
+ * Create a MetaFile reader.
+ *
+ * @param path
+ * path to the meta file.
+ * @param conf
+ * Configuration object
+ * @return a MetaFile.Reader object
+ * @throws IOException
+ * upon error
+ *
+ * The implementation does not always hold a reference to the
+ * underlying file. In fact, the file is opened the first time
+ * a meta block is requested. So it is possible that we successfully
+ * open the reader, but receive a file-not-exist error later.
+ */
+ static Reader createReader(Path path, Configuration conf)
+ throws IOException {
+ return new Reader(path, conf);
+ }
+
+ /**
+ * Reader
+ */
+ static class Reader implements Closeable {
+ private Path path;
+ private Configuration conf;
+ private FSDataInputStream fsdis = null;
+ private TFile.Reader metaFile = null;
+
+ /**
+ * Remember the settings. Creation of the TFile reader is lazy.
+ *
+ * @param path
+ * @param conf
+ */
+ Reader(Path path, Configuration conf) {
+ this.path = path;
+ this.conf = conf;
+ }
+
+ private synchronized void checkFile() throws IOException {
+ if (metaFile != null) return;
+ FileSystem fs = path.getFileSystem(conf);
+ fsdis = fs.open(path);
+ metaFile = new TFile.Reader(fsdis, fs.getFileStatus(path).getLen(), conf);
+ }
+
+ DataInputStream getMetaBlock(String name)
+ throws MetaBlockDoesNotExist, IOException {
+ checkFile();
+ return metaFile.getMetaBlock(name);
+ }
+
+ public void close() throws IOException {
+ try {
+ if (metaFile != null) {
+ metaFile.close();
+ metaFile = null;
+ }
+ if (fsdis != null) {
+ fsdis.close();
+ fsdis = null;
+ }
+ } finally {
+ if (metaFile != null) {
+ try {
+ metaFile.close();
+ } catch (Exception e) {
+ // no-op
+ }
+ metaFile = null;
+ }
+
+ if (fsdis != null) {
+ try {
+ fsdis.close();
+ } catch (Exception e) {
+ // no-op
+ }
+ fsdis = null;
+ }
+ }
+ }
+ }
+
+ /**
+ * Writer
+ */
+ static class Writer implements Closeable {
+ private Path path;
+ private Configuration conf;
+ private FSDataOutputStream fsdos = null;
+ private TFile.Writer metaFile = null;
+
+ // Creation of TFile writer is lazy.
+ Writer(Path path, Configuration conf) {
+ this.path = path;
+ this.conf = conf;
+ }
+
+ /**
+ * Actually opens the file if it has not been opened yet.
+ *
+ * @throws IOException
+ */
+ private synchronized void checkFile() throws IOException {
+ if (metaFile != null) return;
+
+ FileSystem fs = path.getFileSystem(conf);
+ // Throw an exception if the meta file already exists.
+ fsdos = fs.create(path, false);
+ // TODO Move getMinBlockSize to BasicTable
+ // TODO move getCompression to BasicTable
+ metaFile =
+ new TFile.Writer(fsdos, ColumnGroup.getMinBlockSize(conf),
+ ColumnGroup.getCompression(conf), null, conf);
+ }
+
+ /**
+ * Close the Writer if it is opened. Ideally, finish() would allow a
+ * future writer to append more data, but this implementation is not
+ * able to support that.
+ *
+ * @throws IOException
+ */
+ void finish() throws IOException {
+ close();
+ }
+
+ /**
+ * Close the Writer if it is opened.
+ *
+ * @throws IOException
+ */
+ public void close() throws IOException {
+ try {
+ if (metaFile != null) {
+ metaFile.close();
+ metaFile = null;
+ }
+ if (fsdos != null) {
+ fsdos.close();
+ fsdos = null;
+ }
+ } finally {
+ if (metaFile != null) {
+ try {
+ metaFile.close();
+ } catch (Exception e) {
+ // no-op
+ }
+ metaFile = null;
+ }
+
+ if (fsdos != null) {
+ try {
+ fsdos.close();
+ } catch (Exception e) {
+ // no-op
+ }
+ fsdos = null;
+ }
+ }
+ }
+
+ /**
+ * Obtain an output stream for creating a Meta Block with the specified
+ * name. The meta file itself is created the first time this is called.
+ *
+ * @param name
+ * The name of the Meta Block
+ * @return The output stream. Close the stream to conclude the writing.
+ * @throws IOException
+ * @throws MetaBlockAlreadyExists
+ */
+ public DataOutputStream createMetaBlock(String name)
+ throws MetaBlockAlreadyExists, IOException {
+ checkFile();
+ return metaFile.prepareMetaBlock(name);
+ }
+ }
+}
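
A minimal usage sketch of the MetaFile pair above: write one named meta
block, then read it back. The path and block name are hypothetical, and
since MetaFile is package-private the code would have to live in
org.apache.hadoop.zebra.io; imports match the file above.

    // Sketch only: round-trips a single meta block through MetaFile.
    static void roundTrip(Configuration conf) throws IOException {
      Path metaPath = new Path("/tmp/zebra/.meta");  // hypothetical location

      // Writing: the underlying TFile is only created when the first
      // meta block is added (see createWriter above).
      MetaFile.Writer writer = MetaFile.createWriter(metaPath, conf);
      DataOutputStream dos = writer.createMetaBlock("zebra.schema"); // hypothetical name
      try {
        dos.writeUTF("f1:int, f2:string");
      } finally {
        dos.close();       // closing the stream concludes the block
      }
      writer.finish();

      // Reading: the file is only opened when the first block is requested.
      MetaFile.Reader reader = MetaFile.createReader(metaPath, conf);
      DataInputStream dis = reader.getMetaBlock("zebra.schema");
      try {
        String schema = dis.readUTF();
      } finally {
        dis.close();
      }
      reader.close();
    }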
Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/TableInserter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/TableInserter.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/TableInserter.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/TableInserter.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.io;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.pig.data.Tuple;
+
+/**
+ * The inserter interface allows an application to insert a number of rows
+ * into a table.
+ */
+public interface TableInserter extends Closeable {
+ /**
+ * Insert a new row into the table.
+ *
+ * @param key
+ * The row key.
+ * @param row
+ * The row.
+ */
+ public void insert(BytesWritable key, Tuple row) throws IOException;
+
+ /**
+ * Get the schema of the underlying table we are writing to.
+ *
+ * @return The schema of the underlying table.
+ */
+ public Schema getSchema();
+}
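
As a rough sketch of how this contract might be exercised: the inserter
itself is obtained elsewhere (the factory methods live on the table writer
classes in this package), and the key encoding, column layout, and the
TypesUtils.createTuple helper are assumptions of the sketch.

    // Sketch only: inserts a few rows through a TableInserter obtained elsewhere.
    static void insertSome(TableInserter inserter) throws IOException {
      // Assumes TypesUtils.createTuple(Schema) builds an empty row for the schema.
      Tuple row = TypesUtils.createTuple(inserter.getSchema());
      for (int i = 0; i < 10; ++i) {
        row.set(0, i);  // assumes the first column is an int
        BytesWritable key = new BytesWritable(String.format("key%05d", i).getBytes());
        inserter.insert(key, row);
      }
      inserter.close();  // conclude the insertion
    }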
Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/TableScanner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/TableScanner.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/TableScanner.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/TableScanner.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.zebra.io;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.types.Projection;
+import org.apache.hadoop.zebra.types.Schema;
+import org.apache.pig.data.Tuple;
+
+/**
+ * The scanner interface allows an application to scan a range of rows of a
+ * table. It is the caller's responsibility to close() the scanner when
+ * finished using it. A scanner permits fetching the key without fetching the
+ * value, so that the underlying implementation can instantiate tuple objects
+ * (rows) lazily.
+ */
+public interface TableScanner extends Closeable {
+ /**
+ * Test whether the cursor is at the end of the scan range.
+ *
+ * @return Whether the cursor is at the end of the scan range.
+ * @throws IOException
+ */
+ boolean atEnd() throws IOException;
+
+ /**
+ * Advance cursor to the next Row.
+ *
+ * @return true if the cursor is moved, including the move from the last
+ * row to the end position.
+ * @throws IOException
+ */
+ boolean advance() throws IOException;
+
+ /**
+ * Get the row key.
+ *
+ * @param key
+ * The output parameter to hold the result.
+ */
+ void getKey(BytesWritable key) throws IOException;
+
+ /**
+ * Get the row.
+ *
+ * @param row
+ * The output parameter to hold the result. It must conform to the
+ * schema that the scanner is aware of.
+ * @see TableScanner#getSchema()
+ * @throws IOException
+ */
+ void getValue(Tuple row) throws IOException;
+
+ /**
+ * Seek to the first key that is greater than or equal to the provided key,
+ * or to the end of the scan range if no such key exists. Only applicable to
+ * sorted tables.
+ *
+ * @param key
+ * The input key.
+ * @return true if we find the exact match; false otherwise.
+ * @throws IOException
+ */
+ boolean seekTo(BytesWritable key) throws IOException;
+
+ /**
+ * Seek to the end of the scan range.
+ *
+ * @throws IOException
+ */
+ void seekToEnd() throws IOException;
+
+ /**
+ * Get the projection string associated with the scanner.
+ *
+ * @return The projection string.
+ */
+ public String getProjection();
+
+ /**
+ * Get the schema of the projection.
+ *
+ * @return The schema of the projection.
+ */
+ public Schema getSchema();
+}
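
The methods above imply a standard iteration idiom: test atEnd(), fetch the
key (and the value only when it is needed), then advance(). A sketch, again
assuming TypesUtils.createTuple for row construction:

    // Sketch only: visits every row in the scanner's range.
    static void scanAll(TableScanner scanner) throws IOException {
      BytesWritable key = new BytesWritable();
      Tuple row = TypesUtils.createTuple(scanner.getSchema());
      try {
        while (!scanner.atEnd()) {
          scanner.getKey(key);
          scanner.getValue(row);  // may be skipped when only keys are needed,
                                  // letting the implementation build rows lazily
          scanner.advance();
        }
      } finally {
        scanner.close();          // the caller owns the scanner's lifetime
      }
    }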
Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/package-info.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/package-info.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/package-info.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Physical I/O management of Hadoop Tables.
+ */
+package org.apache.hadoop.zebra.io;
Added: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java?rev=803312&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java (added)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java Tue Aug 11 22:27:44 2009
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.zebra.mapred;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.zebra.io.BasicTable;
+import org.apache.hadoop.zebra.io.TableScanner;
+import org.apache.hadoop.zebra.types.ParseException;
+import org.apache.hadoop.zebra.types.Schema;
+
+/**
+ * Table expression for reading a BasicTable.
+ *
+ * @see <a href="doc-files/examples/ReadABasicTable.java">Usage example for
+ * BasicTableExpr</a>
+ */
+class BasicTableExpr extends TableExpr {
+ private Path path;
+
+ /**
+ * Default constructor.
+ */
+ public BasicTableExpr() {
+ // no-op
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param path
+ * Path of the BasicTable.
+ */
+ public BasicTableExpr(Path path) {
+ this.path = path;
+ }
+
+ /**
+ * Set the path.
+ *
+ * @param path
+ * path to the BasicTable.
+ * @return self.
+ */
+ public BasicTableExpr setPath(Path path) {
+ this.path = path;
+ return this;
+ }
+
+ /**
+ * Get the path.
+ *
+ * @return the path to the BasicTable.
+ */
+ public Path getPath() {
+ return path;
+ }
+
+ @Override
+ protected BasicTableExpr decodeParam(StringReader in) throws IOException {
+ String strPath = TableExprUtils.decodeString(in);
+ if (strPath == null) {
+ throw new RuntimeException("Incomplete expression");
+ }
+ path = new Path(strPath);
+ return this;
+ }
+
+ @Override
+ protected BasicTableExpr encodeParam(StringBuilder out) {
+ if (path == null) {
+ throw new RuntimeException("Incomplete expression");
+ }
+ TableExprUtils.encodeString(out, path.toString());
+ return this;
+ }
+
+ @Override
+ public List<LeafTableInfo> getLeafTables(String projection) {
+ ArrayList<LeafTableInfo> ret = new ArrayList<LeafTableInfo>(1);
+ ret.add(new LeafTableInfo(path, projection));
+ return ret;
+ }
+
+ @Override
+ public TableScanner getScanner(BytesWritable begin, BytesWritable end,
+ String projection, Configuration conf) throws IOException {
+ BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+ try {
+ reader.setProjection(projection);
+ } catch (ParseException e) {
+ throw new IOException("Projection parsing failed: " + e.getMessage());
+ }
+ return reader.getScanner(begin, end, true);
+ }
+
+ @Override
+ public Schema getSchema(Configuration conf) throws IOException {
+ return BasicTable.Reader.getSchema(path, conf);
+ }
+
+ @Override
+ public boolean sortedSplitCapable() {
+ return true;
+ }
+
+ @Override
+ protected void dumpInfo(PrintStream ps, Configuration conf, int indent)
+ throws IOException {
+ BasicTable.dumpInfo(path.toString(), ps, conf, indent);
+ }
+}
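
Putting the expression to use, a caller might open a sorted scanner over a
whole BasicTable as below. The table path and projection are hypothetical,
and treating null begin/end keys as an unbounded range is an assumption
about the underlying BasicTable reader.

    // Sketch only: scans a BasicTable through the expression layer.
    static void scanWholeTable(Configuration conf) throws IOException {
      BasicTableExpr expr =
          new BasicTableExpr(new Path("/data/myTable"));  // hypothetical path
      System.out.println(expr.getSchema(conf));   // schema of the underlying table
      TableScanner scanner =
          expr.getScanner(null, null, "f1, f2", conf);  // assumes null bounds = full range
      try {
        // iterate as in the TableScanner sketch above
      } finally {
        scanner.close();
      }
    }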