You are viewing a plain text version of this content. (The link to the canonical version was lost when this page was converted to plain text.)
Posted to commits@pig.apache.org by ya...@apache.org on 2010/03/09 00:20:34 UTC
svn commit: r920562 - in /hadoop/pig/trunk/contrib/zebra: ./
src/java/org/apache/hadoop/zebra/io/ src/java/org/apache/hadoop/zebra/mapred/
src/java/org/apache/hadoop/zebra/mapreduce/
src/test/org/apache/hadoop/zebra/io/
Author: yanz
Date: Mon Mar 8 23:20:34 2010
New Revision: 920562
URL: http://svn.apache.org/viewvc?rev=920562&view=rev
Log:
PIG-1198: performance improvements through use of unsorted input splits that span multiple files (yanz)
Modified:
hadoop/pig/trunk/contrib/zebra/CHANGES.txt
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestDropColumnGroup.java
Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/CHANGES.txt?rev=920562&r1=920561&r2=920562&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/trunk/contrib/zebra/CHANGES.txt Mon Mar 8 23:20:34 2010
@@ -58,6 +58,8 @@
OPTIMIZATIONS
+ PIG-1198: performance improvements through use of unsorted input splits that span multiple files (yanz)
+
BUG FIXES
PIG-1276: Pig resource schema interface changed, so Zebra needs to catch exception thrown from the new interfaces. (xuefuz via yanz)
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java?rev=920562&r1=920561&r2=920562&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java Mon Mar 8 23:20:34 2010
@@ -44,6 +44,7 @@
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.zebra.tfile.RawComparable;
import org.apache.hadoop.zebra.tfile.TFile;
import org.apache.hadoop.zebra.tfile.Utils;
import org.apache.hadoop.zebra.tfile.MetaBlockAlreadyExists;
@@ -231,6 +232,7 @@
private MetaFile.Reader metaReader;
private BasicTableStatus status;
private int firstValidCG = -1; /// First column group that exists.
+ private int rowSplitCGIndex = -1;
Partition partition;
ColumnGroup.Reader[] colGroups;
Tuple[] cgTuples;
@@ -257,6 +259,7 @@
}
partition.setSource(cgTuples);
inferredMapping = true;
+ buildStatus();
}
else {
// the projection is not changed, so we do not need to recalculate the
@@ -434,9 +437,12 @@
public BlockDistribution getBlockDistribution(RangeSplit split)
throws IOException {
BlockDistribution bd = new BlockDistribution();
- for (int nx = 0; nx < colGroups.length; nx++) {
- if (!isCGDeleted(nx)) {
- bd.add(colGroups[nx].getBlockDistribution(split == null ? null : split.getCGRangeSplit()));
+ if (firstValidCG >= 0)
+ {
+ for (int nx = 0; nx < colGroups.length; nx++) {
+ if (partition.isCGNeeded(nx) && !isCGDeleted(nx)) {
+ bd.add(colGroups[nx].getBlockDistribution(split == null ? null : split.getCGRangeSplit()));
+ }
}
}
return bd;
@@ -644,10 +650,9 @@
* construct a TableScanner later.
*
*/
- public List<RowSplit> rowSplit(long[] starts, long[] lengths, Path[] paths, int splitCGIndex) throws IOException {
- List<RowSplit> ret;
-
- List<CGRowSplit> cgSplits = colGroups[splitCGIndex].rowSplit(starts, lengths, paths);
+ public List<RowSplit> rowSplit(long[] starts, long[] lengths, Path[] paths, int splitCGIndex, int[] batchSizes, int numBatches) throws IOException {
+ List<RowSplit> ret;
+ List<CGRowSplit> cgSplits = colGroups[splitCGIndex].rowSplit(starts, lengths, paths, batchSizes, numBatches);
int numSlices = cgSplits.size();
ret = new ArrayList<RowSplit>(numSlices);
for (int slice = 0; slice < numSlices; slice++) {
@@ -658,6 +663,15 @@
return ret;
}
+ /**
+ * Rearrange the files according to the column group index ordering
+ *
+ * @param filestatus array of FileStatus to be rearraged on
+ */
+ public void rearrangeFileIndices(FileStatus[] fileStatus) throws IOException
+ {
+ colGroups[getRowSplitCGIndex()].rearrangeFileIndices(fileStatus);
+ }
/**
* Get index of the column group that will be used for row-based split.
@@ -665,32 +679,33 @@
*/
public int getRowSplitCGIndex() throws IOException {
// Try to find the largest non-deleted and used column group by projection;
- int largestCGIndex = -1;
- int splitCGIndex = -1;
- long largestCGSize = -1;
- for (int i=0; i<colGroups.length; i++) {
- if (!partition.isCGNeeded(i) || isCGDeleted(i)) {
- continue;
- }
- ColumnGroup.Reader reader = colGroups[i];
- BasicTableStatus btStatus = reader.getStatus();
- long size = btStatus.getSize();
- if (size > largestCGSize) {
- largestCGIndex = i;
- largestCGSize = size;
- }
- }
-
- /* We do have a largest non-deleted and used column group,
- and we use it to do split. */
- if (largestCGIndex >= 0) {
- splitCGIndex = largestCGIndex;
- } else if (firstValidCG >= 0) { /* If all projection columns are either deleted or non-existing,
- then we use the first non-deleted column group to do split if it exists. */
- splitCGIndex = firstValidCG;
+ if (rowSplitCGIndex == -1)
+ {
+ int largestCGIndex = -1;
+ long largestCGSize = -1;
+ for (int i=0; i<colGroups.length; i++) {
+ if (!partition.isCGNeeded(i) || isCGDeleted(i)) {
+ continue;
+ }
+ ColumnGroup.Reader reader = colGroups[i];
+ BasicTableStatus btStatus = reader.getStatus();
+ long size = btStatus.getSize();
+ if (size > largestCGSize) {
+ largestCGIndex = i;
+ largestCGSize = size;
+ }
+ }
+
+ /* We do have a largest non-deleted and used column group,
+ and we use it to do split. */
+ if (largestCGIndex >= 0) {
+ rowSplitCGIndex = largestCGIndex;
+ } else if (firstValidCG >= 0) { /* If all projection columns are either deleted or non-existing,
+ then we use the first non-deleted column group to do split if it exists. */
+ rowSplitCGIndex = firstValidCG;
+ }
}
-
- return splitCGIndex;
+ return rowSplitCGIndex;
}
@@ -916,7 +931,7 @@
Partition partition) throws IOException {
init(rowSplit, null, null, null, closeReader, partition);
}
-
+
/**
* Creates new CGRowSplit. If the startRow in rowSplit is not set
* (i.e. < 0), it sets the startRow and numRows based on 'startByte'
@@ -928,17 +943,6 @@
int cgIdx = rowSplit.getCGIndex();
CGRowSplit cgSplit = new CGRowSplit();
- cgSplit.name = inputCGSplit.name;
- // startByte and numBytes from inputCGSplit are ignored, since
- // they make sense for only one CG.
- cgSplit.startRow = inputCGSplit.startRow;
- cgSplit.numRows = inputCGSplit.numRows;
- cgSplit.size = inputCGSplit.size;
-
- if (cgSplit.startRow >= 0) {
- //assume the rows are already set up.
- return cgSplit;
- }
// Find the row range :
if (isCGDeleted(cgIdx)) {
@@ -946,8 +950,7 @@
}
//fill the row numbers.
- colGroups[cgIdx].fillRowSplit(cgSplit, inputCGSplit.startByte,
- inputCGSplit.numBytes);
+ colGroups[cgIdx].fillRowSplit(cgSplit, inputCGSplit);
return cgSplit;
}
@@ -985,7 +988,7 @@
if (rowSplit != null) {
cgRowSplit = makeCGRowSplit(rowSplit);
}
-
+
try {
schema = partition.getProjection();
cgScanners = new CGScanner[colGroups.length];
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java?rev=920562&r1=920561&r2=920562&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java Mon Mar 8 23:20:34 2010
@@ -522,11 +522,21 @@
}
BlockDistribution ret = new BlockDistribution();
- if (split.name != null)
+ for (int i = 0; i < split.length; i++)
{
- FileStatus tfileStatus = fs.getFileStatus(new Path(path, split.name));
+ FileStatus tfileStatus = fs.getFileStatus(new Path(path, split.names[i]));
- BlockLocation[] locations = fs.getFileBlockLocations(tfileStatus, split.startByte, split.numBytes);
+ BlockLocation[] locations = null;
+ if (i == 0) {
+ if (split.startByteFirst != -1)
+ locations = fs.getFileBlockLocations(tfileStatus, split.startByteFirst, split.numBytesFirst);
+ } else if (i == split.length - 1) {
+ if (split.startByteLast != -1)
+ locations = fs.getFileBlockLocations(tfileStatus, split.startByteLast, split.numBytesLast);
+ }
+ if (locations == null)
+ locations = fs.getFileBlockLocations(tfileStatus, 0, tfileStatus.getLen());
+
for (BlockLocation l : locations) {
ret.add(l);
}
@@ -541,44 +551,86 @@
* It is assumed that 'startByte' and 'numBytes' in rowSplit itself
* are not valid.
*/
- void fillRowSplit(CGRowSplit rowSplit, long startOffset, long length)
- throws IOException {
+ void fillRowSplit(CGRowSplit rowSplit, CGRowSplit src)throws IOException {
- if (rowSplit.name == null)
+ if (src.names == null || src.length == 0)
return;
- Path tfPath = new Path(path, rowSplit.name);
-
- long size = rowSplit.size;
- if (size == 0)
+ boolean noSizeInIndex = false;
+ long[] sizes = rowSplit.sizes;
+ if (sizes == null)
{
/* the on disk table is sorted. Later this will be made unnecessary when
* CGIndexEntry serializes its bytes field and the meta file versioning is
* supported.
*/
- FileStatus tfile = fs.getFileStatus(tfPath);
- size = tfile.getLen();
+ noSizeInIndex = true;
}
+ rowSplit.names = src.names;
+ rowSplit.length = src.length;
+ rowSplit.startByteFirst = src.startByteFirst;
+ rowSplit.numBytesFirst = src.numBytesFirst;
+ rowSplit.startByteLast = src.startByteLast;
+ rowSplit.numBytesLast = src.numBytesLast;
+
+ Path firstPath = null, lastPath;
TFile.Reader reader = null;
- try {
- reader = new TFile.Reader(fs.open(tfPath),
- size, conf);
-
- long startRow = reader.getRecordNumNear(startOffset);
- long endRow = reader.getRecordNumNear(startOffset + length);
+ if (src.startByteFirst != -1)
+ {
+ firstPath = new Path(path, rowSplit.names[0]);
+ long size;
+ if (noSizeInIndex)
+ {
+ FileStatus tfile = fs.getFileStatus(firstPath);
+ size = tfile.getLen();
+ } else
+ size = sizes[0];
+ reader = new TFile.Reader(fs.open(firstPath), size, conf);
+ try {
+ long startRow = reader.getRecordNumNear(src.startByteFirst);
+ long endRow = reader.getRecordNumNear(src.startByteFirst + src.numBytesFirst);
- if (endRow < startRow) {
- endRow = startRow;
+ if (endRow < startRow)
+ endRow = startRow;
+ rowSplit.startRowFirst = startRow;
+ rowSplit.numRowsFirst = endRow - startRow;
+ } catch (IOException e) {
+ reader.close();
+ throw e;
}
+ }
+ if (src.startByteLast != -1 && rowSplit.length > 1)
+ {
+ lastPath = new Path(path, rowSplit.names[rowSplit.length - 1]);
+ if (reader == null || !firstPath.equals(lastPath))
+ {
+ if (reader != null)
+ reader.close();
+ long size;
+ if (noSizeInIndex)
+ {
+ FileStatus tfile = fs.getFileStatus(lastPath);
+ size = tfile.getLen();
+ } else
+ size = sizes[rowSplit.length - 1];
+ reader = new TFile.Reader(fs.open(lastPath), size, conf);
+ }
+ try {
+ long startRow = reader.getRecordNumNear(src.startByteLast);
+ long endRow = reader.getRecordNumNear(src.startByteLast + src.numBytesLast);
- rowSplit.startRow = startRow;
- rowSplit.numRows = endRow - startRow;
- } finally {
- if (reader != null) {
+ if (endRow < startRow)
+ endRow = startRow;
+ rowSplit.startRowLast = startRow;
+ rowSplit.numRowsLast = endRow - startRow;
+ } catch (IOException e) {
reader.close();
+ throw e;
}
}
+ if (reader != null)
+ reader.close();
}
/**
@@ -770,22 +822,55 @@
* @return A list of CGRowSplit objects.
*
*/
- public List<CGRowSplit> rowSplit(long[] starts, long[] lengths, Path[] paths) throws IOException {
+ public List<CGRowSplit> rowSplit(long[] starts, long[] lengths, Path[] paths, int[] batches, int numBatches) throws IOException {
List<CGRowSplit> lst = new ArrayList<CGRowSplit>();
+ CGRowSplit cgRowSplit;
+ long startFirst, bytesFirst, startLast, bytesLast;
+ int length;
- for (int i=0; i<starts.length; i++) {
- long start = starts[i];
- long length = lengths[i];
- Path path = paths[i];
- if (cgindex == null)
- cgindex = buildIndex(fs, this.path, dirty, conf);
- long size = cgindex.get(cgindex.getFileIndex(path)).bytes;
- lst.add(new CGRowSplit(path.getName(), start, length, size));
+ if (numBatches > 0 && cgindex == null)
+ cgindex = buildIndex(fs, this.path, dirty, conf);
+
+ for (int i=0; i< numBatches; i++) {
+ int indexFirst = batches[i];
+ int indexLast = batches[i+1] - 1;
+ startFirst = starts[indexFirst];
+ bytesFirst = lengths[indexFirst];
+ startLast = starts[indexLast];
+ bytesLast = lengths[indexLast];
+ length = batches[i+1] - batches[i];
+ String[] namesInSplit = new String[length];
+ long[] sizesInSplit = new long[length];
+ for (int j = 0; j < length; j++)
+ {
+ namesInSplit[j] = paths[indexFirst+j].getName();
+ sizesInSplit[j] = cgindex.get(cgindex.getFileIndex(paths[indexFirst+j])).bytes;
+ }
+ cgRowSplit = new CGRowSplit(namesInSplit, sizesInSplit, fs, conf, length,
+ startFirst, bytesFirst, startLast, bytesLast);
+ lst.add(cgRowSplit);
}
return lst;
- }
+ }
+ void rearrangeFileIndices(FileStatus[] fileStatus) throws IOException {
+ int size = fileStatus.length;
+ FileStatus[] result = new FileStatus[size];
+ if (cgindex == null)
+ cgindex = buildIndex(fs, path, dirty, conf);
+ if (size < cgindex.size())
+ throw new AssertionError("Incorrect file list size");
+ for (int j, i = 0; i < size; i++)
+ {
+ j = cgindex.getFileIndex(fileStatus[i].getPath());
+ if (j != -1)
+ result[j] = fileStatus[i];
+ }
+ for (int i = 0; i < size; i++)
+ fileStatus[i] = result[i];
+ }
+
/**
* Is the ColumnGroup sorted?
*
@@ -814,7 +899,7 @@
TupleReader tupleReader;
TFileScanner(FileSystem fs, Path path, CGRowSplit rowRange,
- RawComparable begin, RawComparable end,
+ RawComparable begin, RawComparable end, boolean first, boolean last,
CGSchema cgschema, Projection projection,
Configuration conf) throws IOException, ParseException {
try {
@@ -823,9 +908,15 @@
* compressor is inside cgschema
*/
reader = new TFile.Reader(ins, fs.getFileStatus(path).getLen(), conf);
- if (rowRange != null) {
- scanner = reader.createScannerByRecordNum(rowRange.startRow,
- rowRange.startRow + rowRange.numRows);
+ if (rowRange != null && rowRange.startByteFirst != -1) {
+ if (first && rowRange.startByteFirst != -1)
+ scanner = reader.createScannerByRecordNum(rowRange.startRowFirst,
+ rowRange.startRowFirst + rowRange.numRowsFirst);
+ else if (last && rowRange.startByteLast != -1)
+ scanner = reader.createScannerByRecordNum(rowRange.startRowLast,
+ rowRange.startRowLast + rowRange.numRowsLast);
+ else
+ scanner = reader.createScanner();
} else {
/* TODO: more investigation is needed for the following.
* using deprecated API just so that zebra can work with
@@ -941,11 +1032,36 @@
*/
class CGScanner implements TableScanner {
private Projection logicalSchema = null;
- private TFileScanner[] scanners;
+ private TFileScannerInfo[] scanners;
private boolean closeReader;
private int beginIndex, endIndex;
private int current; // current scanner
private boolean scannerClosed = true;
+ private CGRowSplit rowRange;
+ private TFileScanner scanner;
+
+ private class TFileScannerInfo {
+ boolean first, last;
+ Path path;
+ RawComparable begin, end;
+ TFileScannerInfo(boolean first, boolean last, Path path, RawComparable begin, RawComparable end) {
+ this.first = first;
+ this.last = last;
+ this.begin = begin;
+ this.end = end;
+ this.path = path;
+ }
+
+ TFileScanner getTFileScanner() throws IOException {
+ try {
+ return new TFileScanner(fs, path, rowRange,
+ begin, end, first, last, cgschema, logicalSchema, conf);
+ } catch (ParseException e) {
+ throw new IOException(e.getMessage());
+ }
+ }
+ }
+
CGScanner(CGRangeSplit split, boolean closeReader) throws IOException,
ParseException {
@@ -970,9 +1086,8 @@
*/
CGScanner(CGRowSplit rowRange, boolean closeReader)
throws IOException, ParseException {
-
beginIndex = 0;
- endIndex = 1;
+ endIndex = rowRange.length;
init(rowRange, null, null, closeReader);
}
@@ -995,49 +1110,52 @@
private void init(CGRowSplit rowRange, RawComparable beginKey,
RawComparable endKey, boolean doClose)
throws IOException, ParseException {
+ this.rowRange = rowRange;
if (beginIndex > endIndex) {
throw new IllegalArgumentException("beginIndex > endIndex");
}
logicalSchema = ColumnGroup.Reader.this.getProjection();
- List<TFileScanner> tmpScanners =
- new ArrayList<TFileScanner>(endIndex - beginIndex);
+ List<TFileScannerInfo> tmpScanners =
+ new ArrayList<TFileScannerInfo>(endIndex - beginIndex);
try {
+ boolean first, last, realFirst = true;
+ Path myPath;
for (int i = beginIndex; i < endIndex; ++i) {
- RawComparable begin = (i == beginIndex) ? beginKey : null;
- RawComparable end = (i == endIndex - 1) ? endKey : null;
- TFileScanner scanner;
- if (rowRange != null)
- scanner =
- new TFileScanner(fs, new Path(path, rowRange.name), rowRange,
- begin, end,
- cgschema, logicalSchema, conf);
+ first = (i == beginIndex);
+ last = (i == endIndex -1);
+ RawComparable begin = first ? beginKey : null;
+ RawComparable end = last ? endKey : null;
+ TFileScannerInfo scanner;
+ if (rowRange == null)
+ myPath = cgindex.getPath(i, path);
else
- scanner =
- new TFileScanner(fs, cgindex.getPath(i, path), null,
- begin, end,
- cgschema, logicalSchema, conf);
- // skip empty scanners.
- if (!scanner.atEnd()) {
- tmpScanners.add(scanner);
- }
- else {
- scanner.close();
+ myPath = new Path(path, rowRange.names[i]);
+ scanner =
+ new TFileScannerInfo(first, last, myPath, begin, end);
+ if (realFirst) {
+ this.scanner = scanner.getTFileScanner();
+ if (this.scanner.atEnd()) {
+ this.scanner.close();
+ this.scanner = null;
+ } else {
+ realFirst = false;
+ tmpScanners.add(scanner);
+ }
+ } else {
+ TFileScanner myScanner = scanner.getTFileScanner();
+ if (!myScanner.atEnd())
+ tmpScanners.add(scanner);
+ myScanner.close();
}
}
- scanners = tmpScanners.toArray(new TFileScanner[tmpScanners.size()]);
+ scanners = tmpScanners.toArray(new TFileScannerInfo[tmpScanners.size()]);
this.closeReader = doClose;
scannerClosed = false;
}
finally {
if (scannerClosed) { // failed to initialize the object.
- for (int i = 0; i < tmpScanners.size(); ++i) {
- try {
- tmpScanners.get(i).close();
- }
- catch (Exception e) {
- // no op
- }
- }
+ if (scanner != null)
+ scanner.close();
}
}
}
@@ -1047,7 +1165,7 @@
if (atEnd()) {
throw new EOFException("No more key-value to read");
}
- scanners[current].getKey(key);
+ scanner.getKey(key);
}
@Override
@@ -1056,19 +1174,19 @@
throw new EOFException("No more key-value to read");
}
try {
- scanners[current].getValue(row);
+ scanner.getValue(row);
} catch (ParseException e) {
throw new IOException("Invalid Projection: "+e.getMessage());
}
}
public void getCGKey(BytesWritable key) throws IOException {
- scanners[current].getKey(key);
+ scanner.getKey(key);
}
public void getCGValue(Tuple row) throws IOException {
try {
- scanners[current].getValue(row);
+ scanner.getValue(row);
} catch (ParseException e) {
throw new IOException("Invalid Projection: "+e.getMessage());
}
@@ -1085,29 +1203,41 @@
@Override
public boolean advance() throws IOException {
- if (atEnd()) {
- return false;
- }
- scanners[current].advance();
- if (scanners[current].atEnd()) {
+ if (atEnd()) {
+ return false;
+ }
+ scanner.advance();
+ while (true)
+ {
+ if (scanner.atEnd()) {
+ scanner.close();
+ scanner = null;
++current;
if (!atEnd()) {
- scanners[current].rewind();
- }
- }
- return true;
+ scanner = scanners[current].getTFileScanner();
+ } else
+ return false;
+ } else
+ return true;
}
+ }
public boolean advanceCG() throws IOException {
- scanners[current].advance();
- if (scanners[current].atEnd()) {
+ scanner.advance();
+ while (true)
+ {
+ if (scanner.atEnd()) {
+ scanner.close();
+ scanner = null;
++current;
if (!atEnd()) {
- scanners[current].rewind();
- }
- }
- return true;
+ scanner = scanners[current].getTFileScanner();
+ } else
+ return false;
+ } else
+ return true;
}
+ }
@Override
public boolean atEnd() throws IOException {
@@ -1136,12 +1266,34 @@
index = beginIndex;
}
+ int prevCurrent = current;
current = index - beginIndex;
- return scanners[current].seekTo(key);
+ if (current != prevCurrent)
+ {
+ if (scanner != null)
+ {
+ try {
+ scanner.close();
+ } catch (Exception e) {
+ // no-op
+ }
+ }
+ scanner = scanners[current].getTFileScanner();
+ }
+ return scanner.seekTo(key);
}
@Override
public void seekToEnd() throws IOException {
+ if (scanner != null)
+ {
+ try {
+ scanner.close();
+ } catch (Exception e) {
+ // no-op
+ }
+ }
+ scanner = null;
current = scanners.length;
}
@@ -1149,11 +1301,12 @@
public void close() throws IOException {
if (!scannerClosed) {
scannerClosed = true;
- for (int i = 0; i < scanners.length; ++i) {
+ if (scanner != null)
+ {
try {
- scanners[i].close();
- }
- catch (Exception e) {
+ scanner.close();
+ scanner = null;
+ } catch (Exception e) {
// no-op
}
}
@@ -1191,18 +1344,35 @@
}
public static class CGRowSplit implements Writable {
- String name;
- long startByte = -1;
- long numBytes = -1;
- long startRow = -1;
- long numRows = -1;
- long size = 0; // size of the file in the selected CG
+ int length; // number of files in the batch
+ long startByteFirst = -1;
+ long numBytesFirst;
+ long startRowFirst = -1;
+ long numRowsFirst = -1;
+ long startByteLast = -1;
+ long numBytesLast;
+ long startRowLast = -1;
+ long numRowsLast = -1;
+ String[] names;
+ long[] sizes = null;
+
+ CGRowSplit(String[] names, long[] sizes, FileSystem fs, Configuration conf,
+ int length, long startFirst, long bytesFirst,
+ long startLast, long bytesLast) throws IOException {
+ this.names = names;
+ this.sizes = sizes;
+ this.length = length;
- CGRowSplit(String name, long start, long len, long size) {
- this.name = name;
- this.startByte = start;
- this.numBytes = len;
- this.size = size;
+ if (startFirst != -1)
+ {
+ startByteFirst = startFirst;
+ numBytesFirst = bytesFirst;
+ }
+ if (startLast != -1 && this.length > 1)
+ {
+ startByteLast = startLast;
+ numBytesLast = bytesLast;
+ }
}
public CGRowSplit() {
@@ -1212,35 +1382,64 @@
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
- sb.append("{name = " + name + "}\n");
- sb.append("{startByte = " + startByte + "}\n");
- sb.append("{numBytes = " + numBytes + "}\n");
- sb.append("{startRow = " + startRow + "}\n");
- sb.append("{numRows = " + numRows + "}\n");
- sb.append("{size = " + size + "}\n");
+ sb.append("{length = " + length + "}\n");
+ for (int i = 0; i < length; i++)
+ {
+ sb.append("{name = " + names[i] + "}\n");
+ sb.append("{size = " + sizes[i] + "}\n");
+ }
+ sb.append("{startByteFirst = " + startByteFirst + "}\n");
+ sb.append("{numBytesFirst = " + numBytesFirst + "}\n");
+ sb.append("{startRowFirst = " + startRowFirst + "}\n");
+ sb.append("{numRowsFirst = " + numRowsFirst + "}\n");
+ sb.append("{startByteLast = " + startByteLast + "}\n");
+ sb.append("{numBytesLast = " + numBytesLast + "}\n");
+ sb.append("{startRowLast = " + startRowLast + "}\n");
+ sb.append("{numRowsLast = " + numRowsLast + "}\n");
return sb.toString();
}
@Override
public void readFields(DataInput in) throws IOException {
- name = Utils.readString(in);
- startByte = Utils.readVLong(in);
- numBytes = Utils.readVLong(in);
- startRow = Utils.readVLong(in);
- numRows = Utils.readVLong(in);
- size = Utils.readVLong(in);
+ length = Utils.readVInt(in);
+ if (length > 0)
+ {
+ names = new String[length];
+ sizes = new long[length];
+ }
+ for (int i = 0; i < length; i++)
+ {
+ names[i] = Utils.readString(in);
+ sizes[i] = Utils.readVLong(in);
+ }
+ startByteFirst = Utils.readVLong(in);
+ numBytesFirst = Utils.readVLong(in);
+ startRowFirst = Utils.readVLong(in);
+ numRowsFirst = Utils.readVLong(in);
+ startByteLast = Utils.readVLong(in);
+ numBytesLast = Utils.readVLong(in);
+ startRowLast = Utils.readVLong(in);
+ numRowsLast = Utils.readVLong(in);
}
@Override
public void write(DataOutput out) throws IOException {
- Utils.writeString(out, name);
- Utils.writeVLong(out, startByte);
- Utils.writeVLong(out, numBytes);
- Utils.writeVLong(out, startRow);
- Utils.writeVLong(out, numRows);
- Utils.writeVLong(out, size);
- }
+ Utils.writeVInt(out, length);
+ for (int i = 0; i < length; i++)
+ {
+ Utils.writeString(out, names[i]);
+ Utils.writeVLong(out, sizes[i]);
+ }
+ Utils.writeVLong(out, startByteFirst);
+ Utils.writeVLong(out, numBytesFirst);
+ Utils.writeVLong(out, startRowFirst);
+ Utils.writeVLong(out, numRowsFirst);
+ Utils.writeVLong(out, startByteLast);
+ Utils.writeVLong(out, numBytesLast);
+ Utils.writeVLong(out, startRowLast);
+ Utils.writeVLong(out, numRowsLast);
+ }
}
private static class SplitColumn {
@@ -1852,7 +2051,7 @@
return cgie.getIndex();
}
}
- throw new IOException("File " + filename + " is not in the column group index");
+ return -1;
}
int size() {
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java?rev=920562&r1=920561&r2=920562&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java Mon Mar 8 23:20:34 2010
@@ -29,6 +29,8 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.RawComparator;
@@ -38,6 +40,7 @@
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.InvalidInputException;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.zebra.io.BasicTable;
@@ -379,7 +382,6 @@
{
throw new IOException("The table is not properly sorted");
}
- setSorted(conf);
} else {
List<LeafTableInfo> leaves = expr.getLeafTables(null);
for (Iterator<LeafTableInfo> it = leaves.iterator(); it.hasNext(); )
@@ -403,9 +405,9 @@
}
}
}
- // need key range input splits for sorted table union
- setSorted(conf);
}
+ // need key range input splits for sorted table union
+ setSorted(conf);
}
/**
@@ -635,8 +637,48 @@
}
private static class DummyFileInputFormat extends FileInputFormat<BytesWritable, Tuple> {
- public DummyFileInputFormat(long minSplitSize) {
+ /**
+ * the next constant and class are copies from FileInputFormat
+ */
+ private static final PathFilter hiddenFileFilter = new PathFilter(){
+ public boolean accept(Path p){
+ String name = p.getName();
+ return !name.startsWith("_") && !name.startsWith(".");
+ }
+ };
+
+ /**
+ * Proxy PathFilter that accepts a path only if all filters given in the
+ * constructor do. Used by the listPaths() to apply the built-in
+ * hiddenFileFilter together with a user provided one (if any).
+ */
+ private static class MultiPathFilter implements PathFilter {
+ private List<PathFilter> filters;
+
+ public MultiPathFilter(List<PathFilter> filters) {
+ this.filters = filters;
+ }
+
+ public boolean accept(Path path) {
+ for (PathFilter filter : filters) {
+ if (!filter.accept(path)) {
+ return false;
+ }
+ }
+ return true;
+ }
+ }
+ private Integer[] fileNumbers = null;
+
+ private List<BasicTable.Reader> readers;
+
+ public Integer[] getFileNumbers() {
+ return fileNumbers;
+ }
+
+ public DummyFileInputFormat(long minSplitSize, List<BasicTable.Reader> readers) {
super.setMinSplitSize(minSplitSize);
+ this.readers = readers;
}
@Override
@@ -645,12 +687,93 @@
// no-op
return null;
}
+
+ @Override
+ public long computeSplitSize(long goalSize, long minSize, long blockSize) {
+ return super.computeSplitSize(goalSize, minSize, blockSize);
+ }
+
+ /**
+ * copy from FileInputFormat: add assignment to table file numbers
+ */
+ @Override
+ public FileStatus[] listStatus(JobConf job) throws IOException {
+ Path[] dirs = getInputPaths(job);
+ if (dirs.length == 0) {
+ throw new IOException("No input paths specified in job");
+ }
+
+ List<FileStatus> result = new ArrayList<FileStatus>();
+ List<IOException> errors = new ArrayList<IOException>();
+
+ // creates a MultiPathFilter with the hiddenFileFilter and the
+ // user provided one (if any).
+ List<PathFilter> filters = new ArrayList<PathFilter>();
+ filters.add(hiddenFileFilter);
+ PathFilter jobFilter = getInputPathFilter(job);
+ if (jobFilter != null) {
+ filters.add(jobFilter);
+ }
+ PathFilter inputFilter = new MultiPathFilter(filters);
+
+ ArrayList<Integer> fileNumberList = new ArrayList<Integer>();
+ int index = 0;
+ for (Path p: dirs) {
+ FileSystem fs = p.getFileSystem(job);
+ FileStatus[] matches = fs.globStatus(p, inputFilter);
+ if (matches == null) {
+ errors.add(new IOException("Input path does not exist: " + p));
+ } else if (matches.length == 0) {
+ errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
+ } else {
+ for (FileStatus globStat: matches) {
+ if (globStat.isDir()) {
+ FileStatus[] fileStatuses = fs.listStatus(globStat.getPath(), inputFilter);
+ // reorder according to CG index
+ BasicTable.Reader reader = readers.get(index);
+ reader.rearrangeFileIndices(fileStatuses);
+ for(FileStatus stat: fileStatuses) {
+ if (stat != null)
+ result.add(stat);
+ }
+ fileNumberList.add(fileStatuses.length);
+ } else {
+ result.add(globStat);
+ fileNumberList.add(1);
+ }
+ }
+ }
+ index++;
+ }
+ fileNumbers = new Integer[fileNumberList.size()];
+ fileNumberList.toArray(fileNumbers);
+
+ if (!errors.isEmpty()) {
+ throw new InvalidInputException(errors);
+ }
+ LOG.info("Total input paths to process : " + result.size());
+ return result.toArray(new FileStatus[result.size()]);
+ }
}
private static InputSplit[] getRowSplits(JobConf conf, int numSplits,
- TableExpr expr, List<BasicTable.Reader> readers) throws IOException {
+ TableExpr expr, List<BasicTable.Reader> readers,
+ List<BasicTableStatus> status) throws IOException {
ArrayList<InputSplit> ret = new ArrayList<InputSplit>();
- DummyFileInputFormat helper = new DummyFileInputFormat(getMinSplitSize(conf));
+
+ long minSplitSize = getMinSplitSize(conf);
+
+ long minSize = Math.max(conf.getLong("mapred.min.split.size", 1), minSplitSize);
+ long totalBytes = 0;
+ for (Iterator<BasicTableStatus> it = status.iterator(); it.hasNext(); )
+ {
+ totalBytes += it.next().getSize();
+ }
+ long goalSize = totalBytes / (numSplits < 1 ? 1 : numSplits);
+ StringBuilder sb = new StringBuilder();
+ boolean first = true;
+ PathFilter filter = null;
+ List<BasicTable.Reader> realReaders = new ArrayList<BasicTable.Reader>();
for (int i = 0; i < readers.size(); ++i) {
BasicTable.Reader reader = readers.get(i);
@@ -660,28 +783,116 @@
/* We can create input splits only if there does exist a valid column group for split.
* Otherwise, we do not create input splits. */
if (splitCGIndex >= 0) {
- Path path = new Path (reader.getPath().toString() + "/" + reader.getName(splitCGIndex));
- DummyFileInputFormat.setInputPaths(conf, path);
- PathFilter filter = reader.getPathFilter(conf);
- DummyFileInputFormat.setInputPathFilter(conf, filter.getClass());
- InputSplit[] inputSplits = helper.getSplits(conf, (numSplits < 1 ? 1 : numSplits));
+ realReaders.add(reader);
+ if (first)
+ {
+ // filter is identical across tables
+ filter = reader.getPathFilter(conf);
+ first = false;
+ } else
+ sb.append(",");
+ sb.append(reader.getPath().toString() + "/" + reader.getName(splitCGIndex));
+ }
+ }
+
+ DummyFileInputFormat helper = new DummyFileInputFormat(minSplitSize, realReaders);
+
+ if (!realReaders.isEmpty())
+ {
+ DummyFileInputFormat.setInputPaths(conf, sb.toString());
+ DummyFileInputFormat.setInputPathFilter(conf, filter.getClass());
+ InputSplit[] inputSplits = helper.getSplits(conf, (numSplits < 1 ? 1 : numSplits));
+
+ int batchesPerSplit = inputSplits.length / (numSplits < 1 ? 1 : numSplits);
+ if (batchesPerSplit <= 0)
+ batchesPerSplit = 1;
+
+ /*
+ * Potential file batching optimizations include:
+ * 1) sort single file inputSplits in the descending order of their sizes so
+ * that the ops of new file opens are spread to a maximum degree;
+ * 2) batching the files with maximum block distribution affinities into the same input split
+ */
+
+ int[] inputSplitBoundaries = new int[realReaders.size()];
+ long start, prevStart = Long.MIN_VALUE;
+ int tableIndex = 0, fileNumber = 0;
+ Integer[] fileNumbers = helper.getFileNumbers();
+ if (fileNumbers.length != realReaders.size())
+ throw new IOException("Number of tables in input paths of input splits is incorrect.");
+ for (int j=0; j<inputSplits.length; j++) {
+ FileSplit fileSplit = (FileSplit) inputSplits[j];
+ start = fileSplit.getStart();
+ if (start <= prevStart)
+ {
+ fileNumber++;
+ if (fileNumber >= fileNumbers[tableIndex])
+ {
+ inputSplitBoundaries[tableIndex++] = j;
+ fileNumber = 0;
+ }
+ }
+ prevStart = start;
+ }
+ inputSplitBoundaries[tableIndex++] = inputSplits.length;
+ if (tableIndex != realReaders.size())
+ throw new IOException("Number of tables in input splits is incorrect.");
+ for (tableIndex = 0; tableIndex < realReaders.size(); tableIndex++)
+ {
+ int startSplitIndex = (tableIndex == 0 ? 0 : inputSplitBoundaries[tableIndex - 1]);
+ int splitLen = (tableIndex == 0 ? inputSplitBoundaries[0] :
+ inputSplitBoundaries[tableIndex] - inputSplitBoundaries[tableIndex-1]);
+ BasicTable.Reader reader = realReaders.get(tableIndex);
+ /* Get the index of the column group that will be used for row-split.*/
+ int splitCGIndex = reader.getRowSplitCGIndex();
- long starts[] = new long[inputSplits.length];
- long lengths[] = new long[inputSplits.length];
- Path paths[] = new Path [inputSplits.length];
- for (int j=0; j<inputSplits.length; j++) {
+ long starts[] = new long[splitLen];
+ long lengths[] = new long[splitLen];
+ int batches[] = new int[splitLen + 1];
+ batches[0] = 0;
+ int numBatches = 0;
+ int batchSize = 0;
+ Path paths[] = new Path [splitLen];
+ long totalLen = 0;
+ final double SPLIT_SLOP = 1.1;
+ int endSplitIndex = startSplitIndex + splitLen;
+ for (int j=startSplitIndex; j< endSplitIndex; j++) {
FileSplit fileSplit = (FileSplit) inputSplits[j];
Path p = fileSplit.getPath();
- long start = fileSplit.getStart();
+ long blockSize = p.getFileSystem(conf).getBlockSize(p);
+ long splitSize = (long) (helper.computeSplitSize(goalSize, minSize, blockSize) * SPLIT_SLOP);
+ start = fileSplit.getStart();
long length = fileSplit.getLength();
-
- starts[j] = start;
- lengths[j] = length;
- paths[j] = p;
+ int index = j - startSplitIndex;
+ starts[index] = start;
+ lengths[index] = length;
+ totalLen += length;
+ paths[index] = p;
+ if (totalLen >= splitSize)
+ {
+
+ for (int ii = batches[numBatches] + 1; ii < index - 1; ii++)
+ starts[ii] = -1; // all intermediate files are not split
+ batches[++numBatches] = index;
+ batchSize = 1;
+ totalLen = length;
+ } else if (batchSize + 1 > batchesPerSplit) {
+ for (int ii = batches[numBatches] + 1; ii < index - 1; ii++)
+ starts[ii] = -1; // all intermediate files are not split
+ batches[++numBatches] = index;
+ batchSize = 1;
+ totalLen = length;
+ } else {
+ batchSize++;
+ }
}
+ for (int ii = batches[numBatches] + 1; ii < splitLen - 1; ii++)
+ starts[ii] = -1; // all intermediate files are not split
+ if (splitLen > 0)
+ batches[++numBatches] = splitLen;
- List<RowSplit> subSplits = reader.rowSplit(starts, lengths, paths, splitCGIndex);
-
+ List<RowSplit> subSplits = reader.rowSplit(starts, lengths, paths, splitCGIndex, batches, numBatches);
+
for (Iterator<RowSplit> it = subSplits.iterator(); it.hasNext();) {
RowSplit subSplit = it.next();
RowTableSplit split = new RowTableSplit(reader, subSplit, conf);
@@ -689,7 +900,7 @@
}
}
}
-
+
LOG.info("getSplits : returning " + ret.size() + " row splits.");
return ret.toArray(new InputSplit[ret.size()]);
}
@@ -728,11 +939,8 @@
BasicTable.Reader reader =
new BasicTable.Reader(leaf.getPath(), conf);
reader.setProjection(leaf.getProjection());
- if (sorted)
- {
- BasicTableStatus s = reader.getStatus();
- status.add(s);
- }
+ BasicTableStatus s = reader.getStatus();
+ status.add(s);
readers.add(reader);
if (first)
first = false;
@@ -753,7 +961,7 @@
return getSortedSplits(conf, numSplits, expr, readers, status);
}
- return getRowSplits(conf, numSplits, expr, readers);
+ return getRowSplits(conf, numSplits, expr, readers, status);
} catch (ParseException e) {
throw new IOException("Projection parsing failed : "+e.getMessage());
}
Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java?rev=920562&r1=920561&r2=920562&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java Mon Mar 8 23:20:34 2010
@@ -30,6 +30,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.WritableUtils;
@@ -41,6 +43,7 @@
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.zebra.io.BasicTable;
@@ -401,7 +404,6 @@
{
throw new IOException("The table is not properly sorted");
}
- setSorted( conf );
} else {
List<LeafTableInfo> leaves = expr.getLeafTables(null);
for (Iterator<LeafTableInfo> it = leaves.iterator(); it.hasNext(); )
@@ -425,9 +427,9 @@
}
}
}
- // need key range input splits for sorted table union
- setSorted( conf );
}
+ // need key range input splits for sorted table union
+ setSorted(conf);
}
/**
@@ -612,25 +614,145 @@
}
private static class DummyFileInputFormat extends FileInputFormat<BytesWritable, Tuple> {
- public DummyFileInputFormat(Job job, long minSplitSize) {
- super.setMinInputSplitSize( job, minSplitSize );
+ /**
+  * Rejects hidden entries: any path whose final name component starts with
+  * "_" or ".". Copied from FileInputFormat.
+  */
+ private static final PathFilter hiddenFileFilter = new PathFilter() {
+   public boolean accept(Path p) {
+     final String name = p.getName();
+     final boolean hidden = name.startsWith("_") || name.startsWith(".");
+     return !hidden;
+   }
+ };
+
+ /**
+  * Conjunction of path filters (copied from FileInputFormat): a path is
+  * accepted only when every wrapped filter accepts it. listStatus() uses it
+  * to apply hiddenFileFilter together with the job's own filter, if any.
+  */
+ private static class MultiPathFilter implements PathFilter {
+   private final List<PathFilter> filters;
+
+   public MultiPathFilter(List<PathFilter> filters) {
+     this.filters = filters;
+   }
+
+   public boolean accept(Path path) {
+     for (int i = 0; i < filters.size(); i++) {
+       if (!filters.get(i).accept(path)) {
+         return false; // first rejecting filter decides
+       }
+     }
+     return true;
+   }
+ }
+ /** File counts, one entry per matched input path; filled in by listStatus(). */
+ private Integer[] fileNumbers = null;
+
+ /** Table readers, looked up by input-path position in listStatus(). */
+ private List<BasicTable.Reader> readers;
+
+ /**
+  * Returns the per-input-path file counts recorded by the most recent
+  * listStatus() call, or {@code null} if listStatus() has not run yet.
+  */
+ public Integer[] getFileNumbers() {
+   return this.fileNumbers;
+ }
+
+ /**
+  * Creates the split helper.
+  *
+  * @param job job whose configuration receives the minimum split size
+  * @param minSplitSize minimum input split size to store on {@code job}
+  * @param readers table readers consulted by listStatus(), indexed by input-path position
+  */
+ public DummyFileInputFormat(Job job, long minSplitSize, List<BasicTable.Reader> readers) {
+   this.readers = readers;
+   FileInputFormat.setMinInputSplitSize(job, minSplitSize);
}
- @Override
- public RecordReader<BytesWritable, Tuple> createRecordReader(
- InputSplit arg0, TaskAttemptContext arg1) throws IOException,
- InterruptedException {
- // no-op
- return null;
- }
+ /**
+  * Not supported: this helper is only used to list files and compute
+  * splits, so it does not build record readers.
+  *
+  * @return always {@code null}
+  */
+ @Override
+ public RecordReader<BytesWritable, Tuple> createRecordReader(InputSplit arg0,
+     TaskAttemptContext arg1) throws IOException, InterruptedException {
+   return null; // intentionally a no-op
+ }
+
+ /**
+  * Re-exposes FileInputFormat#computeSplitSize as a public method so that
+  * getRowSplits() can size its file batches with the same formula.
+  */
+ @Override
+ public long computeSplitSize(long blockSize, long minSize, long maxSize) {
+   final long splitSize = super.computeSplitSize(blockSize, minSize, maxSize);
+   return splitSize;
+ }
+
+ /**
+  * Lists the input files as FileInputFormat#listStatus does, with two additions:
+  * files found in an input directory are reordered by the matching table
+  * reader's rearrangeFileIndices(), and the number of files each matched input
+  * path contributes to the result is recorded for getFileNumbers().
+  *
+  * @param jobContext job whose input paths and input path filter are used
+  * @return the file statuses to split, grouped by input path in input order
+  * @throws IOException if no input paths are configured
+  * @throws InvalidInputException if an input path does not exist or matches no files
+  */
+ @Override
+ public List<FileStatus> listStatus(JobContext jobContext) throws IOException {
+   Configuration job = jobContext.getConfiguration();
+   Path[] dirs = getInputPaths(jobContext);
+   if (dirs.length == 0) {
+     throw new IOException("No input paths specified in job");
+   }
+
+   List<FileStatus> result = new ArrayList<FileStatus>();
+   List<IOException> errors = new ArrayList<IOException>();
+
+   // creates a MultiPathFilter with the hiddenFileFilter and the
+   // user provided one (if any).
+   List<PathFilter> filters = new ArrayList<PathFilter>();
+   filters.add(hiddenFileFilter);
+   PathFilter jobFilter = getInputPathFilter(jobContext);
+   if (jobFilter != null) {
+     filters.add(jobFilter);
+   }
+   PathFilter inputFilter = new MultiPathFilter(filters);
+
+   List<Integer> fileNumberList = new ArrayList<Integer>();
+   int index = 0;
+   for (Path p: dirs) {
+     FileSystem fs = p.getFileSystem(job);
+     FileStatus[] matches = fs.globStatus(p, inputFilter);
+     if (matches == null) {
+       errors.add(new IOException("Input path does not exist: " + p));
+     } else if (matches.length == 0) {
+       errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
+     } else {
+       for (FileStatus globStat: matches) {
+         if (globStat.isDir()) {
+           FileStatus[] fileStatuses = fs.listStatus(globStat.getPath(), inputFilter);
+           // reorder according to CG index; this may leave null entries,
+           // which are skipped below
+           BasicTable.Reader reader = readers.get(index);
+           reader.rearrangeFileIndices(fileStatuses);
+           int added = 0;
+           for (FileStatus stat: fileStatuses) {
+             if (stat != null) {
+               result.add(stat);
+               added++;
+             }
+           }
+           // Record only the files actually added: getRowSplits() walks the
+           // splits of `result` and uses these counts to find table boundaries.
+           fileNumberList.add(added);
+         } else {
+           result.add(globStat);
+           fileNumberList.add(1);
+         }
+       }
+     }
+     index++;
+   }
+   fileNumbers = fileNumberList.toArray(new Integer[fileNumberList.size()]);
+
+   if (!errors.isEmpty()) {
+     throw new InvalidInputException(errors);
+   }
+   LOG.info("Total input paths to process : " + result.size());
+   return result;
+ }
}
private static List<InputSplit> getRowSplits(Configuration conf,
- TableExpr expr, List<BasicTable.Reader> readers) throws IOException {
+ TableExpr expr, List<BasicTable.Reader> readers,
+ List<BasicTableStatus> status) throws IOException {
ArrayList<InputSplit> ret = new ArrayList<InputSplit>();
Job job = new Job(conf);
- DummyFileInputFormat helper = new DummyFileInputFormat(job, getMinSplitSize(conf));
+ long minSplitSize = getMinSplitSize(conf);
+
+ long minSize = Math.max(conf.getLong("mapreduce.min.split.size", 1), minSplitSize);
+ long totalBytes = 0;
+ for (Iterator<BasicTableStatus> it = status.iterator(); it.hasNext(); )
+ {
+ totalBytes += it.next().getSize();
+ }
+ StringBuilder sb = new StringBuilder();
+ boolean first = true;
+ PathFilter filter = null;
+ List<BasicTable.Reader> realReaders = new ArrayList<BasicTable.Reader>();
for (int i = 0; i < readers.size(); ++i) {
BasicTable.Reader reader = readers.get(i);
@@ -640,29 +762,103 @@
/* We can create input splits only if there does exist a valid column group for split.
* Otherwise, we do not create input splits. */
if (splitCGIndex >= 0) {
- Path path = new Path (reader.getPath().toString() + "/" + reader.getName(splitCGIndex));
- DummyFileInputFormat.setInputPaths(job, path);
- PathFilter filter = reader.getPathFilter( job.getConfiguration() );
- DummyFileInputFormat.setInputPathFilter(job, filter.getClass());
-
- List<InputSplit> inputSplits = helper.getSplits( job );
-
- long starts[] = new long[inputSplits.size()];
- long lengths[] = new long[inputSplits.size()];
- Path paths[] = new Path [inputSplits.size()];
- for (int j=0; j<inputSplits.size(); j++) {
- FileSplit fileSplit = (FileSplit) inputSplits.get( j );
+ realReaders.add(reader);
+ if (first)
+ {
+ // filter is identical across tables
+ filter = reader.getPathFilter(conf);
+ first = false;
+ } else
+ sb.append(",");
+ sb.append(reader.getPath().toString() + "/" + reader.getName(splitCGIndex));
+ }
+ }
+
+ DummyFileInputFormat helper = new DummyFileInputFormat(job,minSplitSize, realReaders);
+
+ if (!realReaders.isEmpty())
+ {
+ DummyFileInputFormat.setInputPaths(job, sb.toString());
+ DummyFileInputFormat.setInputPathFilter(job, filter.getClass());
+ List<InputSplit> inputSplitList = helper.getSplits(job);
+ InputSplit[] inputSplits = inputSplitList.toArray(new InputSplit[0]);
+
+ /*
+ * Potential file batching optimizations include:
+ * 1) sort single file inputSplits in the descending order of their sizes so
+ * that the ops of new file opens are spread to a maximum degree;
+ * 2) batching the files with maximum block distribution affinities into the same input split
+ */
+
+ int[] inputSplitBoundaries = new int[realReaders.size()];
+ long start, prevStart = Long.MIN_VALUE;
+ int tableIndex = 0, fileNumber = 0;
+ Integer[] fileNumbers = helper.getFileNumbers();
+ if (fileNumbers.length != realReaders.size())
+ throw new IOException("Number of tables in input paths of input splits is incorrect.");
+ for (int j=0; j<inputSplits.length; j++) {
+ FileSplit fileSplit = (FileSplit) inputSplits[j];
+ start = fileSplit.getStart();
+ if (start <= prevStart)
+ {
+ fileNumber++;
+ if (fileNumber >= fileNumbers[tableIndex])
+ {
+ inputSplitBoundaries[tableIndex++] = j;
+ fileNumber = 0;
+ }
+ }
+ prevStart = start;
+ }
+ inputSplitBoundaries[tableIndex++] = inputSplits.length;
+ if (tableIndex != realReaders.size())
+ throw new IOException("Number of tables in input splits is incorrect.");
+ for (tableIndex = 0; tableIndex < realReaders.size(); tableIndex++)
+ {
+ int startSplitIndex = (tableIndex == 0 ? 0 : inputSplitBoundaries[tableIndex - 1]);
+ int splitLen = (tableIndex == 0 ? inputSplitBoundaries[0] :
+ inputSplitBoundaries[tableIndex] - inputSplitBoundaries[tableIndex-1]);
+ BasicTable.Reader reader = realReaders.get(tableIndex);
+ /* Get the index of the column group that will be used for row-split.*/
+ int splitCGIndex = reader.getRowSplitCGIndex();
+
+ long starts[] = new long[splitLen];
+ long lengths[] = new long[splitLen];
+ int batches[] = new int[splitLen + 1];
+ batches[0] = 0;
+ int numBatches = 0;
+ Path paths[] = new Path [splitLen];
+ long totalLen = 0;
+ final double SPLIT_SLOP = 1.1;
+ int endSplitIndex = startSplitIndex + splitLen;
+ for (int j=startSplitIndex; j< endSplitIndex; j++) {
+ FileSplit fileSplit = (FileSplit) inputSplits[j];
Path p = fileSplit.getPath();
- long start = fileSplit.getStart();
+ long blockSize = p.getFileSystem(conf).getBlockSize(p);
+ long splitSize = (long) (helper.computeSplitSize(blockSize, minSize, totalBytes) * SPLIT_SLOP);
+ start = fileSplit.getStart();
long length = fileSplit.getLength();
+ int index = j - startSplitIndex;
+ starts[index] = start;
+ lengths[index] = length;
+ totalLen += length;
+ paths[index] = p;
+ if (totalLen >= splitSize)
+ {
- starts[j] = start;
- lengths[j] = length;
- paths[j] = p;
+ for (int ii = batches[numBatches] + 1; ii < index - 1; ii++)
+ starts[ii] = -1; // all intermediate files are not split
+ batches[++numBatches] = index;
+ totalLen = length;
+ }
}
+ for (int ii = batches[numBatches] + 1; ii < splitLen - 1; ii++)
+ starts[ii] = -1; // all intermediate files are not split
+ if (splitLen > 0)
+ batches[++numBatches] = splitLen;
+
+ List<RowSplit> subSplits = reader.rowSplit(starts, lengths, paths, splitCGIndex, batches, numBatches);
- List<RowSplit> subSplits = reader.rowSplit(starts, lengths, paths, splitCGIndex);
-
for (Iterator<RowSplit> it = subSplits.iterator(); it.hasNext();) {
RowSplit subSplit = it.next();
RowTableSplit split = new RowTableSplit(reader, subSplit, conf);
@@ -670,7 +866,7 @@
}
}
}
-
+
LOG.info("getSplits : returning " + ret.size() + " row splits.");
return ret;
}
@@ -718,11 +914,8 @@
BasicTable.Reader reader =
new BasicTable.Reader(leaf.getPath(), conf );
reader.setProjection(leaf.getProjection());
- if (sorted)
- {
- BasicTableStatus s = reader.getStatus();
- status.add(s);
- }
+ BasicTableStatus s = reader.getStatus();
+ status.add(s);
readers.add(reader);
if (first)
first = false;
@@ -742,7 +935,7 @@
return sorted ?
singleSplit ? getSortedSplits( conf, 1, expr, readers, status) : getSortedSplits(conf, -1, expr, readers, status) :
- getRowSplits( conf, expr, readers);
+ getRowSplits( conf, expr, readers, status);
} catch (ParseException e) {
throw new IOException("Projection parsing failed : "+e.getMessage());
} finally {
Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestDropColumnGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestDropColumnGroup.java?rev=920562&r1=920561&r2=920562&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestDropColumnGroup.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestDropColumnGroup.java Mon Mar 8 23:20:34 2010
@@ -174,13 +174,13 @@
false }, numRows);
TestBasicTable.doRangeSplit(new int[] { 4, 0, 2 }, numRows,
- "a, b, e, f, x", path);
+ "a, b, c, e, f, x", path);
// Remove another CG.
BasicTable.dropColumnGroup(path, conf, "CG0");
TestBasicTable.doRangeSplit(new int[] { 4, 0, 2, 3, 1 }, numRows,
- "a, y, e, f, x", path);
+ "a, y, c, e, f, x", path);
BasicTable.drop(path, conf);
}