You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@pig.apache.org by ya...@apache.org on 2010/02/03 02:08:16 UTC
svn commit: r905856 - in /hadoop/pig/branches/branch-0.6/contrib/zebra: ./
src/java/org/apache/hadoop/zebra/io/ src/java/org/apache/hadoop/zebra/mapred/
src/java/org/apache/hadoop/zebra/pig/
Author: yanz
Date: Wed Feb 3 01:08:15 2010
New Revision: 905856
URL: http://svn.apache.org/viewvc?rev=905856&view=rev
Log:
PIG-1201: unnecessary name node calls by each mapper; too big input split serialization size by Pig's Slice implementation (yanz)
Modified:
hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt
hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java
hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java
hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java
hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt?rev=905856&r1=905855&r2=905856&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/CHANGES.txt Wed Feb 3 01:08:15 2010
@@ -39,6 +39,8 @@
BUG FIXES
+ PIG-1201: unnecessary name node calls by each mapper; too big input split serialization size by Pig's Slice implementation (yanz)
+
PIG-1167: Hadoop file glob support (yanz)
PIG-1145: Merge Join on Large Table throws an EOF exception (yanz)
Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java?rev=905856&r1=905855&r2=905856&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java Wed Feb 3 01:08:15 2010
@@ -95,6 +95,8 @@
private final static String DELETED_CG_PREFIX = ".deleted-";
+ public final static String DELETED_CG_SEPARATOR_PER_TABLE = ",";
+
// no public ctor for instantiating a BasicTable object
private BasicTable() {
// no-op
@@ -142,7 +144,7 @@
/* Retry up to numCGs times accounting for other CG deleting threads or processes.*/
while (triedCount ++ < numCGs) {
try {
- schemaFile = new SchemaFile(path, conf);
+ schemaFile = new SchemaFile(path, null, conf);
break;
} catch (FileNotFoundException e) {
LOG.info("Try " + triedCount + " times : " + e.getMessage());
@@ -198,7 +200,7 @@
} catch (IOException e) {
// one remote possibility is that another user
// already deleted CG.
- SchemaFile tempSchema = new SchemaFile(path, conf);
+ SchemaFile tempSchema = new SchemaFile(path, null, conf);
if (tempSchema.isCGDeleted(cgIdx)) {
LOG.info(path + " : " + cgName +
" is deleted by someone else. That is ok.");
@@ -281,10 +283,15 @@
* Optional configuration parameters.
* @throws IOException
*/
+
public Reader(Path path, Configuration conf) throws IOException {
+ this(path, null, conf);
+ }
+ public Reader(Path path, String[] deletedCGs, Configuration conf) throws IOException {
try {
+ boolean mapper = (deletedCGs != null);
this.path = path;
- schemaFile = new SchemaFile(path, conf);
+ schemaFile = new SchemaFile(path, deletedCGs, conf);
metaReader = MetaFile.createReader(new Path(path, BT_META_FILE), conf);
// create column group readers
int numCGs = schemaFile.getNumOfPhysicalSchemas();
@@ -301,7 +308,7 @@
if (!schemaFile.isCGDeleted(nx)) {
colGroups[nx] =
new ColumnGroup.Reader(new Path(path, partition.getCGSchema(nx).getName()),
- conf);
+ conf, mapper);
if (firstValidCG < 0) {
firstValidCG = nx;
}
@@ -311,7 +318,8 @@
else
cgTuples[nx] = null;
}
- buildStatus();
+ if (schemaFile.isSorted())
+ buildStatus();
closed = false;
}
catch (Exception e) {
@@ -410,7 +418,9 @@
/**
* Get the status of the BasicTable.
*/
- public BasicTableStatus getStatus() {
+ public BasicTableStatus getStatus() throws IOException {
+ if (status == null)
+ buildStatus();
return status;
}
@@ -565,13 +575,16 @@
*
* @param path
* The path to the BasicTable.
+ * @deletedCGs
+ * The deleted column groups from front end; null if unavailable from front end
* @param conf
* @return The logical Schema of the table (all columns).
* @throws IOException
*/
public static Schema getSchema(Path path, Configuration conf)
throws IOException {
- SchemaFile schF = new SchemaFile(path, conf);
+ // fake an empty deleted cg list as getSchema does not care about deleted cgs
+ SchemaFile schF = new SchemaFile(path, new String[0], conf);
return schF.getLogical();
}
@@ -653,7 +666,7 @@
* Get index of the column group that will be used for row-based split.
*
*/
- public int getRowSplitCGIndex() {
+ public int getRowSplitCGIndex() throws IOException {
// Try to find the largest non-deleted and used column group by projection;
int largestCGIndex = -1;
int splitCGIndex = -1;
@@ -725,8 +738,12 @@
String getStorageString() {
return schemaFile.getStorageString();
}
+
+ public String getDeletedCGs() {
+ return schemaFile.getDeletedCGs();
+ }
- private void buildStatus() {
+ private void buildStatus() throws IOException {
status = new BasicTableStatus();
if (firstValidCG >= 0) {
status.beginKey = colGroups[firstValidCG].getStatus().getBeginKey();
@@ -913,11 +930,12 @@
int cgIdx = rowSplit.getCGIndex();
CGRowSplit cgSplit = new CGRowSplit();
- cgSplit.fileIndex = inputCGSplit.fileIndex;
+ cgSplit.name = inputCGSplit.name;
// startByte and numBytes from inputCGSplit are ignored, since
// they make sense for only one CG.
cgSplit.startRow = inputCGSplit.startRow;
cgSplit.numRows = inputCGSplit.numRows;
+ cgSplit.size = inputCGSplit.size;
if (cgSplit.startRow >= 0) {
//assume the rows are already set up.
@@ -1337,9 +1355,11 @@
* thrown if the table is already closed, or is in the process of being
* closed.
*/
+
public Writer(Path path, Configuration conf) throws IOException {
try {
- schemaFile = new SchemaFile(path, conf);
+ // fake an empty deleted cg list as no cg should have been deleted now
+ schemaFile = new SchemaFile(path, new String[0], conf);
int numCGs = schemaFile.getNumOfPhysicalSchemas();
partition = schemaFile.getPartition();
sorted = schemaFile.isSorted();
@@ -1650,8 +1670,8 @@
boolean[] cgDeletedFlags;
// ctor for reading
- public SchemaFile(Path path, Configuration conf) throws IOException {
- readSchemaFile(path, conf);
+ public SchemaFile(Path path, String[] deletedCGs, Configuration conf) throws IOException {
+ readSchemaFile(path, deletedCGs, conf);
}
public Schema[] getPhysicalSchema() {
@@ -1798,7 +1818,7 @@
outSchema.close();
}
- private void readSchemaFile(Path path, Configuration conf)
+ private void readSchemaFile(Path path, String[] deletedCGs, Configuration conf)
throws IOException {
Path pathSchema = makeSchemaFilePath(path);
if (!path.getFileSystem(conf).exists(pathSchema)) {
@@ -1845,7 +1865,18 @@
throw new IOException("parser.RecordSchema failed :" + e.getMessage());
}
sorted = WritableUtils.readVInt(in) == 1 ? true : false;
- setCGDeletedFlags(path, conf);
+ if (deletedCGs == null)
+ setCGDeletedFlags(path, conf);
+ else {
+ for (String deletedCG : deletedCGs)
+ {
+ for (int i = 0; i < cgschemas.length; i++)
+ {
+ if (cgschemas[i].getName().equals(deletedCG))
+ cgDeletedFlags[i] = true;
+ }
+ }
+ }
if (version.compareTo(new Version((short)1, (short)0)) > 0)
{
int numSortColumns = WritableUtils.readVInt(in);
@@ -1915,7 +1946,23 @@
}
}
-
+ String getDeletedCGs() {
+ StringBuilder sb = new StringBuilder();
+ // comma separated
+ boolean first = true;
+ for (int i = 0; i < physical.length; i++) {
+ if (cgDeletedFlags[i])
+ {
+ if (first)
+ first = false;
+ else {
+ sb.append(DELETED_CG_SEPARATOR_PER_TABLE);
+ }
+ sb.append(getName(i));
+ }
+ }
+ return sb.toString();
+ }
}
static public void dumpInfo(String file, PrintStream out, Configuration conf)
Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java?rev=905856&r1=905855&r2=905856&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java Wed Feb 3 01:08:15 2010
@@ -225,6 +225,7 @@
SplitColumn top; // directly associated with logical schema
SplitColumn leaf; // corresponding to projection
boolean closed;
+ boolean dirty;
/**
* Get the Column Group physical schema without loading the full CG index.
@@ -255,13 +256,24 @@
*/
public Reader(Path path, Configuration conf) throws IOException,
ParseException {
- this(path, true, conf);
+ this(path, conf, false);
}
-
+
+ public Reader(Path path, Configuration conf, boolean mapper) throws IOException,
+ ParseException {
+ this(path, true, conf, mapper);
+ }
+
Reader(Path path, boolean dirty, Configuration conf) throws IOException,
ParseException {
+ this(path, dirty, conf, false);
+ }
+
+ Reader(Path path, boolean dirty, Configuration conf, boolean mapper) throws IOException,
+ ParseException {
this.path = path;
this.conf = conf;
+ this.dirty = dirty;
fs = path.getFileSystem(conf);
// check existence of path
@@ -269,7 +281,7 @@
throw new IOException("Path doesn't exist: " + path);
}
- if (!fs.getFileStatus(path).isDir()) {
+ if (!mapper && !fs.getFileStatus(path).isDir()) {
throw new IOException("Path exists but not a directory: " + path);
}
@@ -279,8 +291,8 @@
}
projection = new Projection(cgschema.getSchema()); // default projection to CG schema.
Path metaFilePath = makeMetaFilePath(path);
- /* If index file is not existing or loading from an unsorted table. */
- if (!fs.exists(metaFilePath) || !cgschema.isSorted() ) {
+ /* If index file is not existing */
+ if (!fs.exists(metaFilePath)) {
// special case for unsorted CG that did not create index properly.
if (cgschema.isSorted()) {
throw new FileNotFoundException(
@@ -288,15 +300,16 @@
}
cgindex = buildIndex(fs, path, dirty, conf);
}
- else {
+ else if (cgschema.isSorted()) {
MetaFile.Reader metaFile = MetaFile.createReader(metaFilePath, conf);
try {
cgindex = new CGIndex();
DataInputStream dis = metaFile.getMetaBlock(BLOCK_NAME_INDEX);
try {
cgindex.readFields(dis);
- }
- finally {
+ } catch (IOException e) {
+ throw new IOException("Index file read failure :"+ e.getMessage());
+ } finally {
dis.close();
}
}
@@ -429,6 +442,8 @@
}
if (split == null) {
+ if (cgindex == null)
+ cgindex = buildIndex(fs, path, dirty, conf);
return getScanner(new CGRangeSplit(0, cgindex.size()), closeReader);
}
if (split.len < 0) {
@@ -474,6 +489,8 @@
return getBlockDistribution(new CGRangeSplit(0, cgindex.size()));
}
+ if (cgindex == null)
+ cgindex = buildIndex(fs, path, dirty, conf);
if ((split.start | split.len | (cgindex.size() - split.start - split.len)) < 0) {
throw new IndexOutOfBoundsException("Bad split");
}
@@ -509,10 +526,9 @@
}
BlockDistribution ret = new BlockDistribution();
- if (split.fileIndex >= 0)
+ if (split.name != null)
{
- CGIndexEntry entry = cgindex.get(split.fileIndex);
- FileStatus tfileStatus = fs.getFileStatus(new Path(path, entry.getName()));
+ FileStatus tfileStatus = fs.getFileStatus(new Path(path, split.name));
BlockLocation[] locations = fs.getFileBlockLocations(tfileStatus, split.startByte, split.numBytes);
for (BlockLocation l : locations) {
@@ -532,17 +548,26 @@
void fillRowSplit(CGRowSplit rowSplit, long startOffset, long length)
throws IOException {
- if (rowSplit.fileIndex < 0)
+ if (rowSplit.name == null)
return;
- Path tfPath = new Path(path, cgindex.get(rowSplit.fileIndex).getName());
- FileStatus tfile = fs.getFileStatus(tfPath);
+ Path tfPath = new Path(path, rowSplit.name);
+ long size = rowSplit.size;
+ if (size == 0)
+ {
+ /* the on disk table is sorted. Later this will be made unnecessary when
+ * CGIndexEntry serializes its bytes field and the meta file versioning is
+ * supported.
+ */
+ FileStatus tfile = fs.getFileStatus(tfPath);
+ size = tfile.getLen();
+ }
TFile.Reader reader = null;
try {
reader = new TFile.Reader(fs.open(tfPath),
- tfile.getLen(), conf);
+ size, conf);
long startRow = reader.getRecordNumNear(startOffset);
long endRow = reader.getRecordNumNear(startOffset + length);
@@ -703,7 +728,9 @@
/**
* Get the status of the ColumnGroup.
*/
- public BasicTableStatus getStatus() {
+ public BasicTableStatus getStatus() throws IOException {
+ if (cgindex == null)
+ cgindex = buildIndex(fs, path, dirty, conf);
return cgindex.status;
}
@@ -715,10 +742,12 @@
* @return A list of range-based splits, whose size may be less than or
* equal to n.
*/
- public List<CGRangeSplit> rangeSplit(int n) {
+ public List<CGRangeSplit> rangeSplit(int n) throws IOException {
// The output of this method must be only dependent on the cgindex and
// input parameter n - so that horizontally stitched column groups will
// get aligned splits.
+ if (cgindex == null)
+ cgindex = buildIndex(fs, path, dirty, conf);
int numFiles = cgindex.size();
if ((numFiles < n) || (n < 0)) {
return rangeSplit(numFiles);
@@ -752,8 +781,10 @@
long start = starts[i];
long length = lengths[i];
Path path = paths[i];
- int idx = cgindex.getFileIndex(path);
- lst.add(new CGRowSplit(idx, start, length));
+ if (cgindex == null)
+ cgindex = buildIndex(fs, this.path, dirty, conf);
+ long size = cgindex.get(cgindex.getFileIndex(path)).bytes;
+ lst.add(new CGRowSplit(path.getName(), start, length, size));
}
return lst;
@@ -796,7 +827,7 @@
* compressor is inside cgschema
*/
reader = new TFile.Reader(ins, fs.getFileStatus(path).getLen(), conf);
- if (rowRange != null && rowRange.fileIndex >= 0) {
+ if (rowRange != null) {
scanner = reader.createScannerByRecordNum(rowRange.startRow,
rowRange.startRow + rowRange.numRows);
} else {
@@ -921,6 +952,8 @@
CGScanner(CGRangeSplit split, boolean closeReader) throws IOException,
ParseException {
+ if (cgindex== null)
+ cgindex = buildIndex(fs, path, dirty, conf);
if (split == null) {
beginIndex = 0;
endIndex = cgindex.size();
@@ -940,15 +973,9 @@
*/
CGScanner(CGRowSplit rowRange, boolean closeReader)
throws IOException, ParseException {
+
beginIndex = 0;
- endIndex = cgindex.size();
- if (rowRange != null && rowRange.fileIndex>= 0) {
- if (rowRange.fileIndex >= cgindex.size()) {
- throw new IllegalArgumentException("Part Index is out of range.");
- }
- beginIndex = rowRange.fileIndex;
- endIndex = beginIndex+1;
- }
+ endIndex = 1;
init(rowRange, null, null, closeReader);
}
@@ -981,8 +1008,15 @@
for (int i = beginIndex; i < endIndex; ++i) {
RawComparable begin = (i == beginIndex) ? beginKey : null;
RawComparable end = (i == endIndex - 1) ? endKey : null;
- TFileScanner scanner =
- new TFileScanner(fs, cgindex.getPath(i, path), rowRange,
+ TFileScanner scanner;
+ if (rowRange != null)
+ scanner =
+ new TFileScanner(fs, new Path(path, rowRange.name), rowRange,
+ begin, end,
+ cgschema, logicalSchema, conf);
+ else
+ scanner =
+ new TFileScanner(fs, cgindex.getPath(i, path), null,
begin, end,
cgschema, logicalSchema, conf);
// skip empty scanners.
@@ -1160,16 +1194,18 @@
}
public static class CGRowSplit implements Writable {
- int fileIndex = -1;
+ String name;
long startByte = -1;
long numBytes = -1;
long startRow = -1;
long numRows = -1;
+ long size = 0; // size of the file in the selected CG
- CGRowSplit(int fileIdx, long start, long len) {
- this.fileIndex = fileIdx;
+ CGRowSplit(String name, long start, long len, long size) {
+ this.name = name;
this.startByte = start;
this.numBytes = len;
+ this.size = size;
}
public CGRowSplit() {
@@ -1179,31 +1215,34 @@
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
- sb.append("{fileIndex = " + fileIndex + "}\n");
+ sb.append("{name = " + name + "}\n");
sb.append("{startByte = " + startByte + "}\n");
sb.append("{numBytes = " + numBytes + "}\n");
sb.append("{startRow = " + startRow + "}\n");
sb.append("{numRows = " + numRows + "}\n");
+ sb.append("{size = " + size + "}\n");
return sb.toString();
}
@Override
public void readFields(DataInput in) throws IOException {
- fileIndex = Utils.readVInt(in);
+ name = Utils.readString(in);
startByte = Utils.readVLong(in);
numBytes = Utils.readVLong(in);
startRow = Utils.readVLong(in);
numRows = Utils.readVLong(in);
+ size = Utils.readVLong(in);
}
@Override
public void write(DataOutput out) throws IOException {
- Utils.writeVInt(out, fileIndex);
+ Utils.writeString(out, name);
Utils.writeVLong(out, startByte);
Utils.writeVLong(out, numBytes);
Utils.writeVLong(out, startRow);
Utils.writeVLong(out, numRows);
+ Utils.writeVLong(out, size);
}
}
@@ -1444,23 +1483,13 @@
private void createIndex() throws IOException {
MetaFile.Writer metaFile =
MetaFile.createWriter(makeMetaFilePath(path), conf);
- if (cgschema.isSorted()) {
- CGIndex index = buildIndex(fs, path, false, conf);
- DataOutputStream dos = metaFile.createMetaBlock(BLOCK_NAME_INDEX);
- try {
- index.write(dos);
- }
- finally {
- dos.close();
- }
- } else { /* Create an empty data meta file for unsorted table. */
- DataOutputStream dos = metaFile.createMetaBlock(BLOCK_NAME_INDEX);
- try {
- Utils.writeString(dos, "");
- }
- finally {
- dos.close();
- }
+ CGIndex index = buildIndex(fs, path, false, conf);
+ DataOutputStream dos = metaFile.createMetaBlock(BLOCK_NAME_INDEX);
+ try {
+ index.write(dos);
+ }
+ finally {
+ dos.close();
}
metaFile.close();
}
@@ -1689,7 +1718,7 @@
static class CGIndexEntry implements RawComparable, Writable {
int index;
String name;
- long rows;
+ long rows, bytes;
RawComparable firstKey;
RawComparable lastKey;
@@ -1890,6 +1919,7 @@
status.rows += rows;
index.add(range);
sorted = false;
+ range.bytes = bytes;
}
// building dirty index
@@ -1901,6 +1931,7 @@
next.name = name;
index.add(next);
sorted = false;
+ next.bytes = bytes;
}
int lowerBound(RawComparable key, final Comparator<RawComparable> comparator)
@@ -1935,6 +1966,7 @@
for (int i = 0; i < n; ++i) {
CGIndexEntry range = new CGIndexEntry();
range.readFields(in);
+ range.setIndex(i);
index.add(range);
}
status.readFields(in);
@@ -2035,6 +2067,8 @@
out.printf("%s : %s\n", e.getKey(), e.getValue());
}
out.println("TFiles within the Column Group :");
+ if (reader.cgindex == null)
+ reader.cgindex = buildIndex(reader.fs, reader.path, reader.dirty, conf);
for (CGIndexEntry entry : reader.cgindex.index) {
IOutils.indent(out, indent);
out.printf(" *Name : %s\n", entry.name);
Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java?rev=905856&r1=905855&r2=905856&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java Wed Feb 3 01:08:15 2010
@@ -106,7 +106,8 @@
@Override
public TableScanner getScanner(BytesWritable begin, BytesWritable end,
String projection, Configuration conf) throws IOException {
- BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+ String[] deletedCGs = getDeletedCGs(conf);
+ BasicTable.Reader reader = new BasicTable.Reader(path, deletedCGs, conf);
try {
reader.setProjection(projection);
} catch (ParseException e) {
Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java?rev=905856&r1=905855&r2=905856&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java Wed Feb 3 01:08:15 2010
@@ -27,7 +27,6 @@
import org.apache.hadoop.zebra.io.BasicTable;
import org.apache.hadoop.zebra.io.TableScanner;
import org.apache.hadoop.zebra.parser.ParseException;
-import org.apache.hadoop.zebra.types.Projection;
import org.apache.hadoop.zebra.schema.Schema;
/**
@@ -105,7 +104,6 @@
* @see Schema
* @return A TableScanner object.
*/
- @SuppressWarnings("unused")
public TableScanner getScanner(BytesWritable begin,
BytesWritable end, String projection, Configuration conf)
throws IOException {
@@ -127,7 +125,7 @@
public TableScanner getScanner(UnsortedTableSplit split, String projection,
Configuration conf) throws IOException, ParseException {
BasicTable.Reader reader =
- new BasicTable.Reader(new Path(split.getPath()), conf);
+ new BasicTable.Reader(new Path(split.getPath()), getDeletedCGs(conf), conf);
reader.setProjection(projection);
return reader.getScanner(split.getSplit(), true);
}
@@ -147,7 +145,7 @@
public TableScanner getScanner(RowTableSplit split, String projection,
Configuration conf) throws IOException, ParseException, ParseException {
BasicTable.Reader reader =
- new BasicTable.Reader(new Path(split.getPath()), conf);
+ new BasicTable.Reader(new Path(split.getPath()), getDeletedCGs(conf), conf);
reader.setProjection(projection);
return reader.getScanner(true, split.getSplit());
}
@@ -240,4 +238,31 @@
* dump table info with indent
*/
protected abstract void dumpInfo(PrintStream ps, Configuration conf, int indent) throws IOException;
+
+ /**
+ * get the deleted cg for tables in union
+ * @param conf The Configuration object
+ * @return
+ */
+ protected final String[] getDeletedCGsPerUnion(Configuration conf) {
+ return getDeletedCGs(conf, TableInputFormat.DELETED_CG_SEPARATOR_PER_UNION);
+ }
+
+ protected final String[] getDeletedCGs(Configuration conf) {
+ return getDeletedCGs(conf, BasicTable.DELETED_CG_SEPARATOR_PER_TABLE);
+ }
+
+ private final String[] getDeletedCGs(Configuration conf, String separator) {
+ String[] deletedCGs = null;
+ String fe;
+ if ((fe = conf.get(TableInputFormat.INPUT_FE)) != null && fe.equals("true"))
+ {
+ String original = conf.get(TableInputFormat.INPUT_DELETEED_CGS, null);
+ if (original == null)
+ deletedCGs = new String[0]; // empty array needed to indicate it is fe checked
+ else
+ deletedCGs = original.split(separator, -1);
+ }
+ return deletedCGs;
+ }
}
Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java?rev=905856&r1=905855&r2=905856&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java Wed Feb 3 01:08:15 2010
@@ -140,9 +140,12 @@
public class TableInputFormat implements InputFormat<BytesWritable, Tuple> {
static Log LOG = LogFactory.getLog(TableInputFormat.class);
- private static final String INPUT_EXPR = "mapred.lib.table.input.expr";
- private static final String INPUT_PROJ = "mapred.lib.table.input.projection";
- private static final String INPUT_SORT = "mapred.lib.table.input.sort";
+ public static final String INPUT_EXPR = "mapred.lib.table.input.expr";
+ public static final String INPUT_PROJ = "mapred.lib.table.input.projection";
+ public static final String INPUT_SORT = "mapred.lib.table.input.sort";
+ public static final String INPUT_FE = "mapred.lib.table.input.fe";
+ public static final String INPUT_DELETEED_CGS = "mapred.lib.table.input.deleted_cgs";
+ static final String DELETED_CG_SEPARATOR_PER_UNION = ";";
/**
* Set the paths to the input table.
@@ -642,8 +645,7 @@
}
private static InputSplit[] getRowSplits(JobConf conf, int numSplits,
- TableExpr expr, List<BasicTable.Reader> readers,
- List<BasicTableStatus> status) throws IOException {
+ TableExpr expr, List<BasicTable.Reader> readers) throws IOException {
ArrayList<InputSplit> ret = new ArrayList<InputSplit>();
DummyFileInputFormat helper = new DummyFileInputFormat(getMinSplitSize(conf));
@@ -715,25 +717,40 @@
new ArrayList<BasicTableStatus>(nLeaves);
try {
+ StringBuilder sb = new StringBuilder();
+ boolean sorted = expr.sortedSplitRequired();
+ boolean first = true;
for (Iterator<LeafTableInfo> it = leaves.iterator(); it.hasNext();) {
LeafTableInfo leaf = it.next();
BasicTable.Reader reader =
new BasicTable.Reader(leaf.getPath(), conf);
reader.setProjection(leaf.getProjection());
- BasicTableStatus s = reader.getStatus();
+ if (sorted)
+ {
+ BasicTableStatus s = reader.getStatus();
+ status.add(s);
+ }
readers.add(reader);
- status.add(s);
+ if (first)
+ first = false;
+ else {
+ sb.append(TableInputFormat.DELETED_CG_SEPARATOR_PER_UNION);
+ }
+ sb.append(reader.getDeletedCGs());
}
+ conf.set(INPUT_FE, "true");
+ conf.set(INPUT_DELETEED_CGS, sb.toString());
+
if (readers.isEmpty()) {
return new InputSplit[0];
}
- if (expr.sortedSplitRequired()) {
+ if (sorted) {
return getSortedSplits(conf, numSplits, expr, readers, status);
}
- return getRowSplits(conf, numSplits, expr, readers, status);
+ return getRowSplits(conf, numSplits, expr, readers);
} catch (ParseException e) {
throw new IOException("Projection parsing failed : "+e.getMessage());
}
Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java?rev=905856&r1=905855&r2=905856&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java Wed Feb 3 01:08:15 2010
@@ -105,14 +105,20 @@
throw new IllegalArgumentException("Union of 0 table");
}
ArrayList<BasicTable.Reader> readers = new ArrayList<BasicTable.Reader>(n);
- final ArrayList<BasicTableStatus> status =
- new ArrayList<BasicTableStatus>(n);
+ String[] deletedCGsInUnion = getDeletedCGsPerUnion(conf);
+
+ if (deletedCGsInUnion != null && deletedCGsInUnion.length != n)
+ throw new IllegalArgumentException("Invalid string of deleted column group names: expected = "+
+ n + " actual =" + deletedCGsInUnion.length);
+
for (int i = 0; i < n; ++i) {
+ String deletedCGs = (deletedCGsInUnion == null ? null : deletedCGsInUnion[i]);
+ String[] deletedCGList = (deletedCGs == null ? null :
+ deletedCGs.split(BasicTable.DELETED_CG_SEPARATOR_PER_TABLE));
BasicTableExpr expr = (BasicTableExpr) composite.get(i);
BasicTable.Reader reader =
- new BasicTable.Reader(expr.getPath(), conf);
+ new BasicTable.Reader(expr.getPath(), deletedCGList, conf);
readers.add(reader);
- status.add(reader.getStatus());
}
String actualProjection = projection;
Modified: hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java?rev=905856&r1=905855&r2=905856&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java (original)
+++ hadoop/pig/branches/branch-0.6/contrib/zebra/src/java/org/apache/hadoop/zebra/pig/TableLoader.java Wed Feb 3 01:08:15 2010
@@ -430,6 +430,9 @@
private TreeMap<String, String> configMap;
private InputSplit split;
+ transient private final String[] zebraConfs = {TableInputFormat.INPUT_EXPR,
+ TableInputFormat.INPUT_PROJ, TableInputFormat.INPUT_SORT,
+ TableInputFormat.INPUT_DELETEED_CGS, TableInputFormat.INPUT_FE, "mapred.input.dir"};
transient private JobConf conf;
transient private int numProjCols = 0;
transient private RecordReader<BytesWritable, Tuple> scanner;
@@ -437,16 +440,16 @@
transient private boolean sorted = false;
TableSlice(JobConf conf, InputSplit split, boolean sorted) {
- // hack: expecting JobConf contains nothing but a <string, string>
- // key-value pair store.
configMap = new TreeMap<String, String>();
- for (Iterator<Map.Entry<String, String>> it = conf.iterator(); it.hasNext();) {
- Map.Entry<String, String> e = it.next();
- configMap.put(e.getKey(), e.getValue());
- }
-
-
-
+ String value;
+
+ for (String zebraConf : zebraConfs)
+ {
+ value = conf.get(zebraConf);
+ if (value != null)
+ configMap.put(zebraConf, value);
+ }
+
this.split = split;
this.sorted = sorted;
}
@@ -500,14 +503,14 @@
@Override
public void init(DataStorage store) throws IOException {
- Configuration localConf = new Configuration();
+ Configuration localConf = ConfigurationUtil.toConfiguration(store.getConfiguration());
for (Iterator<Map.Entry<String, String>> it =
configMap.entrySet().iterator(); it.hasNext();) {
Map.Entry<String, String> e = it.next();
localConf.set(e.getKey(), e.getValue());
}
conf = new JobConf(localConf);
- String projection;
+ String projection;
try
{
projection = TableInputFormat.getProjection(conf);
@@ -516,8 +519,8 @@
}
numProjCols = Projection.getNumColumns(projection);
TableInputFormat inputFormat = new TableInputFormat();
- if (sorted)
- TableInputFormat.requireSortedTable(conf, null);
+ if (sorted)
+ TableInputFormat.requireSortedTable(conf, null);
scanner = inputFormat.getRecordReader(split, conf, Reporter.NULL);
key = new BytesWritable();
}