Posted to commits@pig.apache.org by ya...@apache.org on 2010/03/22 07:26:42 UTC
svn commit: r925971 - in /hadoop/pig/trunk/contrib/zebra: ./
src/java/org/apache/hadoop/zebra/io/ src/java/org/apache/hadoop/zebra/mapred/
src/java/org/apache/hadoop/zebra/mapreduce/
src/java/org/apache/hadoop/zebra/tfile/ src/test/org/apache/hadoop/ze...
Author: yanz
Date: Mon Mar 22 06:26:41 2010
New Revision: 925971
URL: http://svn.apache.org/viewvc?rev=925971&view=rev
Log:
PIG-1258 Number of sorted input splits is unusually high (yanz)
Modified:
hadoop/pig/trunk/contrib/zebra/CHANGES.txt
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BlockDistribution.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/TFile.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupWithWorkPath.java
Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/CHANGES.txt?rev=925971&r1=925970&r2=925971&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/trunk/contrib/zebra/CHANGES.txt Mon Mar 22 06:26:41 2010
@@ -66,6 +66,8 @@ Trunk (unreleased changes)
BUG FIXES
+ PIG-1258 Number of sorted input splits is unusually high (yanz)
+
PIG-1269 Restrict schema definition for collection (xuefuz via yanz)
PIG-1253: make map/reduce test cases run on real cluster (chaow via yanz)
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java?rev=925971&r1=925970&r2=925971&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java Mon Mar 22 06:26:41 2010
@@ -44,7 +44,6 @@ import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.zebra.tfile.RawComparable;
import org.apache.hadoop.zebra.tfile.TFile;
import org.apache.hadoop.zebra.tfile.Utils;
import org.apache.hadoop.zebra.tfile.MetaBlockAlreadyExists;
@@ -474,21 +473,18 @@ public class BasicTable {
*
* @param n
* Targeted size of the sampling.
+ * @param nTables
+ * Number of tables in union
* @return KeyDistribution object.
* @throws IOException
*/
- public KeyDistribution getKeyDistribution(int n) throws IOException {
- KeyDistribution kd =
- new KeyDistribution(TFile.makeComparator(schemaFile.getComparator()));
- for (int nx = 0; nx < colGroups.length; nx++) {
- if (!isCGDeleted(nx)) {
- kd.add(colGroups[nx].getKeyDistribution(n));
- }
- }
- if (n >= 0 && kd.size() > (int) (n * 1.5)) {
- kd.resize(n);
+ public KeyDistribution getKeyDistribution(int n, int nTables, BlockDistribution lastBd) throws IOException {
+ if (firstValidCG >= 0)
+ {
+ // pick the largest CG as in the row split case
+ return colGroups[getRowSplitCGIndex()].getKeyDistribution(n, nTables, lastBd);
}
- return kd;
+ return null;
}
/**
@@ -650,7 +646,8 @@ public class BasicTable {
* construct a TableScanner later.
*
*/
- public List<RowSplit> rowSplit(long[] starts, long[] lengths, Path[] paths, int splitCGIndex, int[] batchSizes, int numBatches) throws IOException {
+ public List<RowSplit> rowSplit(long[] starts, long[] lengths, Path[] paths,
+ int splitCGIndex, int[] batchSizes, int numBatches) throws IOException {
List<RowSplit> ret;
List<CGRowSplit> cgSplits = colGroups[splitCGIndex].rowSplit(starts, lengths, paths, batchSizes, numBatches);
int numSlices = cgSplits.size();
@@ -679,6 +676,7 @@ public class BasicTable {
*/
public int getRowSplitCGIndex() throws IOException {
// Try to find the largest non-deleted and used column group by projection;
+ // Try to find the largest non-deleted and used column group by projection;
if (rowSplitCGIndex == -1)
{
int largestCGIndex = -1;
@@ -702,7 +700,7 @@ public class BasicTable {
rowSplitCGIndex = largestCGIndex;
} else if (firstValidCG >= 0) { /* If all projection columns are either deleted or non-existing,
then we use the first non-deleted column group to do split if it exists. */
- rowSplitCGIndex = firstValidCG;
+ rowSplitCGIndex = firstValidCG;
}
}
return rowSplitCGIndex;
@@ -844,8 +842,7 @@ public class BasicTable {
* A row-based split on the zebra table;
*/
public static class RowSplit implements Writable {
-
- int cgIndex; // column group index where split lies on;
+ int cgIndex; // column group index where split lies on;
CGRowSplit slice;
RowSplit(int cgidx, CGRowSplit split) {
@@ -931,7 +928,7 @@ public class BasicTable {
Partition partition) throws IOException {
init(rowSplit, null, null, null, closeReader, partition);
}
-
+
/**
* Creates new CGRowSplit. If the startRow in rowSplit is not set
* (i.e. < 0), it sets the startRow and numRows based on 'startByte'
@@ -943,12 +940,11 @@ public class BasicTable {
int cgIdx = rowSplit.getCGIndex();
CGRowSplit cgSplit = new CGRowSplit();
-
+
// Find the row range :
if (isCGDeleted(cgIdx)) {
throw new IOException("CG " + cgIdx + " is deleted.");
}
-
//fill the row numbers.
colGroups[cgIdx].fillRowSplit(cgSplit, inputCGSplit);
return cgSplit;
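As context for the getKeyDistribution() change above: BasicTable no longer aggregates key samples from every column group and then resizes the result; it delegates to the single column group that also drives row splits. A minimal sketch of that delegation pattern, using a hypothetical Group interface rather than the real Zebra classes:

    import java.io.IOException;
    import java.util.List;

    // Sketch only: sample keys from the largest non-deleted column group,
    // mirroring the "pick the largest CG as in the row split case" comment above.
    class KeySamplingSketch {
        interface Group {
            boolean deleted();
            long bytes();
            long[] sampleKeys(int n) throws IOException;  // stand-in for getKeyDistribution
        }

        static long[] sampleFromLargestGroup(List<Group> groups, int n) throws IOException {
            Group largest = null;
            for (Group g : groups) {
                if (g.deleted()) continue;                // skip deleted column groups
                if (largest == null || g.bytes() > largest.bytes()) largest = g;
            }
            return (largest == null) ? null : largest.sampleKeys(n);
        }
    }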
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BlockDistribution.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BlockDistribution.java?rev=925971&r1=925970&r2=925971&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BlockDistribution.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BlockDistribution.java Mon Mar 22 06:26:41 2010
@@ -39,10 +39,10 @@ public class BlockDistribution {
private long uniqueBytes;
private Map<String, Long> dataDistri; // map from host names to bytes.
- BlockDistribution() {
+ public BlockDistribution() {
dataDistri = new HashMap<String, Long>();
}
-
+
void add(long bytes, Map<String, Long> distri) {
this.uniqueBytes += bytes;
reduceDataDistri(dataDistri, distri);
@@ -58,7 +58,7 @@ public class BlockDistribution {
lv.put(key, (sum == null) ? delta : sum + delta);
}
}
-
+
void add(BlockLocation blkLocation) throws IOException {
long blkLen = blkLocation.getLength();
Map<String, Long> tmp = new HashMap<String, Long>();
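BlockDistribution keeps a map from host names to byte counts, and add()/reduceDataDistri() merge distributions by summing the per-host contributions. A standalone sketch of that accumulation (hypothetical class, not the Zebra implementation):

    import java.util.HashMap;
    import java.util.Map;

    // Sketch: each block contributes its length to every host holding a replica.
    class HostBytesSketch {
        private final Map<String, Long> bytesPerHost = new HashMap<String, Long>();

        void add(long blockLength, String[] hosts) {
            for (String host : hosts) {
                Long current = bytesPerHost.get(host);
                bytesPerHost.put(host, current == null ? blockLength : current + blockLength);
            }
        }

        Map<String, Long> distribution() {
            return bytesPerHost;
        }
    }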
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java?rev=925971&r1=925970&r2=925971&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java Mon Mar 22 06:26:41 2010
@@ -90,7 +90,7 @@ class ColumnGroup {
private final static String CONF_MIN_SPLIT_SIZE = "table.input.split.minSize";
private final static int DEFAULT_MIN_SPLIT_SIZE = 64 * 1024;
- private static final double SPLIT_SLOP = 1.1; // 10% slop
+ static final double SPLIT_SLOP = 1.1; // 10% slop
// excluding files start with the following prefix, may change to regex
private final static String CONF_NON_DATAFILE_PREFIX =
@@ -102,6 +102,9 @@ class ColumnGroup {
// meta data TFile for entire CG, used as a flag of closed CG
final static String META_FILE = ".meta";
+ // sorted table key ranges for default sorted table split generations
+ private final static String KEY_RANGE_FOR_DEFAULT_SORTED_SPLIT = ".keyrange";
+
static final String BLOCK_NAME_INDEX = "ColumnGroup.index";
static Path makeMetaFilePath(Path parent) {
@@ -527,23 +530,50 @@ class ColumnGroup {
FileStatus tfileStatus = fs.getFileStatus(new Path(path, split.names[i]));
BlockLocation[] locations = null;
+ long len = 0;
if (i == 0) {
- if (split.startByteFirst != -1)
- locations = fs.getFileBlockLocations(tfileStatus, split.startByteFirst, split.numBytesFirst);
+ if (split.startByteFirst != -1)
+ {
+ len = split.numBytesFirst;
+ locations = fs.getFileBlockLocations(tfileStatus, split.startByteFirst, len);
+ }
} else if (i == split.length - 1) {
- if (split.startByteLast != -1)
- locations = fs.getFileBlockLocations(tfileStatus, split.startByteLast, split.numBytesLast);
+ if (split.numBytesLast != -1)
+ {
+ len = split.numBytesLast;
+ locations = fs.getFileBlockLocations(tfileStatus, 0, len);
+ }
}
+
if (locations == null)
- locations = fs.getFileBlockLocations(tfileStatus, 0, tfileStatus.getLen());
+ {
+ len = tfileStatus.getLen();
+ locations = fs.getFileBlockLocations(tfileStatus, 0, len);
+ }
for (BlockLocation l : locations) {
ret.add(l);
}
- }
+ }
return ret;
}
+ private int getStartBlockIndex(long[] startOffsets, long offset)
+ {
+ int index = Arrays.binarySearch(startOffsets, offset);
+ if (index < 0)
+ index = -index - 2;
+ return index;
+ }
+
+ private int getEndBlockIndex(long[] startOffsets, long offset)
+ {
+ int index = Arrays.binarySearch(startOffsets, offset);
+ if (index < 0)
+ index = -index - 1;
+ return index;
+ }
+
/**
* Sets startRow and number of rows in rowSplit based on
* startOffset and length.
@@ -551,7 +581,7 @@ class ColumnGroup {
* It is assumed that 'startByte' and 'numBytes' in rowSplit itself
* are not valid.
*/
- void fillRowSplit(CGRowSplit rowSplit, CGRowSplit src)throws IOException {
+ void fillRowSplit(CGRowSplit rowSplit, CGRowSplit src) throws IOException {
if (src.names == null || src.length == 0)
return;
@@ -570,7 +600,6 @@ class ColumnGroup {
rowSplit.length = src.length;
rowSplit.startByteFirst = src.startByteFirst;
rowSplit.numBytesFirst = src.numBytesFirst;
- rowSplit.startByteLast = src.startByteLast;
rowSplit.numBytesLast = src.numBytesLast;
Path firstPath = null, lastPath;
@@ -600,7 +629,7 @@ class ColumnGroup {
throw e;
}
}
- if (src.startByteLast != -1 && rowSplit.length > 1)
+ if (src.numBytesLast != -1 && rowSplit.length > 1)
{
lastPath = new Path(path, rowSplit.names[rowSplit.length - 1]);
if (reader == null || !firstPath.equals(lastPath))
@@ -617,13 +646,8 @@ class ColumnGroup {
reader = new TFile.Reader(fs.open(lastPath), size, conf);
}
try {
- long startRow = reader.getRecordNumNear(src.startByteLast);
- long endRow = reader.getRecordNumNear(src.startByteLast + src.numBytesLast);
-
- if (endRow < startRow)
- endRow = startRow;
- rowSplit.startRowLast = startRow;
- rowSplit.numRowsLast = endRow - startRow;
+ long endRow = reader.getRecordNumNear(src.numBytesLast);
+ rowSplit.numRowsLast = endRow;
} catch (IOException e) {
reader.close();
throw e;
@@ -642,15 +666,45 @@ class ColumnGroup {
*
* @param n
* Targeted size of the sampling.
+ * @param nTables
+ * Number of tables in a union
* @return KeyDistribution object.
* @throws IOException
*/
- public KeyDistribution getKeyDistribution(int n) throws IOException {
+ public KeyDistribution getKeyDistribution(int n, int nTables, BlockDistribution lastBd) throws IOException {
// TODO: any need for similar capability for unsorted for sorted CGs?
if (!isSorted()) {
throw new IOException("Cannot get key distribution for unsorted table");
}
KeyDistribution ret = new KeyDistribution(comparator);
+
+ if (n < 0)
+ {
+ /*
+ Path keyRangeFile = new Path(path, KEY_RANGE_FOR_DEFAULT_SORTED_SPLIT);
+ if (fs.exists(keyRangeFile))
+ {
+ try {
+ FSDataInputStream ins = fs.open(keyRangeFile);
+ long minStepSize = ins.readLong();
+ int size = ins.readInt();
+ for (int i = 0; i < size; i++)
+ {
+ BytesWritable keyIn = new BytesWritable();
+ keyIn.readFields(ins);
+ ByteArray key = new ByteArray(keyIn.getBytes());
+ ret.add(key);
+ }
+ ret.setMinStepSize(minStepSize);
+ return ret;
+ } catch (Exception e) {
+ // no-op
+ }
+ }
+ */
+ n = 1;
+ }
+
Path[] paths = new Path[cgindex.size()];
FileStatus[] tfileStatus = new FileStatus[paths.length];
long totalBytes = 0;
@@ -659,48 +713,73 @@ class ColumnGroup {
tfileStatus[i] = fs.getFileStatus(paths[i]);
totalBytes += tfileStatus[i].getLen();
}
- // variable.
- final long EPSILON = (long) (getMinSplitSize(conf) * (SPLIT_SLOP - 1));
+ final long minSize = getMinSplitSize(conf);
+ final long EPSILON = (long) (minSize * (SPLIT_SLOP - 1));
long goalSize = totalBytes / n;
- goalSize = Math.max(getMinSplitSize(conf), goalSize);
+ long batchSize = 0;
+ BlockDistribution bd = new BlockDistribution();;
+ RawComparable prevKey = null;
+
+ long minStepSize = -1;
+ FSDataInputStream nextFsdis = null;
+ TFile.Reader nextReader = null;
for (int i = 0; i < paths.length; ++i) {
FileStatus fstatus = tfileStatus[i];
long blkSize = fstatus.getBlockSize();
long fileLen = fstatus.getLen();
- long stepSize =
- (goalSize > blkSize) ? goalSize / blkSize * blkSize : blkSize
- / (blkSize / goalSize);
+ long stepSize = Math.max(minSize,
+ (goalSize < blkSize) ? goalSize : blkSize);
+ if (minStepSize== -1 || minStepSize > stepSize)
+ minStepSize = stepSize;
+ // adjust the block size by the scaling factor
+ blkSize /= nTables;
+ stepSize = Math.max(minSize,
+ (goalSize < blkSize) ? goalSize : blkSize);
FSDataInputStream fsdis = null;
TFile.Reader reader = null;
long remainLen = fileLen;
- boolean done = false;
try {
- fsdis = fs.open(paths[i]);
- reader = new TFile.Reader(fsdis, tfileStatus[i].getLen(), conf);
+ if (nextReader == null)
+ {
+ fsdis = fs.open(paths[i]);
+ reader = new TFile.Reader(fsdis, fileLen, conf);
+ } else {
+ fsdis = nextFsdis;
+ reader = nextReader;
+ }
+ BlockLocation[] locations =
+ fs.getFileBlockLocations(fstatus, 0, fileLen);
+ if (locations.length == 0) {
+ throw new AssertionError(
+ "getFileBlockLocations returns 0 location");
+ }
+
+ Arrays.sort(locations, new Comparator<BlockLocation>() {
+ @Override
+ public int compare(BlockLocation o1, BlockLocation o2) {
+ long diff = o1.getOffset() - o2.getOffset();
+ if (diff < 0) return -1;
+ if (diff > 0) return 1;
+ return 0;
+ }
+ });
+
+ long[] startOffsets = new long[locations.length];
+
+ for (int ii = 0; ii < locations.length; ii++)
+ startOffsets[ii] = locations[ii].getOffset();
+
+ boolean done = false;
while ((remainLen > 0) && !done) {
long splitBytes =
- (remainLen > stepSize * SPLIT_SLOP) ? stepSize : remainLen;
+ remainLen > stepSize ? stepSize : remainLen;
long offsetBegin = fileLen - remainLen;
long offsetEnd = offsetBegin + splitBytes;
- BlockLocation[] locations =
- fs.getFileBlockLocations(fstatus, offsetBegin, splitBytes);
- if (locations.length == 0) {
- throw new AssertionError(
- "getFileBlockLocations returns 0 location");
- }
-
- Arrays.sort(locations, new Comparator<BlockLocation>() {
- @Override
- public int compare(BlockLocation o1, BlockLocation o2) {
- long diff = o1.getOffset() - o2.getOffset();
- if (diff < 0) return -1;
- if (diff > 0) return 1;
- return 0;
- }
- });
- BlockLocation firstBlock = locations[0];
- BlockLocation lastBlock = locations[locations.length - 1];
+ int indexBegin = getStartBlockIndex(startOffsets, offsetBegin);
+ int indexEnd = getEndBlockIndex(startOffsets, offsetEnd);
+ BlockLocation firstBlock = locations[indexBegin];
+ BlockLocation lastBlock = locations[indexEnd-1];
long lastBlockOffsetBegin = lastBlock.getOffset();
long lastBlockOffsetEnd =
lastBlockOffsetBegin + lastBlock.getLength();
@@ -719,6 +798,7 @@ class ColumnGroup {
// only if this is not the last chunk
offsetEnd = lastBlockOffsetBegin;
splitBytes = offsetEnd - offsetBegin;
+ indexEnd--;
}
}
else if ((lastBlockOffsetEnd > offsetEnd)
@@ -732,22 +812,44 @@ class ColumnGroup {
if (key == null) {
offsetEnd = fileLen;
splitBytes = offsetEnd - offsetBegin;
- key = reader.getLastKey();
+ if (i < paths.length-1)
+ {
+ nextFsdis = fs.open(paths[i+1]);
+ nextReader = new TFile.Reader(nextFsdis, tfileStatus[i+1].getLen(), conf);
+ key = nextReader.getFirstKey();
+ }
done = true; // TFile index too large? Is it necessary now?
}
remainLen -= splitBytes;
+ batchSize += splitBytes;
- BlockDistribution bd = new BlockDistribution();
- for (BlockLocation l : locations) {
- long blkBeginOffset = l.getOffset();
- long blkEndOffset = blkBeginOffset + l.getLength();
- if (blkBeginOffset < offsetBegin) blkBeginOffset = offsetBegin;
- if (blkEndOffset > offsetEnd) blkEndOffset = offsetEnd;
- if (blkEndOffset > blkBeginOffset) {
- bd.add(l, blkEndOffset - blkBeginOffset);
+ if (key != null && batchSize >= stepSize)
+ {
+ if (batchSize - splitBytes < EPSILON || splitBytes < EPSILON)
+ {
+ // the last chunk or this chunk is small enough to create a new range for this key
+ setBlockDistribution(bd, reader, locations, fstatus, startOffsets, prevKey, key);
+ ret.add(key, bd);
+ batchSize = 0;
+ bd = new BlockDistribution();
+ } else {
+ ret.add(prevKey, bd);
+ batchSize = splitBytes;
+ bd = new BlockDistribution();
+ if (batchSize >= stepSize)
+ {
+ setBlockDistribution(bd, reader, locations, fstatus, startOffsets, prevKey, key);
+ ret.add(key, bd);
+ batchSize = 0;
+ bd = new BlockDistribution();
+ } else {
+ setBlockDistribution(bd, reader, locations, fstatus, startOffsets, prevKey, key);
+ }
}
+ } else {
+ setBlockDistribution(bd, reader, locations, fstatus, startOffsets, prevKey, key);
}
- ret.add(key, bd);
+ prevKey = key;
}
}
finally {
@@ -769,10 +871,47 @@ class ColumnGroup {
}
}
}
-
+ if (lastBd != null)
+ lastBd.add(bd);
+ ret.setMinStepSize(minStepSize);
+
return ret;
}
+ private void setBlockDistribution(BlockDistribution bd, TFile.Reader reader,
+ BlockLocation[] locations, FileStatus fileStatus, long[] startOffsets,
+ RawComparable begin, RawComparable end) throws IOException
+ {
+ long beginOffset, endOffset = -1;
+ if (begin == null)
+ beginOffset = 0;
+ else
+ beginOffset = reader.getOffsetForKey(begin);
+ if (end != null)
+ {
+ if (begin == null)
+ begin = reader.getFirstKey();
+ /* Only if the key range is empty. This is needed because TFile has a 16-byte
+ * Magic that causes getOffsetForKey to return 16 (not 0) even on the first key.
+ */
+ if (comparator.compare(begin, end) != 0)
+ endOffset = reader.getOffsetForKey(end);
+ }
+ int startBlockIndex = (beginOffset == 0 ? 0 : getStartBlockIndex(startOffsets, beginOffset));
+ BlockLocation l;
+ int endBlockIndex = (end == null ? locations.length : endOffset == -1 ?
+ startBlockIndex : getEndBlockIndex(startOffsets, endOffset));
+ for (int ii = startBlockIndex; ii < endBlockIndex; ii++) {
+ l = locations[ii];
+ long blkBeginOffset = l.getOffset();
+ long blkEndOffset = blkBeginOffset + l.getLength();
+ if (blkEndOffset > blkBeginOffset) {
+ bd.add(l, blkEndOffset - blkBeginOffset);
+ }
+ }
+ return;
+ }
+
/**
* Get the status of the ColumnGroup.
*/
@@ -807,7 +946,6 @@ class ColumnGroup {
lst.add(new CGRangeSplit(beginIndex, endIndex - beginIndex));
beginIndex = endIndex;
}
-
return lst;
}
@@ -822,21 +960,35 @@ class ColumnGroup {
* @return A list of CGRowSplit objects.
*
*/
- public List<CGRowSplit> rowSplit(long[] starts, long[] lengths, Path[] paths, int[] batches, int numBatches) throws IOException {
+ public List<CGRowSplit> rowSplit(long[] starts, long[] lengths, Path[] paths,
+ int[] batches, int numBatches) throws IOException {
List<CGRowSplit> lst = new ArrayList<CGRowSplit>();
CGRowSplit cgRowSplit;
- long startFirst, bytesFirst, startLast, bytesLast;
+ long startFirst, bytesFirst, bytesLast;
int length;
- if (numBatches > 0 && cgindex == null)
+ if (numBatches == 0)
+ {
+ cgRowSplit = new CGRowSplit(null, null, 0, -1, 0, 0);
+ lst.add(cgRowSplit);
+ return lst;
+ }
+
+ if (cgindex == null)
cgindex = buildIndex(fs, this.path, dirty, conf);
+ if (cgindex.size() == 0)
+ {
+ cgRowSplit = new CGRowSplit(null, null, 0, -1, 0, 0);
+ lst.add(cgRowSplit);
+ return lst;
+ }
+
for (int i=0; i< numBatches; i++) {
int indexFirst = batches[i];
int indexLast = batches[i+1] - 1;
startFirst = starts[indexFirst];
bytesFirst = lengths[indexFirst];
- startLast = starts[indexLast];
bytesLast = lengths[indexLast];
length = batches[i+1] - batches[i];
String[] namesInSplit = new String[length];
@@ -846,8 +998,8 @@ class ColumnGroup {
namesInSplit[j] = paths[indexFirst+j].getName();
sizesInSplit[j] = cgindex.get(cgindex.getFileIndex(paths[indexFirst+j])).bytes;
}
- cgRowSplit = new CGRowSplit(namesInSplit, sizesInSplit, fs, conf, length,
- startFirst, bytesFirst, startLast, bytesLast);
+ cgRowSplit = new CGRowSplit(namesInSplit, sizesInSplit, length,
+ startFirst, bytesFirst, bytesLast);
lst.add(cgRowSplit);
}
@@ -912,9 +1064,8 @@ class ColumnGroup {
if (first && rowRange.startByteFirst != -1)
scanner = reader.createScannerByRecordNum(rowRange.startRowFirst,
rowRange.startRowFirst + rowRange.numRowsFirst);
- else if (last && rowRange.startByteLast != -1)
- scanner = reader.createScannerByRecordNum(rowRange.startRowLast,
- rowRange.startRowLast + rowRange.numRowsLast);
+ else if (last && rowRange.numBytesLast != -1)
+ scanner = reader.createScannerByRecordNum(0, rowRange.numRowsLast);
else
scanner = reader.createScanner();
} else {
@@ -977,6 +1128,7 @@ class ColumnGroup {
DataInputStream dis = scanner.entry().getValueStream();
try {
tupleReader.get(dis, val);
+
}
finally {
dis.close();
@@ -1349,16 +1501,13 @@ class ColumnGroup {
long numBytesFirst;
long startRowFirst = -1;
long numRowsFirst = -1;
- long startByteLast = -1;
- long numBytesLast;
- long startRowLast = -1;
+ long numBytesLast = -1;
long numRowsLast = -1;
String[] names;
long[] sizes = null;
- CGRowSplit(String[] names, long[] sizes, FileSystem fs, Configuration conf,
- int length, long startFirst, long bytesFirst,
- long startLast, long bytesLast) throws IOException {
+ CGRowSplit(String[] names, long[] sizes, int length, long startFirst, long bytesFirst,
+ long bytesLast) throws IOException {
this.names = names;
this.sizes = sizes;
this.length = length;
@@ -1368,9 +1517,8 @@ class ColumnGroup {
startByteFirst = startFirst;
numBytesFirst = bytesFirst;
}
- if (startLast != -1 && this.length > 1)
+ if (bytesLast != -1 && this.length > 1)
{
- startByteLast = startLast;
numBytesLast = bytesLast;
}
}
@@ -1392,9 +1540,7 @@ class ColumnGroup {
sb.append("{numBytesFirst = " + numBytesFirst + "}\n");
sb.append("{startRowFirst = " + startRowFirst + "}\n");
sb.append("{numRowsFirst = " + numRowsFirst + "}\n");
- sb.append("{startByteLast = " + startByteLast + "}\n");
sb.append("{numBytesLast = " + numBytesLast + "}\n");
- sb.append("{startRowLast = " + startRowLast + "}\n");
sb.append("{numRowsLast = " + numRowsLast + "}\n");
return sb.toString();
@@ -1417,9 +1563,7 @@ class ColumnGroup {
numBytesFirst = Utils.readVLong(in);
startRowFirst = Utils.readVLong(in);
numRowsFirst = Utils.readVLong(in);
- startByteLast = Utils.readVLong(in);
numBytesLast = Utils.readVLong(in);
- startRowLast = Utils.readVLong(in);
numRowsLast = Utils.readVLong(in);
}
@@ -1435,9 +1579,7 @@ class ColumnGroup {
Utils.writeVLong(out, numBytesFirst);
Utils.writeVLong(out, startRowFirst);
Utils.writeVLong(out, numRowsFirst);
- Utils.writeVLong(out, startByteLast);
Utils.writeVLong(out, numBytesLast);
- Utils.writeVLong(out, startRowLast);
Utils.writeVLong(out, numRowsLast);
}
}
@@ -1876,7 +2018,6 @@ class ColumnGroup {
out.close();
out = null;
// do renaming only if all the above is successful.
-// fs.rename(new Path(path, tmpName), new Path(path, name));
fs.rename(new Path(path, tmpName), new Path(finalOutputPath, name));
/*
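The getStartBlockIndex()/getEndBlockIndex() helpers added above rely on the java.util.Arrays.binarySearch contract: for a key that is not present, the method returns -(insertionPoint) - 1. The sketch below (hypothetical class name) shows how that return value maps to "block containing the offset" versus "first block at or after the offset":

    import java.util.Arrays;

    // Sketch of the binary-search convention used over sorted block start offsets.
    class BlockIndexSketch {
        static int startBlockIndex(long[] sortedStartOffsets, long offset) {
            int index = Arrays.binarySearch(sortedStartOffsets, offset);
            return (index >= 0) ? index : -index - 2;   // block whose range contains 'offset'
        }

        static int endBlockIndex(long[] sortedStartOffsets, long offset) {
            int index = Arrays.binarySearch(sortedStartOffsets, offset);
            return (index >= 0) ? index : -index - 1;   // first block starting at or after 'offset'
        }

        public static void main(String[] args) {
            long[] starts = {0L, 128L, 256L, 384L};
            System.out.println(startBlockIndex(starts, 200));  // 1: block [128, 256) holds offset 200
            System.out.println(endBlockIndex(starts, 200));    // 2: first block starting after 200
        }
    }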
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java?rev=925971&r1=925970&r2=925971&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/KeyDistribution.java Mon Mar 22 06:26:41 2010
@@ -21,6 +21,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.zebra.tfile.RawComparable;
@@ -33,50 +34,37 @@ import org.apache.hadoop.zebra.tfile.Byt
*/
public class KeyDistribution {
private long uniqueBytes;
+ private long minStepSize = -1;
private SortedMap<RawComparable, BlockDistribution> data;
KeyDistribution(Comparator<? super RawComparable> comparator) {
data = new TreeMap<RawComparable, BlockDistribution>(comparator);
}
- void add(RawComparable key, BlockDistribution bucket) {
+ void add(RawComparable key) {
+ data.put(key, null);
+ }
+
+ void add(RawComparable key, BlockDistribution bucket)
+ {
uniqueBytes += bucket.getLength();
data.put(key, BlockDistribution.sum(data.get(key), bucket));
}
-
- void add(KeyDistribution other) {
- this.uniqueBytes += other.uniqueBytes;
- reduceKeyDistri(this.data, other.data);
- }
-
- static void reduceKeyDistri(SortedMap<RawComparable, BlockDistribution> lv,
- SortedMap<RawComparable, BlockDistribution> rv) {
- for (Iterator<Map.Entry<RawComparable, BlockDistribution>> it =
- rv.entrySet().iterator(); it.hasNext();) {
- Map.Entry<RawComparable, BlockDistribution> e = it.next();
- RawComparable key = e.getKey();
- BlockDistribution sum = lv.get(key);
- BlockDistribution delta = e.getValue();
- lv.put(key, BlockDistribution.sum(sum, delta));
- }
+
+ void setMinStepSize(long minStepSize)
+ {
+ this.minStepSize = minStepSize;
}
/**
- * Aggregate two key distributions.
+ * Get the total unique bytes contained in the key-partitioned buckets.
*
- * @param a
- * first key distribution (can be null)
- * @param b
- * second key distribution (can be null)
- * @return the aggregated key distribution.
+ * @return The total number of bytes contained in the key-partitioned buckets.
*/
- public static KeyDistribution sum(KeyDistribution a, KeyDistribution b) {
- if (a == null) return b;
- if (b == null) return a;
- a.add(b);
- return a;
+ public long length() {
+ return uniqueBytes;
}
-
+
/**
* Get the size of the key sampling.
*
@@ -87,74 +75,160 @@ public class KeyDistribution {
}
/**
- * Get the total unique bytes contained in the key-partitioned buckets.
+ * Get the minimum split step size from all tables in union
+ */
+ public long getMinStepSize() {
+ return minStepSize;
+ }
+
+ /** Get the list of sampling keys
*
- * @return The total number of bytes contained in the key-partitioned buckets.
+ * @return A list of sampling keys
*/
- public long length() {
- return uniqueBytes;
+ public RawComparable[] getKeys() {
+ RawComparable[] ret = new RawComparable[data.size()];
+ return data.keySet().toArray(ret);
}
-
+
+ public BlockDistribution getBlockDistribution(RawComparable key) {
+ return data.get(key);
+ }
+
/**
- * Resize the key samples
+ * Merge the key samples
*
- * @param n
- * targeted sampling size
- * @return the actual size after the resize().
- */
- public int resize(int n) {
- Iterator<Map.Entry<RawComparable, BlockDistribution>> it =
- data.entrySet().iterator();
- KeyDistribution adjusted = new KeyDistribution(data.comparator());
- for (int i = 0; i < n; ++i) {
- long targetMarker = (i + 1) * uniqueBytes / n;
- if (adjusted.uniqueBytes >= targetMarker) {
- continue;
+ * Algorithm: select the smallest key from all clean source ranges and ranges subsequent to
+ * respective dirty ranges. A dirty range is a range that has been partially needed
+ * by one or more of the previous final ranges.
+ *
+ * @param sourceKeys
+ * key samples to be merged
+ * @return the merged key samples
+ */
+ public static KeyDistribution merge(KeyDistribution[] sourceKeys) throws IOException {
+ if (sourceKeys == null || sourceKeys.length == 0)
+ return null;
+ int srcSize = sourceKeys.length;
+ if (srcSize == 1)
+ return sourceKeys[0];
+
+ Comparator<? super RawComparable> comp = sourceKeys[0].data.comparator();
+ // TODO check the identical comparators used in the source keys
+ /*
+ for (int i = 1; i < srcSize; i++)
+ if (!comp.equals(sourceKeys[i].data.comparator()))
+ throw new IOException("Incompatible sort keys found:" + comp.toString() + " vs. "+ sourceKeys[i].data.comparator().toString());
+ */
+
+ KeyDistribution result = new KeyDistribution(comp);
+
+ result.minStepSize = sourceKeys[0].minStepSize;
+ for (int i = 1; i < srcSize; i++)
+ if (result.minStepSize > sourceKeys[i].minStepSize)
+ result.minStepSize = sourceKeys[i].minStepSize;
+
+ RawComparable[][] its = new RawComparable[srcSize][];
+ for (int i = 0; i < srcSize; i++)
+ its[i] = sourceKeys[i].getKeys();
+ RawComparable min, current;
+ int minIndex = -1;
+ int[] index = new int[srcSize];
+ boolean[] dirty = new boolean[srcSize];
+ while (true)
+ {
+ min = null;
+ BlockDistribution bd = new BlockDistribution();
+ for (int i = 0; i < srcSize; i++)
+ {
+ if (index[i] >= its[i].length)
+ continue;
+ current = its[i][index[i]];
+ bd.add(sourceKeys[i].getBlockDistribution(current));
+ if (min == null || comp.compare(min, current) > 0)
+ {
+ min = current;
+ minIndex = i;
+ }
}
- RawComparable key = null;
- do {
- Map.Entry<RawComparable, BlockDistribution> e = it.next();
- if (key == null) {
- key = e.getKey();
+ if (min == null)
+ break;
+
+ result.add(min, bd);
+ for (int i = 0; i < srcSize; i++)
+ {
+ if (index[i] >= its[i].length)
+ continue;
+ current = its[i][index[i]];
+ if (i != minIndex)
+ {
+ if (comp.compare(min, current) != 0)
+ {
+ if (!dirty[i])
+ {
+ dirty[i] = true;
+ index[i]++;
+ } else if (comp.compare(min, its[i][index[i] - 1]) > 0 )
+ index[i]++;
+ } else {
+ if (dirty[i])
+ dirty[i] = false;
+ index[i]++;
+ }
+ } else {
+ if (dirty[i])
+ dirty[i] = false;
+ index[i]++;
}
- adjusted.add(key, e.getValue());
}
- while (adjusted.uniqueBytes < targetMarker);
}
-
+ return result;
+ }
+
+ public int resize(BlockDistribution lastBd)
+ {
+ Iterator<Map.Entry<RawComparable, BlockDistribution>> it =
+ data.entrySet().iterator();
+ KeyDistribution adjusted = new KeyDistribution(data.comparator());
+ long realSize = 0, mySize = 0;
+ RawComparable key = null;
+ BlockDistribution bd = null, bd0 = null;
+ while (it.hasNext())
+ {
+ Map.Entry<RawComparable, BlockDistribution> mapEntry = it.next();
+ bd0 = mapEntry.getValue();
+ mySize = bd0.getLength();
+ if (realSize >= minStepSize/2 ||
+ (realSize + mySize >= minStepSize*ColumnGroup.SPLIT_SLOP &&
+ realSize >= minStepSize * (ColumnGroup.SPLIT_SLOP-1)))
+ {
+ adjusted.add(key, bd);
+ bd = null;
+ realSize = 0;
+ }
+ key = mapEntry.getKey();
+ realSize += mySize;
+ bd = BlockDistribution.sum(bd, bd0);
+ }
+ if (bd != null)
+ {
+ realSize += lastBd.getLength();
+ if (realSize >= minStepSize/2 || adjusted.size() == 0)
+ {
+ // the last plus would contain more than liked, don't merge them.
+ adjusted.add(key, bd);
+ } else
+ BlockDistribution.sum(lastBd, bd);
+ }
swap(adjusted);
return data.size();
}
- void swap(KeyDistribution other) {
- long tmp = uniqueBytes;
- uniqueBytes = other.uniqueBytes;
- other.uniqueBytes = tmp;
+ private void swap(KeyDistribution other) {
+ long tmp = minStepSize;
+ minStepSize = other.minStepSize;
+ other.minStepSize = tmp;
SortedMap<RawComparable, BlockDistribution> tmp2 = data;
data = other.data;
other.data = tmp2;
}
-
- /**
- * Get the list of sampling keys.
- *
- * @return A list of sampling keys.
- */
- public RawComparable[] getKeys() {
- RawComparable[] ret = new RawComparable[data.size()];
- return data.keySet().toArray(ret);
-
- }
-
- /**
- * Get the block distribution of all data that maps to the key bucket.
- */
- public BlockDistribution getBlockDistribution(BytesWritable key) {
- ByteArray key0 = new ByteArray(key.getBytes(), 0, key.getLength());
- BlockDistribution bInfo = data.get(key0);
- if (bInfo == null) {
- throw new IllegalArgumentException("Invalid key");
- }
- return bInfo;
- }
}
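KeyDistribution.merge() above performs a multi-way merge of the sorted key samples of all tables in a union, with additional "dirty range" bookkeeping so that a source range already partially consumed by an earlier output range is not advanced twice. A simplified sketch of the core multi-way merge, without the dirty-range handling and using generic types rather than the Zebra classes:

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    // Sketch: repeatedly emit the smallest current key across all sorted sources,
    // then advance every source past keys at or below the emitted key.
    class KeyMergeSketch {
        static <K> List<K> merge(List<List<K>> sources, Comparator<K> comp) {
            int[] pos = new int[sources.size()];
            List<K> merged = new ArrayList<K>();
            while (true) {
                K min = null;
                for (int i = 0; i < sources.size(); i++) {
                    if (pos[i] >= sources.get(i).size()) continue;
                    K candidate = sources.get(i).get(pos[i]);
                    if (min == null || comp.compare(candidate, min) < 0) min = candidate;
                }
                if (min == null) break;                       // all sources exhausted
                merged.add(min);
                for (int i = 0; i < sources.size(); i++) {
                    while (pos[i] < sources.get(i).size()
                           && comp.compare(sources.get(i).get(pos[i]), min) <= 0) pos[i]++;
                }
            }
            return merged;
        }
    }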
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java?rev=925971&r1=925970&r2=925971&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java Mon Mar 22 06:26:41 2010
@@ -382,7 +382,7 @@ public class TableInputFormat implements
{
throw new IOException("The table is not properly sorted");
}
- } else {
+ } else {
List<LeafTableInfo> leaves = expr.getLeafTables(null);
for (Iterator<LeafTableInfo> it = leaves.iterator(); it.hasNext(); )
{
@@ -406,8 +406,8 @@ public class TableInputFormat implements
}
}
}
- // need key range input splits for sorted table union
- setSorted(conf);
+ // need key range input splits for sorted table union
+ setSorted(conf);
}
/**
@@ -509,7 +509,7 @@ public class TableInputFormat implements
BlockDistribution bd = null;
for (Iterator<BasicTable.Reader> it = readers.iterator(); it.hasNext();) {
BasicTable.Reader reader = it.next();
- bd = BlockDistribution.sum(bd, reader.getBlockDistribution((RangeSplit)null));
+ bd = BlockDistribution.sum(bd, reader.getBlockDistribution((RangeSplit) null));
}
SortedTableSplit split = new SortedTableSplit(null, null, bd, conf);
@@ -519,44 +519,49 @@ public class TableInputFormat implements
// TODO: Does it make sense to interleave keys for all leaf tables if
// numSplits <= 0 ?
int nLeaves = readers.size();
- KeyDistribution keyDistri = null;
+ BlockDistribution lastBd = new BlockDistribution();
+ ArrayList<KeyDistribution> btKeyDistributions = new ArrayList<KeyDistribution>();
for (int i = 0; i < nLeaves; ++i) {
KeyDistribution btKeyDistri =
readers.get(i).getKeyDistribution(
(numSplits <= 0) ? -1 :
- Math.max(numSplits * 5 / nLeaves, numSplits));
- keyDistri = KeyDistribution.sum(keyDistri, btKeyDistri);
+ Math.max(numSplits * 5 / nLeaves, numSplits), nLeaves, lastBd);
+ btKeyDistributions.add(btKeyDistri);
}
+ int btSize = btKeyDistributions.size();
+ KeyDistribution[] btKds = new KeyDistribution[btSize];
+ Object[] btArray = btKeyDistributions.toArray();
+ for (int i = 0; i < btSize; i++)
+ btKds[i] = (KeyDistribution) btArray[i];
+ KeyDistribution keyDistri = KeyDistribution.merge(btKds);
+
if (keyDistri == null) {
// should never happen.
SortedTableSplit split = new SortedTableSplit(null, null, null, conf);
return new InputSplit[] { split };
}
- if (numSplits > 0) {
- keyDistri.resize(numSplits);
- }
-
- RawComparable[] rawKeys = keyDistri.getKeys();
- BytesWritable[] keys = new BytesWritable[rawKeys.length];
- for (int i=0; i<keys.length; ++i) {
- RawComparable rawKey = rawKeys[i];
- keys[i] = new BytesWritable();
- keys[i].setSize(rawKey.size());
- System.arraycopy(rawKey.buffer(), rawKey.offset(), keys[i].get(), 0,
- rawKey.size());
- }
+ keyDistri.resize(lastBd);
- // TODO: Should we change to RawComparable to avoid the creation of
- // BytesWritables?
- for (int i = 0; i < keys.length; ++i) {
- BytesWritable begin = (i == 0) ? null : keys[i - 1];
- BytesWritable end = (i == keys.length - 1) ? null : keys[i];
- BlockDistribution bd = keyDistri.getBlockDistribution(keys[i]);
- SortedTableSplit split = new SortedTableSplit(begin, end, bd, conf);
+ RawComparable[] keys = keyDistri.getKeys();
+ for (int i = 0; i <= keys.length; ++i) {
+ RawComparable begin = (i == 0) ? null : keys[i - 1];
+ RawComparable end = (i == keys.length) ? null : keys[i];
+ BlockDistribution bd;
+ if (i < keys.length)
+ bd = keyDistri.getBlockDistribution(keys[i]);
+ else
+ bd = lastBd;
+ BytesWritable beginB = null, endB = null;
+ if (begin != null)
+ beginB = new BytesWritable(begin.buffer());
+ if (end != null)
+ endB = new BytesWritable(end.buffer());
+ SortedTableSplit split = new SortedTableSplit(beginB, endB, bd, conf);
splits.add(split);
}
+
return splits.toArray(new InputSplit[splits.size()]);
}
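The new getSplits() loop above runs to i <= keys.length so that N sampled boundary keys yield N + 1 sorted-table splits, with a null lower bound on the first split and a null upper bound on the last. A small sketch of that boundary-to-range expansion, with String keys and a hypothetical Range holder standing in for RawComparable and SortedTableSplit:

    import java.util.ArrayList;
    import java.util.List;

    // Sketch: N boundary keys become N + 1 half-open ranges; null means "unbounded".
    class SplitRangeSketch {
        static class Range {
            final String begin;  // inclusive, null = unbounded below
            final String end;    // exclusive, null = unbounded above
            Range(String begin, String end) { this.begin = begin; this.end = end; }
        }

        static List<Range> toRanges(String[] boundaryKeys) {
            List<Range> ranges = new ArrayList<Range>();
            for (int i = 0; i <= boundaryKeys.length; i++) {
                String begin = (i == 0) ? null : boundaryKeys[i - 1];
                String end = (i == boundaryKeys.length) ? null : boundaryKeys[i];
                ranges.add(new Range(begin, end));
            }
            return ranges;
        }
    }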
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java?rev=925971&r1=925970&r2=925971&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableRecordReader.java Mon Mar 22 06:26:41 2010
@@ -53,16 +53,17 @@ public class TableRecordReader implement
public TableRecordReader(TableExpr expr, String projection,
InputSplit split,
JobConf conf) throws IOException, ParseException {
- if (expr.sortedSplitRequired()) {
+ if (split != null && split instanceof RowTableSplit) {
+ RowTableSplit rowSplit = (RowTableSplit) split;
+ if (!expr.sortedSplitRequired() && Projection.getVirtualColumnIndices(projection) != null)
+ throw new IllegalArgumentException("virtual column requires union of multiple sorted tables");
+ scanner = expr.getScanner(rowSplit, projection, conf);
+ } else if (expr.sortedSplitRequired()) {
SortedTableSplit tblSplit = (SortedTableSplit) split;
scanner =
expr.getScanner(tblSplit.getBegin(), tblSplit.getEnd(), projection,
conf);
- } else if (split != null && split instanceof RowTableSplit) {
- RowTableSplit rowSplit = (RowTableSplit) split;
- scanner = expr.getScanner(rowSplit, projection, conf);
- }
- else {
+ } else {
UnsortedTableSplit tblSplit = (UnsortedTableSplit) split;
scanner = expr.getScanner(tblSplit, projection, conf);
}
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java?rev=925971&r1=925970&r2=925971&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java Mon Mar 22 06:26:41 2010
@@ -546,7 +546,7 @@ public class TableInputFormat extends In
BlockDistribution bd = null;
for (Iterator<BasicTable.Reader> it = readers.iterator(); it.hasNext();) {
BasicTable.Reader reader = it.next();
- bd = BlockDistribution.sum(bd, reader.getBlockDistribution((RangeSplit)null));
+ bd = BlockDistribution.sum(bd, reader.getBlockDistribution((RangeSplit) null));
}
SortedTableSplit split = new SortedTableSplit(null, null, bd, conf);
@@ -557,41 +557,46 @@ public class TableInputFormat extends In
// TODO: Does it make sense to interleave keys for all leaf tables if
// numSplits <= 0 ?
int nLeaves = readers.size();
- KeyDistribution keyDistri = null;
+ BlockDistribution lastBd = new BlockDistribution();
+ ArrayList<KeyDistribution> btKeyDistributions = new ArrayList<KeyDistribution>();
for (int i = 0; i < nLeaves; ++i) {
KeyDistribution btKeyDistri =
readers.get(i).getKeyDistribution(
(numSplits <= 0) ? -1 :
- Math.max(numSplits * 5 / nLeaves, numSplits));
- keyDistri = KeyDistribution.sum(keyDistri, btKeyDistri);
+ Math.max(numSplits * 5 / nLeaves, numSplits), nLeaves, lastBd);
+ btKeyDistributions.add(btKeyDistri);
}
+ int btSize = btKeyDistributions.size();
+ KeyDistribution[] btKds = new KeyDistribution[btSize];
+ Object[] btArray = btKeyDistributions.toArray();
+ for (int i = 0; i < btSize; i++)
+ btKds[i] = (KeyDistribution) btArray[i];
+
+ KeyDistribution keyDistri = KeyDistribution.merge(btKds);
+
if (keyDistri == null) {
// should never happen.
return splits;
}
-
- if (numSplits > 0) {
- keyDistri.resize(numSplits);
- }
-
- RawComparable[] rawKeys = keyDistri.getKeys();
- BytesWritable[] keys = new BytesWritable[rawKeys.length];
- for (int i=0; i<keys.length; ++i) {
- RawComparable rawKey = rawKeys[i];
- keys[i] = new BytesWritable();
- keys[i].setSize(rawKey.size());
- System.arraycopy(rawKey.buffer(), rawKey.offset(), keys[i].getBytes(), 0,
- rawKey.size());
- }
-
- // TODO: Should we change to RawComparable to avoid the creation of
- // BytesWritables?
- for (int i = 0; i < keys.length; ++i) {
- BytesWritable begin = (i == 0) ? null : keys[i - 1];
- BytesWritable end = (i == keys.length - 1) ? null : keys[i];
- BlockDistribution bd = keyDistri.getBlockDistribution(keys[i]);
- SortedTableSplit split = new SortedTableSplit(begin, end, bd, conf);
+
+ keyDistri.resize(lastBd);
+
+ RawComparable[] keys = keyDistri.getKeys();
+ for (int i = 0; i <= keys.length; ++i) {
+ RawComparable begin = (i == 0) ? null : keys[i - 1];
+ RawComparable end = (i == keys.length) ? null : keys[i];
+ BlockDistribution bd;
+ if (i < keys.length)
+ bd = keyDistri.getBlockDistribution(keys[i]);
+ else
+ bd = lastBd;
+ BytesWritable beginB = null, endB = null;
+ if (begin != null)
+ beginB = new BytesWritable(begin.buffer());
+ if (end != null)
+ endB = new BytesWritable(end.buffer());
+ SortedTableSplit split = new SortedTableSplit(beginB, endB, bd, conf);
splits.add(split);
}
return splits;
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/TFile.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/TFile.java?rev=925971&r1=925970&r2=925971&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/TFile.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/tfile/TFile.java Mon Mar 22 06:26:41 2010
@@ -1059,6 +1059,25 @@ public class TFile {
return new ByteArray(tfileIndex.getEntry(blockIndex).key);
}
+ public long getOffsetForKey(RawComparable key) throws IOException {
+ Location l = getBlockContainsKey(key, false);
+ int blockIndex = l.getBlockIndex();
+ if (blockIndex == end.blockIndex)
+ {
+ if (blockIndex > 0)
+ {
+ BCFile.Reader.BlockReader blkReader = readerBCF.getDataBlock(blockIndex - 1);
+ return blkReader.getStartPos() + blkReader.getCompressedSize();
+ } else
+ return 0;
+ } else
+ return readerBCF.getDataBlock(blockIndex).getStartPos();
+ }
+
+ public long getLastDataOffset() throws IOException {
+ BCFile.Reader.BlockReader blkReader = readerBCF.getDataBlock(end.blockIndex - 1);
+ return blkReader.getStartPos() + blkReader.getCompressedSize();
+ }
/**
* Get a scanner than can scan the whole TFile.
*
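getOffsetForKey() above returns the byte offset at which data for a key starts: if the key falls past the last data block, it returns the end of the previous block (its start position plus compressed size); otherwise it returns the start of the containing block. A generic sketch of that boundary logic over plain arrays of block metadata (hypothetical helper, not the BCFile reader API):

    // Sketch: map a key's block index to a byte offset using block start offsets
    // and on-disk block sizes kept in simple arrays.
    class KeyOffsetSketch {
        static long offsetForBlock(int blockIndex, int endBlockIndex,
                                   long[] blockStart, long[] blockSizeOnDisk) {
            if (blockIndex == endBlockIndex) {
                // key lies beyond the last data block: return the end of the previous block
                return (blockIndex > 0)
                    ? blockStart[blockIndex - 1] + blockSizeOnDisk[blockIndex - 1]
                    : 0L;
            }
            return blockStart[blockIndex];
        }
    }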
Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java?rev=925971&r1=925970&r2=925971&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java Mon Mar 22 06:26:41 2010
@@ -170,13 +170,14 @@ public class TestBasicTable {
BasicTable.Reader reader = new BasicTable.Reader(path, conf);
reader.setProjection(strProjection);
long totalBytes = reader.getStatus().getSize();
- KeyDistribution keyDistri = reader.getKeyDistribution(numSplits * 10);
- Assert.assertEquals(totalBytes, keyDistri.length());
+ BlockDistribution lastBd = new BlockDistribution();
+ KeyDistribution keyDistri = reader.getKeyDistribution(numSplits * 10, 1, lastBd);
+ Assert.assertEquals(totalBytes, keyDistri.length()+lastBd.getLength());
reader.close();
BytesWritable[] keys = null;
if (keyDistri.size() >= numSplits) {
- keyDistri.resize(numSplits);
- Assert.assertEquals(totalBytes, keyDistri.length());
+ keyDistri.resize(lastBd);
+ Assert.assertEquals(totalBytes, keyDistri.length()+lastBd.getLength());
RawComparable[] rawComparables = keyDistri.getKeys();
keys = new BytesWritable[rawComparables.length];
for (int i = 0; i < keys.length; ++i) {
Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java?rev=925971&r1=925970&r2=925971&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java Mon Mar 22 06:26:41 2010
@@ -285,13 +285,14 @@ public class TestColumnGroup {
ColumnGroup.Reader reader = new ColumnGroup.Reader(path, conf);
reader.setProjection(strProjection);
long totalBytes = reader.getStatus().getSize();
- KeyDistribution keyDistri = reader.getKeyDistribution(numSplits * 10);
- Assert.assertEquals(totalBytes, keyDistri.length());
+ BlockDistribution lastBd = new BlockDistribution();
+ KeyDistribution keyDistri = reader.getKeyDistribution(numSplits * 10, 1, lastBd);
+ Assert.assertEquals(totalBytes, keyDistri.length()+lastBd.getLength());
reader.close();
BytesWritable[] keys = null;
if (keyDistri.size() >= numSplits) {
- keyDistri.resize(numSplits);
- Assert.assertEquals(totalBytes, keyDistri.length());
+ keyDistri.resize(lastBd);
+ Assert.assertEquals(totalBytes, keyDistri.length()+lastBd.getLength());
RawComparable[] rawComparables = keyDistri.getKeys();
keys = new BytesWritable[rawComparables.length];
for (int i = 0; i < keys.length; ++i) {
Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupWithWorkPath.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupWithWorkPath.java?rev=925971&r1=925970&r2=925971&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupWithWorkPath.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupWithWorkPath.java Mon Mar 22 06:26:41 2010
@@ -286,13 +286,14 @@ public class TestColumnGroupWithWorkPath
ColumnGroup.Reader reader = new ColumnGroup.Reader(path, conf);
reader.setProjection(strProjection);
long totalBytes = reader.getStatus().getSize();
- KeyDistribution keyDistri = reader.getKeyDistribution(numSplits * 10);
- Assert.assertEquals(totalBytes, keyDistri.length());
+ BlockDistribution lastBd = new BlockDistribution();
+ KeyDistribution keyDistri = reader.getKeyDistribution(numSplits * 10, 1, lastBd);
+ Assert.assertEquals(totalBytes, keyDistri.length()+lastBd.getLength());
reader.close();
BytesWritable[] keys = null;
if (keyDistri.size() >= numSplits) {
- keyDistri.resize(numSplits);
- Assert.assertEquals(totalBytes, keyDistri.length());
+ keyDistri.resize(lastBd);
+ Assert.assertEquals(totalBytes, keyDistri.length()+lastBd.getLength());
RawComparable[] rawComparables = keyDistri.getKeys();
keys = new BytesWritable[rawComparables.length];
for (int i = 0; i < keys.length; ++i) {