You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2009/11/24 20:54:34 UTC
svn commit: r883836 [2/23] - in /hadoop/pig/branches/load-store-redesign: ./
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/
contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/
contrib/zebra/ contrib/zebra...
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java Tue Nov 24 19:54:19 2009
@@ -21,6 +21,7 @@
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
@@ -39,15 +40,18 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.file.tfile.TFile;
-import org.apache.hadoop.io.file.tfile.Utils;
-import org.apache.hadoop.io.file.tfile.MetaBlockAlreadyExists;
-import org.apache.hadoop.io.file.tfile.MetaBlockDoesNotExist;
-import org.apache.hadoop.io.file.tfile.Utils.Version;
+import org.apache.hadoop.zebra.tfile.TFile;
+import org.apache.hadoop.zebra.tfile.Utils;
+import org.apache.hadoop.zebra.tfile.MetaBlockAlreadyExists;
+import org.apache.hadoop.zebra.tfile.MetaBlockDoesNotExist;
+import org.apache.hadoop.zebra.tfile.Utils.Version;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.zebra.io.ColumnGroup.Reader.CGRangeSplit;
+import org.apache.hadoop.zebra.io.ColumnGroup.Reader.CGRowSplit;
import org.apache.hadoop.zebra.types.CGSchema;
import org.apache.hadoop.zebra.parser.ParseException;
import org.apache.hadoop.zebra.types.Partition;
@@ -55,6 +59,7 @@
import org.apache.hadoop.zebra.schema.Schema;
import org.apache.hadoop.zebra.parser.TableSchemaParser;
import org.apache.hadoop.zebra.types.TypesUtils;
+import org.apache.hadoop.zebra.types.SortInfo;
import org.apache.pig.data.Tuple;
/**
@@ -81,13 +86,11 @@
private final static String BT_SCHEMA_FILE = ".btschema";
// schema version
private final static Version SCHEMA_VERSION =
- new Version((short) 1, (short) 0);
+ new Version((short) 1, (short) 1);
// name of the BasicTable meta-data file
private final static String BT_META_FILE = ".btmeta";
// column group prefix
private final static String CGPathPrefix = "CG";
- // default comparator to "memcmp"
- private final static String DEFAULT_COMPARATOR = TFile.COMPARATOR_MEMCMP;
private final static String DELETED_CG_PREFIX = ".deleted-";
@@ -114,7 +117,11 @@
*
* Dropping a column group that has already been removed is a no-op no
* exception is thrown.
+ * <br> <br>
*
+ * Note that this feature is experimental now and subject to changes in the
+ * future.
+ *
* @param path path to BasicTable
* @param conf Configuration determines file system and other parameters.
* @param cgName name of the column group to drop.
@@ -127,8 +134,25 @@
throws IOException {
FileSystem fs = FileSystem.get(conf);
+ int triedCount = 0;
+ int numCGs = SchemaFile.getNumCGs(path, conf);
+ SchemaFile schemaFile = null;
+
+ /* Retry up to numCGs times accounting for other CG deleting threads or processes.*/
+ while (triedCount ++ < numCGs) {
+ try {
+ schemaFile = new SchemaFile(path, conf);
+ break;
+ } catch (FileNotFoundException e) {
+ LOG.info("Try " + triedCount + " times : " + e.getMessage());
+ } catch (Exception e) {
+ throw new IOException ("Cannot construct SchemaFile : " + e.getMessage());
+ }
+ }
- SchemaFile schemaFile = new SchemaFile(path, conf);
+ if (schemaFile == null) {
+ throw new IOException ("Cannot construct SchemaFile");
+ }
int cgIdx = schemaFile.getCGByName(cgName);
if (cgIdx < 0) {
@@ -137,9 +161,8 @@
}
Path cgPath = new Path(path, schemaFile.getName(cgIdx));
-
- //Clean up any previous unfinished attempts to drop column groups?
-
+
+ //Clean up any previous unfinished attempts to drop column groups?
if (schemaFile.isCGDeleted(cgIdx)) {
// Clean up unfinished delete if it exists. so that clean up can
// complete if the previous deletion was interrupted for some reason.
@@ -271,7 +294,8 @@
schema = schemaFile.getLogical();
projection = new Projection(schema);
String storage = schemaFile.getStorageString();
- partition = new Partition(schema, projection, storage);
+ String comparator = schemaFile.getComparator();
+ partition = new Partition(schema, projection, storage, comparator);
for (int nx = 0; nx < numCGs; nx++) {
if (!schemaFile.isCGDeleted(nx)) {
colGroups[nx] =
@@ -332,6 +356,21 @@
}
/**
+ * @return the table's SortInfo from the schema file; may be null, e.g. when no sort columns are recorded
+ */
+ public SortInfo getSortInfo()
+ {
+ return schemaFile.getSortInfo();
+ }
+
+ /**
+ * @return the name of the i-th column group, as reported by the table's SchemaFile
+ */
+ public String getName(int i) {
+ return schemaFile.getName(i);
+ }
+
+ /**
* Set the projection for the reader. This will affect calls to
* {@link #getScanner(RangeSplit, boolean)},
* {@link #getScanner(BytesWritable, BytesWritable, boolean)},
@@ -351,7 +390,7 @@
this.projection = new Projection(schemaFile.getLogical());
partition =
new Partition(schemaFile.getLogical(), this.projection, schemaFile
- .getStorageString());
+ .getStorageString(), schemaFile.getComparator());
}
else {
/**
@@ -362,7 +401,7 @@
new Projection(schemaFile.getLogical(), projection);
partition =
new Partition(schemaFile.getLogical(), this.projection, schemaFile
- .getStorageString());
+ .getStorageString(), schemaFile.getComparator());
}
inferredMapping = false;
}
@@ -389,13 +428,31 @@
BlockDistribution bd = new BlockDistribution();
for (int nx = 0; nx < colGroups.length; nx++) {
if (!isCGDeleted(nx)) {
- bd.add(colGroups[nx].getBlockDistribution(split == null ? null : split
- .get(nx)));
+ bd.add(colGroups[nx].getBlockDistribution(split == null ? null : split.getCGRangeSplit()));
}
}
return bd;
}
+
+    /**
+     * Given a row-based split, calculate how the file data that fall into the
+     * split are distributed among hosts. Only the column group the split was
+     * created on ({@code split.getCGIndex()}) is consulted.
+     *
+     * @param split The row-based split. <i>Cannot</i> be null.
+     * @return An object that conveys how the blocks falling into the split
+     *         are distributed across hosts.
+     * @see #rowSplit(long[], long[], Path[], int)
+     */
+    public BlockDistribution getBlockDistribution(RowSplit split)
+        throws IOException {
+      BlockDistribution bd = new BlockDistribution();
+      int cgIdx = split.getCGIndex();
+      bd.add(colGroups[cgIdx].getBlockDistribution(split.getCGRowSplit()));
+      return bd;
+    }
+
/**
* Collect some key samples and use them to partition the table. Only
* applicable to sorted BasicTable. The returned {@link KeyDistribution}
@@ -415,7 +472,7 @@
kd.add(colGroups[nx].getKeyDistribution(n));
}
}
- if (kd.size() > (int) (n * 1.5)) {
+ if (n >= 0 && kd.size() > (int) (n * 1.5)) {
kd.resize(n);
}
return kd;
@@ -473,6 +530,26 @@
}
/**
+     * Get a scanner that reads a consecutive number of rows as defined in the
+     * {@link RowSplit} object.
+     *
+     * @param closeReader
+     *          close the underlying Reader object when we close the scanner.
+     *          Should be set to true if we have only one scanner on top of the
+     *          reader, so that resources are released after the scanner is
+     *          closed.
+     * @param rowSplit split based on row numbers.
+     * @return A scanner object.
+     * @throws IOException on I/O errors while opening the scanner.
+     * @throws ParseException presumably from the projection/mapping check.
+     */
+    public synchronized TableScanner getScanner(boolean closeReader,
+        RowSplit rowSplit) throws IOException, ParseException {
+      checkInferredMapping();
+      return new BTScanner(rowSplit, closeReader, partition);
+    }
+
+ /**
* Get the schema of the table. The schema may be different from
* {@link BasicTable.Reader#getSchema(Path, Configuration)} if a projection
* has been set on the table.
@@ -506,61 +583,106 @@
public String getPath() {
return path.toString();
}
+
+ /**
+ * Create a new ColumnGroup.CGPathFilter, configured through the static CGPathFilter.setConf(conf).
+ */
+ public PathFilter getPathFilter(Configuration conf) {
+ ColumnGroup.CGPathFilter filter = new ColumnGroup.CGPathFilter();
+ ColumnGroup.CGPathFilter.setConf(conf); // static call: conf is set on the class, presumably shared by all CGPathFilter instances — confirm
+ return filter;
+ }
/**
* Split the table into at most n parts.
*
- * @param n
- * Maximum number of parts in the output list.
+ * @param n Maximum number of parts in the output list.
* @return A list of RangeSplit objects, each of which can be used to
* construct TableScanner later.
*/
- @SuppressWarnings("unchecked")
public List<RangeSplit> rangeSplit(int n) throws IOException {
- // assume all CGs will be split into the same number of horizontal
- // slices
- List<CGRangeSplit>[] cgSplitsAll = new ArrayList[colGroups.length];
- // split each CG
- for (int nx = 0; nx < colGroups.length; nx++) {
- if (!isCGDeleted(nx))
- cgSplitsAll[nx] = colGroups[nx].rangeSplit(n);
- }
-
- // verify all CGs have same number of slices
- int numSlices = -1;
- for (int nx = 0; nx < cgSplitsAll.length; nx++) {
- if (isCGDeleted(nx)) {
- continue;
- }
- if (numSlices < 0) {
- numSlices = cgSplitsAll[nx].size();
- }
- else if (cgSplitsAll[nx].size() != numSlices) {
- throw new IOException(
- "BasicTable's column groups were not equally split.");
+ // use the first non-deleted column group to do split, other column groups will be split exactly the same way.
+ List<RangeSplit> ret;
+ if (firstValidCG >= 0) {
+ List<CGRangeSplit> cgSplits = colGroups[firstValidCG].rangeSplit(n);
+ int numSlices = cgSplits.size();
+ ret = new ArrayList<RangeSplit>(numSlices);
+ for (int slice = 0; slice < numSlices; slice++) {
+ CGRangeSplit oneSliceSplit = cgSplits.get(slice);
+ ret.add(new BasicTable.Reader.RangeSplit(oneSliceSplit));
}
+
+ return ret;
+ } else { // all column groups are dropped.
+ ret = new ArrayList<RangeSplit>(1);
+ // add a dummy split
+ ret.add(new BasicTable.Reader.RangeSplit(new CGRangeSplit(0, 0)));
+ return ret;
}
- if (numSlices <= 0) {
- // This could happen because of various reasons.
- // One possibility is that all the CGs are deleted.
- numSlices = 1;
- }
- // return horizontal slices as RangeSplits
- List<RangeSplit> ret = new ArrayList<RangeSplit>(numSlices);
+ }
+
+ /**
+ * We already use FileInputFormat to create byte offset-based input splits.
+ * Their information is encoded in starts, lengths and paths. This method is
+ * to wrap this information to form RowSplit objects at basic table level.
+ *
+ * @param starts array of starting byte of fileSplits.
+ * @param lengths array of length of fileSplits.
+ * @param paths array of path of fileSplits.
+ * @param splitCGIndex index of column group that is used to create fileSplits.
+ * @return A list of RowSplit objects, each of which can be used to
+ * construct a TableScanner later.
+ *
+ */
+ public List<RowSplit> rowSplit(long[] starts, long[] lengths, Path[] paths, int splitCGIndex) throws IOException {
+ List<RowSplit> ret;
+
+ List<CGRowSplit> cgSplits = colGroups[splitCGIndex].rowSplit(starts, lengths, paths);
+ int numSlices = cgSplits.size();
+ ret = new ArrayList<RowSplit>(numSlices);
for (int slice = 0; slice < numSlices; slice++) {
- CGRangeSplit[] oneSliceSplits = new CGRangeSplit[cgSplitsAll.length];
- for (int cgIndex = 0; cgIndex < cgSplitsAll.length; cgIndex++) {
- if (isCGDeleted(cgIndex)) {
- // set a dummy split
- oneSliceSplits[cgIndex] = new CGRangeSplit(0, 0);
- } else {
- oneSliceSplits[cgIndex] = cgSplitsAll[cgIndex].get(slice);
- }
- }
- ret.add(new BasicTable.Reader.RangeSplit(oneSliceSplits));
+ CGRowSplit cgRowSplit = cgSplits.get(slice);
+ ret.add(new BasicTable.Reader.RowSplit(splitCGIndex, cgRowSplit));
}
+
return ret;
}
+
+
+    /**
+     * Get index of the column group that will be used for row-based split.
+     *
+     * @return the needed, non-deleted column group with the largest size;
+     *         otherwise the first non-deleted column group; otherwise -1.
+     */
+    public int getRowSplitCGIndex() {
+      // Pick the biggest column group that is both still present and needed
+      // by the current projection; on equal sizes the lower index wins.
+      int bestIdx = -1;
+      long bestSize = -1;
+      for (int cg = 0; cg < colGroups.length; ++cg) {
+        boolean usable = partition.isCGNeeded(cg) && !isCGDeleted(cg);
+        if (!usable) {
+          continue;
+        }
+        long cgSize = colGroups[cg].getStatus().getSize();
+        if (cgSize > bestSize) {
+          bestSize = cgSize;
+          bestIdx = cg;
+        }
+      }
+
+      if (bestIdx >= 0) {
+        return bestIdx;
+      }
+
+      // No projected column group is usable (all deleted or non-existing),
+      // so fall back to the first non-deleted column group. When every
+      // column group has been dropped firstValidCG is negative and -1 is
+      // returned.
+      return (firstValidCG >= 0) ? firstValidCG : -1;
+    }
+
/**
* Close the BasicTable for reading. Resources are released.
@@ -642,10 +764,11 @@
* implementation-dependent.
*/
public static class RangeSplit implements Writable {
- CGRangeSplit[] slice;
+ //CGRangeSplit[] slice;
+ CGRangeSplit slice;
- RangeSplit(CGRangeSplit[] splits) {
- slice = splits;
+ RangeSplit(CGRangeSplit split) {
+ slice = split;
}
/**
@@ -660,12 +783,10 @@
*/
@Override
public void readFields(DataInput in) throws IOException {
- int count = Utils.readVInt(in);
- slice = new CGRangeSplit[count];
- for (int nx = 0; nx < count; nx++) {
+ for (int nx = 0; nx < 1; nx++) {
CGRangeSplit cgrs = new CGRangeSplit();
cgrs.readFields(in);
- slice[nx] = cgrs;
+ slice = cgrs;
}
}
@@ -674,18 +795,81 @@
*/
@Override
public void write(DataOutput out) throws IOException {
- Utils.writeVInt(out, slice.length);
- for (CGRangeSplit split : slice) {
- split.write(out);
- }
+ //Utils.writeVInt(out, slice.length);
+ //for (CGRangeSplit split : slice) {
+ // split.write(out);
+ //}
+ slice.write(out);
}
- CGRangeSplit get(int index) {
- return slice[index];
+ //CGRangeSplit get(int index) {
+ // return slice[index];
+ //}
+
+ CGRangeSplit getCGRangeSplit() {
+ return slice;
}
}
/**
+     * A row-based split on the zebra table, tied to one column group.
+     */
+    public static class RowSplit implements Writable {
+      int cgIndex; // index of the column group the split lies on
+      CGRowSplit slice;
+
+      RowSplit(int cgidx, CGRowSplit split) {
+        this.cgIndex = cgidx;
+        this.slice = split;
+      }
+
+      /**
+       * Default constructor; fields are filled in by {@link #readFields(DataInput)}.
+       */
+      public RowSplit() {
+        // no-op
+      }
+
+      @Override
+      public String toString() {
+        // append(Object) prints "null" for an unset slice instead of throwing.
+        return new StringBuilder().append("{cgIndex = ").append(cgIndex)
+            .append("}\n").append(slice).toString();
+      }
+
+      /**
+       * @see Writable#readFields(DataInput)
+       */
+      @Override
+      public void readFields(DataInput in) throws IOException {
+        this.cgIndex = Utils.readVInt(in);
+        CGRowSplit cgrs = new CGRowSplit();
+        cgrs.readFields(in);
+        this.slice = cgrs;
+      }
+
+      /**
+       * @see Writable#write(DataOutput)
+       */
+      @Override
+      public void write(DataOutput out) throws IOException {
+        Utils.writeVInt(out, cgIndex);
+        slice.write(out);
+      }
+
+      /** @return index of the column group this split was computed on. */
+      int getCGIndex() {
+        return cgIndex;
+      }
+
+      /** @return the underlying column-group row split. */
+      CGRowSplit getCGRowSplit() {
+        return slice;
+      }
+    }
+
+
+ /**
* BasicTable scanner class
*/
private class BTScanner implements TableScanner {
@@ -704,74 +888,106 @@
}
public BTScanner(BytesWritable beginKey, BytesWritable endKey,
- boolean closeReader, Partition partition) throws IOException {
- this.partition = partition;
- boolean anyScanner = false;
- try {
- schema = partition.getProjection();
- cgScanners = new TableScanner[colGroups.length];
- for (int i = 0; i < colGroups.length; ++i) {
- if (!isCGDeleted(i) && partition.isCGNeeded(i))
- {
- anyScanner = true;
- cgScanners[i] = colGroups[i].getScanner(beginKey, endKey, false);
- } else
- cgScanners[i] = null;
- }
- if (!anyScanner && firstValidCG >= 0) {
- // if no CG is needed explicitly by projection but the "countRow" still needs to access some column group
- cgScanners[firstValidCG] = colGroups[firstValidCG].
- getScanner(beginKey, endKey, false);
- }
- this.closeReader = closeReader;
- sClosed = false;
- }
- catch (Exception e) {
- throw new IOException("BTScanner constructor failed : "
- + e.getMessage());
+ boolean closeReader, Partition partition) throws IOException {
+ init(null, null, beginKey, endKey, closeReader, partition);
+ }
+
+ public BTScanner(RangeSplit split, Partition partition,
+ boolean closeReader) throws IOException {
+ init(null, split, null, null, closeReader, partition);
+ }
+
+ public BTScanner(RowSplit rowSplit, boolean closeReader,
+ Partition partition) throws IOException {
+ init(rowSplit, null, null, null, closeReader, partition);
+ }
+
+ /**
+ * Creates new CGRowSplit. If the startRow in rowSplit is not set
+ * (i.e. < 0), it sets the startRow and numRows based on 'startByte'
+ * and 'numBytes' from given rowSplit.
+ */
+ private CGRowSplit makeCGRowSplit(RowSplit rowSplit) throws IOException {
+ CGRowSplit inputCGSplit = rowSplit.getCGRowSplit();
+
+ int cgIdx = rowSplit.getCGIndex();
+
+ CGRowSplit cgSplit = new CGRowSplit();
+ cgSplit.fileIndex = inputCGSplit.fileIndex;
+ // startByte and numBytes from inputCGSplit are ignored, since
+ // they make sense for only one CG.
+ cgSplit.startRow = inputCGSplit.startRow;
+ cgSplit.numRows = inputCGSplit.numRows;
+
+ if (cgSplit.startRow >= 0) {
+ //assume the rows are already set up.
+ return cgSplit;
}
- finally {
- if (sClosed) {
- if (cgScanners != null) {
- for (int i = 0; i < cgScanners.length; ++i) {
- if (cgScanners[i] != null) {
- try {
- cgScanners[i].close();
- cgScanners[i] = null;
- }
- catch (Exception e) {
- // no-op
- }
- }
- }
- }
- }
+
+ // Find the row range :
+ if (isCGDeleted(cgIdx)) {
+ throw new IOException("CG " + cgIdx + " is deleted.");
}
+
+ //fill the row numbers.
+ colGroups[cgIdx].fillRowSplit(cgSplit, inputCGSplit.startByte,
+ inputCGSplit.numBytes);
+ return cgSplit;
}
-
- public BTScanner(RangeSplit split, Partition partition,
- boolean closeReader) throws IOException {
+
+      // Helper for init(): opens a scanner on column group cgIndex. The row
+      // split takes precedence, then the key range (if either key is set),
+      // and finally the range split, which may be null.
+      private TableScanner createCGScanner(int cgIndex, CGRowSplit cgRowSplit,
+          RangeSplit rangeSplit, BytesWritable beginKey, BytesWritable endKey)
+          throws IOException, ParseException {
+        ColumnGroup.Reader cg = colGroups[cgIndex];
+        if (cgRowSplit != null) {
+          return cg.getScanner(false, cgRowSplit);
+        }
+        if (beginKey != null || endKey != null) {
+          return cg.getScanner(beginKey, endKey, false);
+        }
+        CGRangeSplit cgRange =
+            (rangeSplit == null) ? null : rangeSplit.getCGRangeSplit();
+        return cg.getScanner(cgRange, false);
+      }
+
+ /**
+ * If rowRange is not null, scanners will be created based on the
+ * row range. <br>
+ * If RangeSplit is not null, scaller will be based on the range, <br>
+ * otherwise, these are based on keys.
+ */
+ private void init(RowSplit rowSplit, RangeSplit rangeSplit,
+ BytesWritable beginKey, BytesWritable endKey,
+ boolean closeReader, Partition partition) throws IOException {
+ this.partition = partition;
+ boolean anyScanner = false;
+
+ CGRowSplit cgRowSplit = null;
+ if (rowSplit != null) {
+ cgRowSplit = makeCGRowSplit(rowSplit);
+ }
+
try {
schema = partition.getProjection();
cgScanners = new TableScanner[colGroups.length];
- boolean anyScanner = false;
for (int i = 0; i < colGroups.length; ++i) {
- // if no CG is needed explicitly by projection but the "countRow" still needs to access some column group
- if (!isCGDeleted(i) && partition.isCGNeeded(i))
+ if (!isCGDeleted(i) && partition.isCGNeeded(i))
{
- cgScanners[i] =
- colGroups[i].getScanner(split == null ? null : split.get(i),
- false);
anyScanner = true;
+ cgScanners[i] = createCGScanner(i, cgRowSplit, rangeSplit,
+ beginKey, endKey);
} else
cgScanners[i] = null;
}
if (!anyScanner && firstValidCG >= 0) {
// if no CG is needed explicitly by projection but the "countRow" still needs to access some column group
- cgScanners[firstValidCG] = colGroups[firstValidCG].
- getScanner(split == null ? null : split.get(firstValidCG), false);
+ cgScanners[firstValidCG] = createCGScanner(firstValidCG, cgRowSplit,
+ rangeSplit,
+ beginKey, endKey);
}
- this.partition = partition;
this.closeReader = closeReader;
sClosed = false;
}
@@ -1014,6 +1230,8 @@
private boolean closed = true;
ColumnGroup.Writer[] colGroups;
Partition partition;
+ boolean sorted;
+ private boolean finished;
Tuple[] cgTuples;
/**
@@ -1039,35 +1257,34 @@
* implementation, the schema of a table is a comma or
* semicolon-separated list of column names, such as
* "FirstName, LastName; Sex, Department".
- * @param sorted
- * Whether the table to be created is sorted or not. If set to
- * true, we expect the rows inserted by every inserter created from
- * this Writer must be sorted. Additionally, there exists an
- * ordering of the inserters Ins-1, Ins-2, ... such that the rows
- * created by Ins-1, followed by rows created by Ins-2, ... form a
- * total order.
+ * @param sortColumns
+ * String of comma-separated sorted columns: null for unsorted tables
+ * @param comparator
+ * Name of the comparator used in sorted tables
* @param conf
* Optional Configuration objects.
*
* @throws IOException
* @see Schema
*/
- public Writer(Path path, String btSchemaString, String btStorageString,
- boolean sorted, Configuration conf) throws IOException {
+ public Writer(Path path, String btSchemaString, String btStorageString, String sortColumns,
+ String comparator, Configuration conf) throws IOException {
try {
schemaFile =
- new SchemaFile(path, btSchemaString, btStorageString,
- DEFAULT_COMPARATOR, sorted, conf);
+ new SchemaFile(path, btSchemaString, btStorageString, sortColumns,
+ comparator, conf);
partition = schemaFile.getPartition();
int numCGs = schemaFile.getNumOfPhysicalSchemas();
colGroups = new ColumnGroup.Writer[numCGs];
cgTuples = new Tuple[numCGs];
+ sorted = schemaFile.isSorted();
for (int nx = 0; nx < numCGs; nx++) {
colGroups[nx] =
new ColumnGroup.Writer(
new Path(path, schemaFile.getName(nx)),
schemaFile.getPhysicalSchema(nx),
sorted,
+ comparator,
schemaFile.getName(nx),
schemaFile.getSerializer(nx),
schemaFile.getCompressor(nx),
@@ -1113,7 +1330,16 @@
}
/**
- * Reopen an already created BasicTable for writing. Excepiton will be
+     * Backward-compatible constructor that creates an unsorted table; same as
+     * {@code Writer(path, btSchemaString, btStorageString, null, null, conf)}.
+     */
+    public Writer(Path path, String btSchemaString, String btStorageString,
+        Configuration conf) throws IOException {
+      this(path, btSchemaString, btStorageString, null, null, conf);
+    }
+
+    /**
+ * Reopen an already created BasicTable for writing. Exception will be
* thrown if the table is already closed, or is in the process of being
* closed.
*/
@@ -1122,6 +1348,7 @@
schemaFile = new SchemaFile(path, conf);
int numCGs = schemaFile.getNumOfPhysicalSchemas();
partition = schemaFile.getPartition();
+ sorted = schemaFile.isSorted();
colGroups = new ColumnGroup.Writer[numCGs];
cgTuples = new Tuple[numCGs];
for (int nx = 0; nx < numCGs; nx++) {
@@ -1167,8 +1394,8 @@
* make the table immutable.
*/
public void finish() throws IOException {
- if (closed) return;
- closed = true;
+ if (finished) return;
+ finished = true;
try {
for (int nx = 0; nx < colGroups.length; nx++) {
if (colGroups[nx] != null) {
@@ -1203,6 +1430,8 @@
public void close() throws IOException {
if (closed) return;
closed = true;
+ if (!finished)
+ finish();
try {
for (int nx = 0; nx < colGroups.length; nx++) {
if (colGroups[nx] != null) {
@@ -1237,6 +1466,22 @@
public Schema getSchema() {
return schemaFile.getLogical();
}
+
+ /**
+ * @return whether the table being written is sorted, as recorded in its schema file
+ */
+ public boolean isSorted() {
+ return sorted;
+ }
+
+ /**
+ * Get the sort information of the table being written.
+ * @return the table's SortInfo; may be null, so callers should check before use
+ */
+ public SortInfo getSortInfo()
+ {
+ return schemaFile.getSortInfo();
+ }
/**
* Get a inserter with a given name.
@@ -1369,7 +1614,7 @@
}
if (finishWriter) {
try {
- BasicTable.Writer.this.close();
+ BasicTable.Writer.this.finish();
}
catch (Exception e) {
// no-op
@@ -1401,6 +1646,7 @@
Schema[] physical;
Partition partition;
boolean sorted;
+ SortInfo sortInfo = null;
String storage;
CGSchema[] cgschemas;
@@ -1419,17 +1665,21 @@
}
// ctor for writing
- public SchemaFile(Path path, String btSchemaStr, String btStorageStr,
- String btComparator, boolean sorted, Configuration conf)
+ public SchemaFile(Path path, String btSchemaStr, String btStorageStr, String sortColumns,
+ String btComparator, Configuration conf)
throws IOException {
storage = btStorageStr;
- this.comparator = btComparator;
try {
- partition = new Partition(btSchemaStr, btStorageStr);
+ partition = new Partition(btSchemaStr, btStorageStr, btComparator, sortColumns);
}
catch (Exception e) {
throw new IOException("Partition constructor failed :" + e.getMessage());
}
+ this.sortInfo = partition.getSortInfo();
+ this.sorted = partition.isSorted();
+ this.comparator = (this.sortInfo == null ? null : this.sortInfo.getComparator());
+ if (this.comparator == null)
+ this.comparator = "";
logical = partition.getSchema();
cgschemas = partition.getCGSchemas();
physical = new Schema[cgschemas.length];
@@ -1437,7 +1687,7 @@
physical[nx] = cgschemas[nx].getSchema();
}
cgDeletedFlags = new boolean[physical.length];
- this.sorted = sorted;
+
version = SCHEMA_VERSION;
// write out the schema
@@ -1456,6 +1706,10 @@
return sorted;
}
+ public SortInfo getSortInfo() {
+ return sortInfo; // may be null; null is persisted as zero sort columns
+ }
+
public Schema getLogical() {
return logical;
}
@@ -1480,9 +1734,9 @@
return cgschemas[nx].getCompressor();
}
- /**
- * Returns the index for CG with the given name.
- * -1 indicates that there is no CG with the name.
+ /**
+ * Returns the index for CG with the given name. -1 indicates that there is
+ * no CG with the name.
*/
int getCGByName(String cgName) {
for(int i=0; i<physical.length; i++) {
@@ -1538,6 +1792,15 @@
WritableUtils.writeString(outSchema, physical[nx].toString());
}
WritableUtils.writeVInt(outSchema, sorted ? 1 : 0);
+ WritableUtils.writeVInt(outSchema, sortInfo == null ? 0 : sortInfo.size());
+ if (sortInfo != null && sortInfo.size() > 0)
+ {
+ String[] sortedCols = sortInfo.getSortColumnNames();
+ for (int i = 0; i < sortInfo.size(); i++)
+ {
+ WritableUtils.writeString(outSchema, sortedCols[i]);
+ }
+ }
outSchema.close();
}
@@ -1566,7 +1829,7 @@
}
storage = WritableUtils.readString(in);
try {
- partition = new Partition(logicalStr, storage);
+ partition = new Partition(logicalStr, storage, comparator);
}
catch (Exception e) {
throw new IOException("Partition constructor failed :" + e.getMessage());
@@ -1589,9 +1852,48 @@
}
sorted = WritableUtils.readVInt(in) == 1 ? true : false;
setCGDeletedFlags(path, conf);
+ if (version.compareTo(new Version((short)1, (short)0)) > 0)
+ {
+ int numSortColumns = WritableUtils.readVInt(in);
+ if (numSortColumns > 0)
+ {
+ String[] sortColumnStr = new String[numSortColumns];
+ for (int i = 0; i < numSortColumns; i++)
+ {
+ sortColumnStr[i] = WritableUtils.readString(in);
+ }
+ sortInfo = SortInfo.parse(SortInfo.toSortString(sortColumnStr), logical, comparator);
+ }
+ }
in.close();
}
+    // Reads just the leading fields of the schema file to obtain the number
+    // of column groups; the input stream is closed on every path.
+    private static int getNumCGs(Path path, Configuration conf) throws IOException {
+      Path pathSchema = makeSchemaFilePath(path);
+      FileSystem fs = path.getFileSystem(conf);
+      if (!fs.exists(pathSchema)) {
+        throw new IOException("BT Schema file doesn't exist: " + pathSchema);
+      }
+      FSDataInputStream in = fs.open(pathSchema);
+      try {
+        Version version = new Version(in);
+        // verify compatibility against SCHEMA_VERSION; the exception must be
+        // thrown, otherwise an incompatible file would be parsed silently
+        if (!version.compatibleWith(SCHEMA_VERSION)) {
+          throw new IOException("Incompatible versions, expecting: "
+              + SCHEMA_VERSION + "; found in file: " + version);
+        }
+        WritableUtils.readString(in); // comparator
+        WritableUtils.readString(in); // logical schema string
+        WritableUtils.readString(in); // storage string
+        return WritableUtils.readVInt(in);
+      } finally {
+        in.close();
+      }
+    }
+
private static Path makeSchemaFilePath(Path parent) {
return new Path(parent, BT_SCHEMA_FILE);
}
@@ -1607,23 +1909,24 @@
for (FileStatus file : path.getFileSystem(conf).listStatus(path)) {
if (!file.isDir()) {
- String fname = file.getPath().getName();
- if (fname.startsWith(DELETED_CG_PREFIX)) {
- deletedCGs.add(fname.substring(DELETED_CG_PREFIX.length()));
- }
+ String fname = file.getPath().getName();
+ if (fname.startsWith(DELETED_CG_PREFIX)) {
+ deletedCGs.add(fname.substring(DELETED_CG_PREFIX.length()));
+ }
}
}
for(int i=0; i<physical.length; i++) {
- cgDeletedFlags[i] =
- deletedCGs.contains(getName(i));
+ cgDeletedFlags[i] = deletedCGs.contains(getName(i));
}
}
+
+
}
static public void dumpInfo(String file, PrintStream out, Configuration conf)
throws IOException {
- dumpInfo(file, out, conf, 0);
+ dumpInfo(file, out, conf, 0);
}
static public void dumpInfo(String file, PrintStream out, Configuration conf, int indent)
@@ -1633,10 +1936,25 @@
Path path = new Path(file);
try {
BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+ String schemaStr = reader.getBTSchemaString();
+ String storageStr = reader.getStorageString();
IOutils.indent(out, indent);
- out.printf("Schema : %s\n", reader.getBTSchemaString());
+ out.printf("Schema : %s\n", schemaStr);
IOutils.indent(out, indent);
- out.printf("Storage Information : %s\n", reader.getStorageString());
+ out.printf("Storage Information : %s\n", storageStr);
+ SortInfo sortInfo = reader.getSortInfo();
+ if (sortInfo != null && sortInfo.size() > 0)
+ {
+ IOutils.indent(out, indent);
+ String[] sortedCols = sortInfo.getSortColumnNames();
+ out.println("Sorted Columns :");
+ for (int nx = 0; nx < sortedCols.length; nx++) {
+ if (nx > 0)
+ out.printf(" , ");
+ out.printf("%s", sortedCols[nx]);
+ }
+ out.printf("\n");
+ }
IOutils.indent(out, indent);
out.println("Column Groups within the Basic Table :");
for (int nx = 0; nx < reader.colGroups.length; nx++) {
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTableStatus.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTableStatus.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTableStatus.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTableStatus.java Tue Nov 24 19:54:19 2009
@@ -22,7 +22,7 @@
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.file.tfile.Utils;
+import org.apache.hadoop.zebra.tfile.Utils;
/**
* Status of a BasicTable. The status may be reported under some projection.
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java Tue Nov 24 19:54:19 2009
@@ -37,6 +37,8 @@
import java.util.Random;
import java.util.Set;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.*;
import org.apache.hadoop.fs.BlockLocation;
@@ -48,10 +50,10 @@
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.file.tfile.TFile;
-import org.apache.hadoop.io.file.tfile.Utils;
-import org.apache.hadoop.io.file.tfile.ByteArray;
-import org.apache.hadoop.io.file.tfile.RawComparable;
+import org.apache.hadoop.zebra.tfile.TFile;
+import org.apache.hadoop.zebra.tfile.Utils;
+import org.apache.hadoop.zebra.tfile.ByteArray;
+import org.apache.hadoop.zebra.tfile.RawComparable;
import org.apache.hadoop.zebra.types.CGSchema;
import org.apache.hadoop.zebra.parser.ParseException;
import org.apache.hadoop.zebra.types.Partition;
@@ -78,6 +80,8 @@
* </ul>
*/
class ColumnGroup {
+ static Log LOG = LogFactory.getLog(ColumnGroup.class);
+
private final static String CONF_COMPRESS = "table.output.tfile.compression";
private final static String DEFAULT_COMPRESS = "gz";
private final static String CONF_MIN_BLOCK_SIZE = "table.tfile.minblock.size";
@@ -156,7 +160,10 @@
static CGIndex buildIndex(FileSystem fs, Path path, boolean dirty,
Configuration conf) throws IOException {
CGIndex ret = new CGIndex();
- FileStatus[] files = fs.listStatus(path, new CGPathFilter(conf));
+ CGPathFilter cgPathFilter = new CGPathFilter();
+ CGPathFilter.setConf(conf);
+ FileStatus[] files = fs.listStatus(path, cgPathFilter);
+
Comparator<RawComparable> comparator = null;
for (FileStatus f : files) {
if (dirty) {
@@ -194,9 +201,15 @@
}
ret.sort(comparator);
+
+ int idx = 0;
+ for (CGIndexEntry e : ret.getIndex()) {
+ e.setIndex(idx++);
+ }
+
return ret;
- }
-
+ }
+
/**
* ColumnGroup reader.
*/
@@ -266,7 +279,8 @@
}
projection = new Projection(cgschema.getSchema()); // default projection to CG schema.
Path metaFilePath = makeMetaFilePath(path);
- if (!fs.exists(metaFilePath)) {
+ /* If the index file does not exist or the table is unsorted. */
+ if (!fs.exists(metaFilePath) || !cgschema.isSorted() ) {
// special case for unsorted CG that did not create index properly.
if (cgschema.isSorted()) {
throw new FileNotFoundException(
@@ -421,14 +435,31 @@
throw new IllegalArgumentException("Illegal range split");
}
- if (split.len == 0) {
- throw new IOException("Zero-length range split");
- }
-
return new CGScanner(split, closeReader);
}
/**
+ * Get a scanner that reads the rows defined by rowRange.
+ *
+ * @param closeReader
+ * close the underlying Reader object when we close the scanner.
+ * Should be set to true if we have only one scanner on top of the
+ * reader, so that we should release resources after the scanner is
+ * closed.
+ * @param rowSplit specifies part index, start row, and end row.
+ * @return A scanner object.
+ */
+ public synchronized TableScanner getScanner(boolean closeReader,
+ CGRowSplit rowSplit)
+ throws IOException, ParseException {
+ if (closed) {
+ throw new EOFException("Reader already closed");
+ }
+
+ return new CGScanner(rowSplit, closeReader);
+ }
+
+ /**
* Given a split range, calculate how the file data that fall into the range
* are distributed among hosts.
*
@@ -461,7 +492,69 @@
return ret;
}
+
+ /**
+ * Given a row range, calculate how the file data that fall into the range
+ * are distributed among hosts.
+ *
+ * @param split
+ * The row-based split. Must not be null; an IOException is thrown otherwise.
+ * @return a map from host name to the amount of data (in bytes) the host
+ * owns that falls into the split's byte range.
+ */
+ public BlockDistribution getBlockDistribution(CGRowSplit split)
+ throws IOException {
+ if (split == null) {
+ throw new IOException("Row-based split cannot be null for getBlockDistribution()");
+ }
+
+ BlockDistribution ret = new BlockDistribution();
+ CGIndexEntry entry = cgindex.get(split.fileIndex);
+ FileStatus tfileStatus = fs.getFileStatus(new Path(path, entry.getName()));
+
+ BlockLocation[] locations = fs.getFileBlockLocations(tfileStatus, split.startByte, split.numBytes);
+ for (BlockLocation l : locations) {
+ ret.add(l);
+ }
+
+ return ret;
+ }
+
+ /**
+ * Sets startRow and number of rows in rowSplit based on
+ * startOffset and length.
+ *
+ * It is assumed that 'startByte' and 'numBytes' in rowSplit itself
+ * are not valid.
+ */
+ void fillRowSplit(CGRowSplit rowSplit, long startOffset, long length)
+ throws IOException {
+
+ Path tfPath = new Path(path, cgindex.get(rowSplit.fileIndex).getName());
+ FileStatus tfile = fs.getFileStatus(tfPath);
+
+ TFile.Reader reader = null;
+
+ try {
+ reader = new TFile.Reader(fs.open(tfPath),
+ tfile.getLen(), conf);
+ long startRow = reader.getRecordNumNear(startOffset);
+ long endRow = reader.getRecordNumNear(startOffset + length);
+
+ if (endRow < startRow) {
+ endRow = startRow;
+ }
+
+ rowSplit.startRow = startRow;
+ rowSplit.numRows = endRow - startRow;
+ } finally {
+ if (reader != null) {
+ reader.close();
+ }
+ }
+ }
+
/**
* Get a sampling of keys and calculate how data are distributed among
* key-partitioned buckets. The implementation attempts to calculate all
@@ -637,6 +730,32 @@
}
/**
+ * Byte-offset-based input splits are already created with FileInputFormat;
+ * their information is encoded in starts, lengths and paths. This method wraps
+ * that information into CGRowSplit objects at the column group level.
+ *
+ * @param starts array of starting byte of fileSplits.
+ * @param lengths array of length of fileSplits.
+ * @param paths array of path of fileSplits.
+ * @return A list of CGRowSplit objects.
+ *
+ */
+ public List<CGRowSplit> rowSplit(long[] starts, long[] lengths, Path[] paths) throws IOException {
+ List<CGRowSplit> lst = new ArrayList<CGRowSplit>();
+
+ for (int i=0; i<starts.length; i++) {
+ long start = starts[i];
+ long length = lengths[i];
+ Path path = paths[i];
+ int idx = cgindex.getFileIndex(path);
+
+ lst.add(new CGRowSplit(idx, start, length));
+ }
+
+ return lst;
+ }
+
+ /**
* Is the ColumnGroup sorted?
*
* @return Whether the ColumnGroup is sorted.
@@ -663,8 +782,9 @@
TFile.Reader.Scanner scanner;
TupleReader tupleReader;
- TFileScanner(FileSystem fs, Path path, RawComparable begin,
- RawComparable end, CGSchema cgschema, Projection projection,
+ TFileScanner(FileSystem fs, Path path, CGRowSplit rowRange,
+ RawComparable begin, RawComparable end,
+ CGSchema cgschema, Projection projection,
Configuration conf) throws IOException, ParseException {
try {
ins = fs.open(path);
@@ -672,7 +792,17 @@
* compressor is inside cgschema
*/
reader = new TFile.Reader(ins, fs.getFileStatus(path).getLen(), conf);
- scanner = reader.createScanner(begin, end);
+ if (rowRange != null && rowRange.fileIndex >= 0) {
+ scanner = reader.createScannerByRecordNum(rowRange.startRow,
+ rowRange.startRow + rowRange.numRows);
+ } else {
+ /* using deprecated API just so that zebra can work with
+ * hadoop jar that does not contain HADOOP-6218 (Record ids for
+ * TFile). This is expected to be temporary. Later we should
+ * use the undeprecated API.
+ */
+ scanner = reader.createScanner(begin, end);
+ }
/*
* serializer is inside cgschema: different serializer will require
* different Reader: for pig, it's TupleReader
@@ -795,9 +925,29 @@
beginIndex = split.start;
endIndex = split.start + split.len;
}
- init(null, null, closeReader);
+ init(null, null, null, closeReader);
}
-
+
+ /**
+ * Scanner for a range specified by the given row range.
+ *
+ * @param rowRange see {@link CGRowSplit}
+ * @param closeReader
+ */
+ CGScanner(CGRowSplit rowRange, boolean closeReader)
+ throws IOException, ParseException {
+ beginIndex = 0;
+ endIndex = cgindex.size();
+ if (rowRange != null && rowRange.fileIndex>= 0) {
+ if (rowRange.fileIndex >= cgindex.size()) {
+ throw new IllegalArgumentException("Part Index is out of range.");
+ }
+ beginIndex = rowRange.fileIndex;
+ endIndex = beginIndex+1;
+ }
+ init(rowRange, null, null, closeReader);
+ }
+
CGScanner(RawComparable beginKey, RawComparable endKey,
boolean closeReader) throws IOException, ParseException {
beginIndex = 0;
@@ -811,11 +961,12 @@
++endIndex;
}
}
- init(beginKey, endKey, closeReader);
+ init(null, beginKey, endKey, closeReader);
}
- private void init(RawComparable beginKey, RawComparable endKey,
- boolean doClose) throws IOException, ParseException {
+ private void init(CGRowSplit rowRange, RawComparable beginKey,
+ RawComparable endKey, boolean doClose)
+ throws IOException, ParseException {
if (beginIndex > endIndex) {
throw new IllegalArgumentException("beginIndex > endIndex");
}
@@ -827,8 +978,9 @@
RawComparable begin = (i == beginIndex) ? beginKey : null;
RawComparable end = (i == endIndex - 1) ? endKey : null;
TFileScanner scanner =
- new TFileScanner(fs, cgindex.getPath(i, path), begin, end,
- cgschema, logicalSchema, conf);
+ new TFileScanner(fs, cgindex.getPath(i, path), rowRange,
+ begin, end,
+ cgschema, logicalSchema, conf);
// skip empty scanners.
if (!scanner.atEnd()) {
tmpScanners.add(scanner);
@@ -975,7 +1127,55 @@
Utils.writeVInt(out, len);
}
}
+
+ public static class CGRowSplit implements Writable {
+ int fileIndex = -1;
+ long startByte = -1;
+ long numBytes = -1;
+ long startRow = -1;
+ long numRows = -1;
+
+ CGRowSplit(int fileIdx, long start, long len) {
+ this.fileIndex = fileIdx;
+ this.startByte = start;
+ this.numBytes = len;
+ }
+
+ public CGRowSplit() {
+ // no-op;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("{fileIndex = " + fileIndex + "}\n");
+ sb.append("{startByte = " + startByte + "}\n");
+ sb.append("{numBytes = " + numBytes + "}\n");
+ sb.append("{startRow = " + startRow + "}\n");
+ sb.append("{numRows = " + numRows + "}\n");
+
+ return sb.toString();
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ fileIndex = Utils.readVInt(in);
+ startByte = Utils.readVLong(in);
+ numBytes = Utils.readVLong(in);
+ startRow = Utils.readVLong(in);
+ numRows = Utils.readVLong(in);
+ }
+ @Override
+ public void write(DataOutput out) throws IOException {
+ Utils.writeVInt(out, fileIndex);
+ Utils.writeVLong(out, startByte);
+ Utils.writeVLong(out, numBytes);
+ Utils.writeVLong(out, startRow);
+ Utils.writeVLong(out, numRows);
+ }
+ }
+
private static class SplitColumn {
SplitColumn(Partition.SplitType st) {
this.st = st;
@@ -1051,7 +1251,7 @@
Configuration conf;
FileSystem fs;
CGSchema cgschema;
- private boolean finished;
+ private boolean finished, closed;
/**
* Create a ColumnGroup writer. The semantics are as follows:
@@ -1095,11 +1295,25 @@
public Writer(Path path, String schema, boolean sorted, String name, String serializer,
String compressor, String owner, String group, short perm,boolean overwrite, Configuration conf)
throws IOException, ParseException {
- this(path, new Schema(schema), sorted, name, serializer, compressor, owner, group, perm, overwrite,
+ this(path, new Schema(schema), sorted, null, name, serializer, compressor, owner, group, perm, overwrite,
conf);
}
public Writer(Path path, Schema schema, boolean sorted, String name, String serializer,
+ String compressor, String owner, String group, short perm,boolean overwrite, Configuration conf)
+ throws IOException, ParseException {
+ this(path, schema, sorted, null, name, serializer, compressor, owner, group, perm, overwrite,
+ conf);
+ }
+
+ public Writer(Path path, String schema, boolean sorted, String comparator, String name, String serializer,
+ String compressor, String owner, String group, short perm,boolean overwrite, Configuration conf)
+ throws IOException, ParseException {
+ this(path, new Schema(schema), sorted, comparator, name, serializer, compressor, owner, group, perm, overwrite,
+ conf);
+ }
+
+ public Writer(Path path, Schema schema, boolean sorted, String comparator, String name, String serializer,
String compressor, String owner, String group, short perm, boolean overwrite, Configuration conf)
throws IOException, ParseException {
this.path = path;
@@ -1118,7 +1332,7 @@
checkPath(path, true);
- cgschema = new CGSchema(schema, sorted, name, serializer, compressor, owner, group, perm);
+ cgschema = new CGSchema(schema, sorted, comparator, name, serializer, compressor, owner, group, perm);
CGSchema sfNew = CGSchema.load(fs, path);
if (sfNew != null) {
// compare input with on-disk schema.
@@ -1162,7 +1376,10 @@
@Override
public void close() throws IOException {
if (!finished) {
- finished = true;
+ finish();
+ }
+ if (!closed) {
+ closed = true;
createIndex();
}
}
@@ -1196,13 +1413,23 @@
private void createIndex() throws IOException {
MetaFile.Writer metaFile =
MetaFile.createWriter(makeMetaFilePath(path), conf);
- CGIndex index = buildIndex(fs, path, false, conf);
- DataOutputStream dos = metaFile.createMetaBlock(BLOCK_NAME_INDEX);
- try {
- index.write(dos);
- }
- finally {
- dos.close();
+ if (cgschema.isSorted()) {
+ CGIndex index = buildIndex(fs, path, false, conf);
+ DataOutputStream dos = metaFile.createMetaBlock(BLOCK_NAME_INDEX);
+ try {
+ index.write(dos);
+ }
+ finally {
+ dos.close();
+ }
+ } else { /* Create an empty data meta file for unsorted table. */
+ DataOutputStream dos = metaFile.createMetaBlock(BLOCK_NAME_INDEX);
+ try {
+ Utils.writeString(dos, "");
+ }
+ finally {
+ dos.close();
+ }
}
metaFile.close();
}
@@ -1429,18 +1656,19 @@
* name, first and last key (inclusive) of a data file
*/
static class CGIndexEntry implements RawComparable, Writable {
+ int index;
String name;
long rows;
RawComparable firstKey;
RawComparable lastKey;
// for reading
- CGIndexEntry() {
+ public CGIndexEntry() {
// no-op
}
// for writing
- CGIndexEntry(String name, long rows, RawComparable firstKey,
+ public CGIndexEntry(String name, long rows, RawComparable firstKey,
RawComparable lastKey) {
this.name = name;
this.rows = rows;
@@ -1448,6 +1676,10 @@
this.lastKey = lastKey;
}
+ public int getIndex() {
+ return index;
+ }
+
public String getName() {
return name;
}
@@ -1463,6 +1695,10 @@
public RawComparable getLastKey() {
return lastKey;
}
+
+ void setIndex (int idx) {
+ this.index = idx;
+ }
@Override
public byte[] buffer() {
@@ -1525,6 +1761,16 @@
status = new BasicTableStatus();
index = new ArrayList<CGIndexEntry>();
}
+
+ int getFileIndex(Path path) throws IOException {
+ String filename = path.getName();
+ for (CGIndexEntry cgie : index) {
+ if (cgie.getName().equals(filename)) {
+ return cgie.getIndex();
+ }
+ }
+ throw new IOException("File " + filename + " is not in the column group index");
+ }
int size() {
return index.size();
@@ -1679,16 +1925,17 @@
}
}
- static class CGPathFilter implements PathFilter {
- private final Configuration conf;
-
- CGPathFilter(Configuration conf) {
- this.conf = conf;
+ public static class CGPathFilter implements PathFilter {
+ private static Configuration conf;
+
+ public static void setConf(Configuration c) {
+ conf = c;
}
public boolean accept(Path p) {
return p.getName().equals(META_FILE) || p.getName().equals(SCHEMA_FILE)
|| p.getName().startsWith(".tmp.")
+ || p.getName().startsWith("ttt")
|| p.getName().startsWith(getNonDataFilePrefix(conf)) ? false : true;
}
}
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/IOutils.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/IOutils.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/IOutils.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/IOutils.java Tue Nov 24 19:54:19 2009
@@ -21,7 +21,18 @@
import java.io.DataOutputStream;
import java.io.PrintStream;
+/**
+ * Helper for Zebra I/O
+ */
public class IOutils {
+ /**
+ * indent of some spaces
+ *
+ * @param os
+ * print stream the indent space to be inserted
+ * @param amount
+ * the number of spaces to be indented
+ */
public static void indent(PrintStream os, int amount)
{
for (int i = 0; i < amount; i++)
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java Tue Nov 24 19:54:19 2009
@@ -23,7 +23,8 @@
import java.util.TreeMap;
import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.file.tfile.RawComparable;
+import org.apache.hadoop.zebra.tfile.RawComparable;
+import org.apache.hadoop.zebra.tfile.ByteArray;
/**
* Class used to convey the information of how on-disk data are distributed
@@ -149,7 +150,8 @@
* Get the block distribution of all data that maps to the key bucket.
*/
public BlockDistribution getBlockDistribution(BytesWritable key) {
- BlockDistribution bInfo = data.get(key);
+ ByteArray key0 = new ByteArray(key.get(), 0, key.getSize());
+ BlockDistribution bInfo = data.get(key0);
if (bInfo == null) {
throw new IllegalArgumentException("Invalid key");
}
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/MetaFile.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/MetaFile.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/MetaFile.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/MetaFile.java Tue Nov 24 19:54:19 2009
@@ -28,9 +28,9 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.file.tfile.TFile;
-import org.apache.hadoop.io.file.tfile.MetaBlockAlreadyExists;
-import org.apache.hadoop.io.file.tfile.MetaBlockDoesNotExist;
+import org.apache.hadoop.zebra.tfile.TFile;
+import org.apache.hadoop.zebra.tfile.MetaBlockAlreadyExists;
+import org.apache.hadoop.zebra.tfile.MetaBlockDoesNotExist;
/**
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/TableScanner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/TableScanner.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/TableScanner.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/TableScanner.java Tue Nov 24 19:54:19 2009
@@ -93,7 +93,6 @@
/**
* Get the projection's schema
- * @return
*/
public Schema getSchema();
}
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/package-info.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/package-info.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/package-info.java Tue Nov 24 19:54:19 2009
@@ -16,6 +16,6 @@
*/
/**
- * Physical I/O management of Hadoop Tables.
+ * Physical I/O management of Hadoop Zebra Tables.
*/
package org.apache.hadoop.zebra.io;
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableOutputFormat.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableOutputFormat.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableOutputFormat.java Tue Nov 24 19:54:19 2009
@@ -30,8 +30,11 @@
import org.apache.hadoop.zebra.io.TableInserter;
import org.apache.hadoop.zebra.parser.ParseException;
import org.apache.hadoop.zebra.types.Partition;
+import org.apache.hadoop.zebra.types.SortInfo;
import org.apache.hadoop.zebra.schema.Schema;
import org.apache.pig.data.Tuple;
+import org.apache.hadoop.zebra.pig.comparator.*;
+
/**
* {@link org.apache.hadoop.mapred.OutputFormat} class for creating a
@@ -113,7 +116,10 @@
private static final String OUTPUT_SCHEMA = "mapred.lib.table.output.schema";
private static final String OUTPUT_STORAGEHINT =
"mapred.lib.table.output.storagehint";
- private static final String OUTPUT_SORTED = "mapred.lib.table.output.sorted";
+ private static final String OUTPUT_SORTCOLUMNS =
+ "mapred.lib.table.output.sortcolumns";
+ private static final String OUTPUT_COMPARATOR =
+ "mapred.lib.table.output.comparator";
/**
* Set the output path of the BasicTable in JobConf
@@ -174,10 +180,66 @@
return new Schema(schema);
}
+ private static KeyGenerator makeKeyBuilder(byte[] elems) {
+ ComparatorExpr[] exprs = new ComparatorExpr[elems.length];
+ for (int i = 0; i < elems.length; ++i) {
+ exprs[i] = ExprUtils.primitiveComparator(i, elems[i]);
+ }
+ return new KeyGenerator(ExprUtils.tupleComparator(exprs));
+ }
+
+ /**
+ * Generates a Zebra-specific sort key generator, which is used to generate BytesWritable keys.
+ * The sort key(s) are used to generate this object.
+ *
+ * @param conf
+ * The JobConf object.
+ * @return Object of type zebra.pig.comparator.KeyGenerator.
+ *
+ */
+ public static Object getSortKeyGenerator(JobConf conf) throws IOException, ParseException {
+
+ SortInfo sortInfo = getSortInfo(conf);
+ Schema schema = getSchema(conf);
+ String[] sortColNames = sortInfo.getSortColumnNames();
+
+ byte[] types = new byte[sortColNames.length];
+ for(int i =0 ; i < sortColNames.length; ++i){
+ types[i] = schema.getColumn(sortColNames[i]).getType().pigDataType();
+ }
+ KeyGenerator builder = makeKeyBuilder(types);
+ return builder;
+
+ }
+
+
+ /**
+ * Generates a BytesWritable sort key for the input tuple
+ * using the provided key generator. The sort key(s) are used to generate this object.
+ *
+ * @param builder
+ * Opaque key generator created by the getSortKeyGenerator() method
+ * @param t
+ * Tuple to create the sort key from
+ * @return BytesWritable key
+ *
+ */
+ public static BytesWritable getSortKey(Object builder, Tuple t) throws Exception {
+ KeyGenerator kg = (KeyGenerator) builder;
+ return kg.generateKey(t);
+ }
+
+
+
+
/**
* Set the table storage hint in JobConf, should be called after setSchema is
* called.
+ * <br> <br>
*
+ * Note that the "secure by" feature is experimental now and subject to
+ * changes in the future.
+ *
* @param conf
* The JobConf object.
* @param storehint
@@ -194,7 +256,7 @@
throw new ParseException("Schema has not been set");
// for sanity check purpose only
- Partition partition = new Partition(schema, storehint);
+ Partition partition = new Partition(schema, storehint, null);
conf.set(OUTPUT_STORAGEHINT, storehint);
}
@@ -214,34 +276,84 @@
}
/**
- * Set sorted-ness of the table. It is disabled now (by making it package
- * private). So only unsorted BasicTables may be created for now.
- *
- * TODO: must also allow users to specify customized comparator.
+ * Set the sort info
+ *
+ * @param conf
+ * The JobConf object.
+ *
+ * @param sortColumns
+ * Comma-separated sort column names
+ *
+ * @param comparator
+ * comparator class name; null for default
+ *
+ */
+ public static void setSortInfo(JobConf conf, String sortColumns, String comparator) {
+ conf.set(OUTPUT_SORTCOLUMNS, sortColumns);
+ conf.set(OUTPUT_COMPARATOR, comparator);
+ }
+
+ /**
+ * Set the sort info
+ *
+ * @param conf
+ * The JobConf object.
+ *
+ * @param sortColumns
+ * Comma-separated sort column names
*/
- public static void setSorted(JobConf conf, boolean sorted) {
- conf.setBoolean(OUTPUT_SORTED, sorted);
+ public static void setSortInfo(JobConf conf, String sortColumns) {
+ conf.set(OUTPUT_SORTCOLUMNS, sortColumns);
+ }
+
+ /**
+ * Get the SortInfo object
+ *
+ * @param conf
+ * The JobConf object.
+ * @return SortInfo object; null if the Zebra table is unsorted
+ *
+ */
+ public static SortInfo getSortInfo(JobConf conf)throws IOException
+ {
+ String sortColumns = conf.get(OUTPUT_SORTCOLUMNS);
+ if (sortColumns == null)
+ return null;
+ Schema schema = null;
+ try {
+ schema = getSchema(conf);
+ } catch (ParseException e) {
+ throw new IOException("Schema parsing failure : "+e.getMessage());
+ }
+ if (schema == null)
+ throw new IOException("Schema not defined");
+ String comparator = getComparator(conf);
+ return SortInfo.parse(sortColumns, schema, comparator);
}
/**
- * Is the table to be created should be sorted? It is disabled now (by making
- * it package private).
+ * Get the comparator for sort columns
+ *
+ * @param conf
+ * The JobConf object.
+ * @return comparator String
+ *
*/
- static boolean getSorted(JobConf conf) {
- return conf.getBoolean(OUTPUT_SORTED, false);
+ private static String getComparator(JobConf conf)
+ {
+ return conf.get(OUTPUT_COMPARATOR);
}
/**
* Get the output table as specified in JobConf. It is useful for applications
* to add more meta data after all rows have been added to the table.
- * Currently it is disabled (by setting it to package private).
*
* @param conf
* The JobConf object.
* @return The output BasicTable.Writer object.
* @throws IOException
*/
- public static BasicTable.Writer getOutput(JobConf conf) throws IOException {
+ private static BasicTable.Writer getOutput(JobConf conf) throws IOException {
String path = conf.get(OUTPUT_PATH);
if (path == null) {
throw new IllegalArgumentException("Cannot find output path");
@@ -268,16 +380,17 @@
if (schema == null) {
throw new IllegalArgumentException("Cannot find output schema");
}
- String storehint;
+ String storehint, sortColumns, comparator;
try {
storehint = getStorageHint(conf);
+ sortColumns = (getSortInfo(conf) == null ? null : SortInfo.toSortString(getSortInfo(conf).getSortColumnNames()));
+ comparator = getComparator(conf);
}
catch (ParseException e) {
throw new IOException(e);
}
BasicTable.Writer writer =
- new BasicTable.Writer(new Path(path), schema, storehint,
- getSorted(conf), conf); // will
+ new BasicTable.Writer(new Path(path), schema, storehint, sortColumns, comparator, conf);
writer.finish();
}
@@ -299,14 +412,13 @@
/**
* Close the output BasicTable, No more rows can be added into the table. A
- * BasicTable is not visible for reading until it is "closed". This call is
- * required for sorted TFile, but not required for unsorted TFile.
+ * BasicTable is not visible for reading until it is "closed".
*
* @param conf
* The JobConf object.
* @throws IOException
*/
- static void close(JobConf conf) throws IOException {
+ public static void close(JobConf conf) throws IOException {
BasicTable.Writer table = getOutput(conf);
table.close();
}
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/CachedTableScanner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/CachedTableScanner.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/CachedTableScanner.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/CachedTableScanner.java Tue Nov 24 19:54:19 2009
@@ -36,6 +36,7 @@
private Tuple row;
private boolean keyReady;
private boolean rowReady;
+ private int index;
private TableScanner scanner;
/**
@@ -45,11 +46,12 @@
* The scanner to be encapsulated
* @throws IOException
*/
- public CachedTableScanner(TableScanner scanner) throws IOException {
+ public CachedTableScanner(TableScanner scanner, int index) throws IOException {
key = new BytesWritable();
row = TypesUtils.createTuple(Projection.getNumColumns(scanner.getProjection()));
keyReady = false;
rowReady = false;
+ this.index = index;
this.scanner = scanner;
}
@@ -82,6 +84,15 @@
}
/**
+ * Get the table index in a union
+ *
+ * @return the table index in union
+ */
+ public int getIndex() {
+ return index;
+
+ }
+ /**
* Seek to a row whose key is greater than or equal to the input key.
*
* @param inKey
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java Tue Nov 24 19:54:19 2009
@@ -34,6 +34,7 @@
* Table Expression - expression to describe an input table.
*/
abstract class TableExpr {
+ private boolean sorted = false;
/**
* Factory method to create a TableExpr from a string.
*
@@ -132,6 +133,26 @@
}
/**
+ * Get a scanner for an unsorted, row-based split.
+ *
+ * @param split
+ * The row-based table split.
+ * @param projection
+ * The projection schema. It should never be null.
+ * @param conf
+ * The configuration
+ * @return A table scanner.
+ * @throws IOException
+ */
+ public TableScanner getScanner(RowTableSplit split, String projection,
+ Configuration conf) throws IOException, ParseException, ParseException {
+ BasicTable.Reader reader =
+ new BasicTable.Reader(new Path(split.getPath()), conf);
+ reader.setProjection(projection);
+ return reader.getScanner(true, split.getSplit());
+ }
+
+ /**
* A leaf table corresponds to a materialized table. It is represented by the
* path to the BasicTable and the projection.
*/
@@ -179,7 +200,14 @@
* @return Whether this expression may only be split by key.
*/
public boolean sortedSplitRequired() {
- return false;
+ return sorted;
+ }
+
+ /**
+ * Require that this expression be split by key (i.e. a sorted split).
+ */
+ public void setSortedSplit() {
+ sorted = true;
}
/**