You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ya...@apache.org on 2010/02/18 18:01:31 UTC
svn commit: r911493 - in
/hadoop/pig/branches/load-store-redesign/contrib/zebra: ./
src/java/org/apache/hadoop/zebra/io/ src/java/org/apache/hadoop/zebra/mapred/
src/java/org/apache/hadoop/zebra/mapreduce/
src/test/org/apache/hadoop/zebra/io/ src/test/...
Author: yanz
Date: Thu Feb 18 17:01:30 2010
New Revision: 911493
URL: http://svn.apache.org/viewvc?rev=911493&view=rev
Log:
PIG-1227: Throw exception if column group meta file is missing for an unsorted table (yanz); PIG-1201: unnecessary name node calls by each mapper; too big input split serialization size by Pig's Slice implementation (yanz)
Modified:
hadoop/pig/branches/load-store-redesign/contrib/zebra/CHANGES.txt
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableExpr.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableUnionExpr.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupReaders.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupWithWorkPath.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestBasicTableIOFormatLocalFS.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicTableUnionLoader.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicUnion.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollectionTableLoader.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestGlobTableLoader.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapTableLoader.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableLoader.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableLoaderPrune.java
hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestStorageGrammar.java
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/CHANGES.txt?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/CHANGES.txt Thu Feb 18 17:01:30 2010
@@ -58,6 +58,10 @@
BUG FIXES
+ PIG-1227: Throw exception if column group meta file is missing for an unsorted table (yanz)
+
+ PIG-1201: unnecessary name node calls by each mapper; too big input split serialization size by Pig's Slice implementation (yanz)
+
PIG-1115: cleanup of temp files left by failed tasks (gauravj via yanz)
PIG-1167: Hadoop file glob support (yanz)
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java Thu Feb 18 17:01:30 2010
@@ -92,6 +92,8 @@
private final static String DELETED_CG_PREFIX = ".deleted-";
+ public final static String DELETED_CG_SEPARATOR_PER_TABLE = ",";
+
// no public ctor for instantiating a BasicTable object
private BasicTable() {
// no-op
@@ -139,7 +141,7 @@
/* Retry up to numCGs times accounting for other CG deleting threads or processes.*/
while (triedCount ++ < numCGs) {
try {
- schemaFile = new SchemaFile(path, conf);
+ schemaFile = new SchemaFile(path, null, conf);
break;
} catch (FileNotFoundException e) {
LOG.info("Try " + triedCount + " times : " + e.getMessage());
@@ -195,7 +197,7 @@
} catch (IOException e) {
// one remote possibility is that another user
// already deleted CG.
- SchemaFile tempSchema = new SchemaFile(path, conf);
+ SchemaFile tempSchema = new SchemaFile(path, null, conf);
if (tempSchema.isCGDeleted(cgIdx)) {
LOG.info(path + " : " + cgName +
" is deleted by someone else. That is ok.");
@@ -278,10 +280,15 @@
* Optional configuration parameters.
* @throws IOException
*/
+
public Reader(Path path, Configuration conf) throws IOException {
+ this(path, null, conf);
+ }
+ public Reader(Path path, String[] deletedCGs, Configuration conf) throws IOException {
try {
+ boolean mapper = (deletedCGs != null);
this.path = path;
- schemaFile = new SchemaFile(path, conf);
+ schemaFile = new SchemaFile(path, deletedCGs, conf);
metaReader = MetaFile.createReader(new Path(path, BT_META_FILE), conf);
// create column group readers
int numCGs = schemaFile.getNumOfPhysicalSchemas();
@@ -298,7 +305,7 @@
if (!schemaFile.isCGDeleted(nx)) {
colGroups[nx] =
new ColumnGroup.Reader(new Path(path, partition.getCGSchema(nx).getName()),
- conf);
+ conf, mapper);
if (firstValidCG < 0) {
firstValidCG = nx;
}
@@ -308,7 +315,8 @@
else
cgTuples[nx] = null;
}
- buildStatus();
+ if (schemaFile.isSorted())
+ buildStatus();
closed = false;
}
catch (Exception e) {
@@ -407,7 +415,9 @@
/**
* Get the status of the BasicTable.
*/
- public BasicTableStatus getStatus() {
+ public BasicTableStatus getStatus() throws IOException {
+ if (status == null)
+ buildStatus();
return status;
}
@@ -562,13 +572,16 @@
*
* @param path
* The path to the BasicTable.
+ * @deletedCGs
+ * The deleted column groups from front end; null if unavailable from front end
* @param conf
* @return The logical Schema of the table (all columns).
* @throws IOException
*/
public static Schema getSchema(Path path, Configuration conf)
throws IOException {
- SchemaFile schF = new SchemaFile(path, conf);
+ // fake an empty deleted cg list as getSchema does not care about deleted cgs
+ SchemaFile schF = new SchemaFile(path, new String[0], conf);
return schF.getLogical();
}
@@ -650,7 +663,7 @@
* Get index of the column group that will be used for row-based split.
*
*/
- public int getRowSplitCGIndex() {
+ public int getRowSplitCGIndex() throws IOException {
// Try to find the largest non-deleted and used column group by projection;
int largestCGIndex = -1;
int splitCGIndex = -1;
@@ -722,8 +735,12 @@
String getStorageString() {
return schemaFile.getStorageString();
}
+
+ public String getDeletedCGs() {
+ return schemaFile.getDeletedCGs();
+ }
- private void buildStatus() {
+ private void buildStatus() throws IOException {
status = new BasicTableStatus();
if (firstValidCG >= 0) {
status.beginKey = colGroups[firstValidCG].getStatus().getBeginKey();
@@ -911,11 +928,12 @@
int cgIdx = rowSplit.getCGIndex();
CGRowSplit cgSplit = new CGRowSplit();
- cgSplit.fileIndex = inputCGSplit.fileIndex;
+ cgSplit.name = inputCGSplit.name;
// startByte and numBytes from inputCGSplit are ignored, since
// they make sense for only one CG.
cgSplit.startRow = inputCGSplit.startRow;
cgSplit.numRows = inputCGSplit.numRows;
+ cgSplit.size = inputCGSplit.size;
if (cgSplit.startRow >= 0) {
//assume the rows are already set up.
@@ -1345,7 +1363,8 @@
try {
actualOutputPath = path;
writerConf = conf;
- schemaFile = new SchemaFile(path, conf);
+ // fake an empty deleted cg list as no cg should have been deleted now
+ schemaFile = new SchemaFile(path, new String[0], conf);
int numCGs = schemaFile.getNumOfPhysicalSchemas();
partition = schemaFile.getPartition();
sorted = schemaFile.isSorted();
@@ -1677,8 +1696,8 @@
boolean[] cgDeletedFlags;
// ctor for reading
- public SchemaFile(Path path, Configuration conf) throws IOException {
- readSchemaFile(path, conf);
+ public SchemaFile(Path path, String[] deletedCGs, Configuration conf) throws IOException {
+ readSchemaFile(path, deletedCGs, conf);
}
public Schema[] getPhysicalSchema() {
@@ -1825,7 +1844,7 @@
outSchema.close();
}
- private void readSchemaFile(Path path, Configuration conf)
+ private void readSchemaFile(Path path, String[] deletedCGs, Configuration conf)
throws IOException {
Path pathSchema = makeSchemaFilePath(path);
if (!path.getFileSystem(conf).exists(pathSchema)) {
@@ -1872,7 +1891,18 @@
throw new IOException("parser.RecordSchema failed :" + e.getMessage());
}
sorted = WritableUtils.readVInt(in) == 1 ? true : false;
- setCGDeletedFlags(path, conf);
+ if (deletedCGs == null)
+ setCGDeletedFlags(path, conf);
+ else {
+ for (String deletedCG : deletedCGs)
+ {
+ for (int i = 0; i < cgschemas.length; i++)
+ {
+ if (cgschemas[i].getName().equals(deletedCG))
+ cgDeletedFlags[i] = true;
+ }
+ }
+ }
if (version.compareTo(new Version((short)1, (short)0)) > 0)
{
int numSortColumns = WritableUtils.readVInt(in);
@@ -1942,7 +1972,23 @@
}
}
-
+ String getDeletedCGs() {
+ StringBuilder sb = new StringBuilder();
+ // comma separated
+ boolean first = true;
+ for (int i = 0; i < physical.length; i++) {
+ if (cgDeletedFlags[i])
+ {
+ if (first)
+ first = false;
+ else {
+ sb.append(DELETED_CG_SEPARATOR_PER_TABLE);
+ }
+ sb.append(getName(i));
+ }
+ }
+ return sb.toString();
+ }
}
static public void dumpInfo(String file, PrintStream out, Configuration conf)
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/io/ColumnGroup.java Thu Feb 18 17:01:30 2010
@@ -225,6 +225,7 @@
SplitColumn top; // directly associated with logical schema
SplitColumn leaf; // corresponding to projection
boolean closed;
+ boolean dirty;
/**
* Get the Column Group physical schema without loading the full CG index.
@@ -255,13 +256,24 @@
*/
public Reader(Path path, Configuration conf) throws IOException,
ParseException {
- this(path, true, conf);
+ this(path, conf, false);
}
-
+
+ public Reader(Path path, Configuration conf, boolean mapper) throws IOException,
+ ParseException {
+ this(path, true, conf, mapper);
+ }
+
Reader(Path path, boolean dirty, Configuration conf) throws IOException,
ParseException {
+ this(path, dirty, conf, false);
+ }
+
+ Reader(Path path, boolean dirty, Configuration conf, boolean mapper) throws IOException,
+ ParseException {
this.path = path;
this.conf = conf;
+ this.dirty = dirty;
fs = path.getFileSystem(conf);
// check existence of path
@@ -269,7 +281,7 @@
throw new IOException("Path doesn't exist: " + path);
}
- if (!fs.getFileStatus(path).isDir()) {
+ if (!mapper && !fs.getFileStatus(path).isDir()) {
throw new IOException("Path exists but not a directory: " + path);
}
@@ -279,24 +291,21 @@
}
projection = new Projection(cgschema.getSchema()); // default projection to CG schema.
Path metaFilePath = makeMetaFilePath(path);
- /* If index file is not existing or loading from an unsorted table. */
- if (!fs.exists(metaFilePath) || !cgschema.isSorted() ) {
- // special case for unsorted CG that did not create index properly.
- if (cgschema.isSorted()) {
- throw new FileNotFoundException(
- "Missing Meta File for sorted Column Group");
- }
- cgindex = buildIndex(fs, path, dirty, conf);
+ /* If the index file does not exist */
+ if (!fs.exists(metaFilePath)) {
+ throw new FileNotFoundException(
+ "Missing Meta File of " + metaFilePath);
}
- else {
+ else if (cgschema.isSorted()) {
MetaFile.Reader metaFile = MetaFile.createReader(metaFilePath, conf);
try {
cgindex = new CGIndex();
DataInputStream dis = metaFile.getMetaBlock(BLOCK_NAME_INDEX);
try {
cgindex.readFields(dis);
- }
- finally {
+ } catch (IOException e) {
+ throw new IOException("Index file read failure :"+ e.getMessage());
+ } finally {
dis.close();
}
}
@@ -429,6 +438,8 @@
}
if (split == null) {
+ if (cgindex == null)
+ cgindex = buildIndex(fs, path, dirty, conf);
return getScanner(new CGRangeSplit(0, cgindex.size()), closeReader);
}
if (split.len < 0) {
@@ -474,6 +485,8 @@
return getBlockDistribution(new CGRangeSplit(0, cgindex.size()));
}
+ if (cgindex == null)
+ cgindex = buildIndex(fs, path, dirty, conf);
if ((split.start | split.len | (cgindex.size() - split.start - split.len)) < 0) {
throw new IndexOutOfBoundsException("Bad split");
}
@@ -509,10 +522,9 @@
}
BlockDistribution ret = new BlockDistribution();
- if (split.fileIndex >= 0)
+ if (split.name != null)
{
- CGIndexEntry entry = cgindex.get(split.fileIndex);
- FileStatus tfileStatus = fs.getFileStatus(new Path(path, entry.getName()));
+ FileStatus tfileStatus = fs.getFileStatus(new Path(path, split.name));
BlockLocation[] locations = fs.getFileBlockLocations(tfileStatus, split.startByte, split.numBytes);
for (BlockLocation l : locations) {
@@ -532,17 +544,26 @@
void fillRowSplit(CGRowSplit rowSplit, long startOffset, long length)
throws IOException {
- if (rowSplit.fileIndex < 0)
+ if (rowSplit.name == null)
return;
- Path tfPath = new Path(path, cgindex.get(rowSplit.fileIndex).getName());
- FileStatus tfile = fs.getFileStatus(tfPath);
+ Path tfPath = new Path(path, rowSplit.name);
+ long size = rowSplit.size;
+ if (size == 0)
+ {
+ /* the on disk table is sorted. Later this will be made unnecessary when
+ * CGIndexEntry serializes its bytes field and the meta file versioning is
+ * supported.
+ */
+ FileStatus tfile = fs.getFileStatus(tfPath);
+ size = tfile.getLen();
+ }
TFile.Reader reader = null;
try {
reader = new TFile.Reader(fs.open(tfPath),
- tfile.getLen(), conf);
+ size, conf);
long startRow = reader.getRecordNumNear(startOffset);
long endRow = reader.getRecordNumNear(startOffset + length);
@@ -703,7 +724,9 @@
/**
* Get the status of the ColumnGroup.
*/
- public BasicTableStatus getStatus() {
+ public BasicTableStatus getStatus() throws IOException {
+ if (cgindex == null)
+ cgindex = buildIndex(fs, path, dirty, conf);
return cgindex.status;
}
@@ -715,10 +738,12 @@
* @return A list of range-based splits, whose size may be less than or
* equal to n.
*/
- public List<CGRangeSplit> rangeSplit(int n) {
+ public List<CGRangeSplit> rangeSplit(int n) throws IOException {
// The output of this method must be only dependent on the cgindex and
// input parameter n - so that horizontally stitched column groups will
// get aligned splits.
+ if (cgindex == null)
+ cgindex = buildIndex(fs, path, dirty, conf);
int numFiles = cgindex.size();
if ((numFiles < n) || (n < 0)) {
return rangeSplit(numFiles);
@@ -752,8 +777,10 @@
long start = starts[i];
long length = lengths[i];
Path path = paths[i];
- int idx = cgindex.getFileIndex(path);
- lst.add(new CGRowSplit(idx, start, length));
+ if (cgindex == null)
+ cgindex = buildIndex(fs, this.path, dirty, conf);
+ long size = cgindex.get(cgindex.getFileIndex(path)).bytes;
+ lst.add(new CGRowSplit(path.getName(), start, length, size));
}
return lst;
@@ -796,7 +823,7 @@
* compressor is inside cgschema
*/
reader = new TFile.Reader(ins, fs.getFileStatus(path).getLen(), conf);
- if (rowRange != null && rowRange.fileIndex >= 0) {
+ if (rowRange != null) {
scanner = reader.createScannerByRecordNum(rowRange.startRow,
rowRange.startRow + rowRange.numRows);
} else {
@@ -922,6 +949,8 @@
CGScanner(CGRangeSplit split, boolean closeReader) throws IOException,
ParseException {
+ if (cgindex== null)
+ cgindex = buildIndex(fs, path, dirty, conf);
if (split == null) {
beginIndex = 0;
endIndex = cgindex.size();
@@ -941,15 +970,9 @@
*/
CGScanner(CGRowSplit rowRange, boolean closeReader)
throws IOException, ParseException {
+
beginIndex = 0;
- endIndex = cgindex.size();
- if (rowRange != null && rowRange.fileIndex>= 0) {
- if (rowRange.fileIndex >= cgindex.size()) {
- throw new IllegalArgumentException("Part Index is out of range.");
- }
- beginIndex = rowRange.fileIndex;
- endIndex = beginIndex+1;
- }
+ endIndex = 1;
init(rowRange, null, null, closeReader);
}
@@ -982,8 +1005,15 @@
for (int i = beginIndex; i < endIndex; ++i) {
RawComparable begin = (i == beginIndex) ? beginKey : null;
RawComparable end = (i == endIndex - 1) ? endKey : null;
- TFileScanner scanner =
- new TFileScanner(fs, cgindex.getPath(i, path), rowRange,
+ TFileScanner scanner;
+ if (rowRange != null)
+ scanner =
+ new TFileScanner(fs, new Path(path, rowRange.name), rowRange,
+ begin, end,
+ cgschema, logicalSchema, conf);
+ else
+ scanner =
+ new TFileScanner(fs, cgindex.getPath(i, path), null,
begin, end,
cgschema, logicalSchema, conf);
// skip empty scanners.
@@ -1161,16 +1191,18 @@
}
public static class CGRowSplit implements Writable {
- int fileIndex = -1;
+ String name;
long startByte = -1;
long numBytes = -1;
long startRow = -1;
long numRows = -1;
+ long size = 0; // size of the file in the selected CG
- CGRowSplit(int fileIdx, long start, long len) {
- this.fileIndex = fileIdx;
+ CGRowSplit(String name, long start, long len, long size) {
+ this.name = name;
this.startByte = start;
this.numBytes = len;
+ this.size = size;
}
public CGRowSplit() {
@@ -1180,31 +1212,34 @@
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
- sb.append("{fileIndex = " + fileIndex + "}\n");
+ sb.append("{name = " + name + "}\n");
sb.append("{startByte = " + startByte + "}\n");
sb.append("{numBytes = " + numBytes + "}\n");
sb.append("{startRow = " + startRow + "}\n");
sb.append("{numRows = " + numRows + "}\n");
+ sb.append("{size = " + size + "}\n");
return sb.toString();
}
@Override
public void readFields(DataInput in) throws IOException {
- fileIndex = Utils.readVInt(in);
+ name = Utils.readString(in);
startByte = Utils.readVLong(in);
numBytes = Utils.readVLong(in);
startRow = Utils.readVLong(in);
numRows = Utils.readVLong(in);
+ size = Utils.readVLong(in);
}
@Override
public void write(DataOutput out) throws IOException {
- Utils.writeVInt(out, fileIndex);
+ Utils.writeString(out, name);
Utils.writeVLong(out, startByte);
Utils.writeVLong(out, numBytes);
Utils.writeVLong(out, startRow);
Utils.writeVLong(out, numRows);
+ Utils.writeVLong(out, size);
}
}
@@ -1466,24 +1501,14 @@
private void createIndex() throws IOException {
MetaFile.Writer metaFile =
- MetaFile.createWriter(makeMetaFilePath(finalOutputPath), conf);
- if (cgschema.isSorted()) {
- CGIndex index = buildIndex(fs, finalOutputPath, false, conf);
- DataOutputStream dos = metaFile.createMetaBlock(BLOCK_NAME_INDEX);
- try {
- index.write(dos);
- }
- finally {
- dos.close();
- }
- } else { /* Create an empty data meta file for unsorted table. */
- DataOutputStream dos = metaFile.createMetaBlock(BLOCK_NAME_INDEX);
- try {
- Utils.writeString(dos, "");
- }
- finally {
- dos.close();
- }
+ MetaFile.createWriter(makeMetaFilePath(finalOutputPath), conf);
+ CGIndex index = buildIndex(fs, finalOutputPath, false, conf);
+ DataOutputStream dos = metaFile.createMetaBlock(BLOCK_NAME_INDEX);
+ try {
+ index.write(dos);
+ }
+ finally {
+ dos.close();
}
metaFile.close();
}
@@ -1714,7 +1739,7 @@
static class CGIndexEntry implements RawComparable, Writable {
int index;
String name;
- long rows;
+ long rows, bytes;
RawComparable firstKey;
RawComparable lastKey;
@@ -1915,6 +1940,7 @@
status.rows += rows;
index.add(range);
sorted = false;
+ range.bytes = bytes;
}
// building dirty index
@@ -1926,6 +1952,7 @@
next.name = name;
index.add(next);
sorted = false;
+ next.bytes = bytes;
}
int lowerBound(RawComparable key, final Comparator<RawComparable> comparator)
@@ -1960,6 +1987,7 @@
for (int i = 0; i < n; ++i) {
CGIndexEntry range = new CGIndexEntry();
range.readFields(in);
+ range.setIndex(i);
index.add(range);
}
status.readFields(in);
@@ -2060,6 +2088,8 @@
out.printf("%s : %s\n", e.getKey(), e.getValue());
}
out.println("TFiles within the Column Group :");
+ if (reader.cgindex == null)
+ reader.cgindex = buildIndex(reader.fs, reader.path, reader.dirty, conf);
for (CGIndexEntry entry : reader.cgindex.index) {
IOutils.indent(out, indent);
out.printf(" *Name : %s\n", entry.name);
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/BasicTableExpr.java Thu Feb 18 17:01:30 2010
@@ -106,7 +106,8 @@
@Override
public TableScanner getScanner(BytesWritable begin, BytesWritable end,
String projection, Configuration conf) throws IOException {
- BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+ String[] deletedCGs = getDeletedCGs(conf);
+ BasicTable.Reader reader = new BasicTable.Reader(path, deletedCGs, conf);
try {
reader.setProjection(projection);
} catch (ParseException e) {
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableExpr.java Thu Feb 18 17:01:30 2010
@@ -27,7 +27,6 @@
import org.apache.hadoop.zebra.io.BasicTable;
import org.apache.hadoop.zebra.io.TableScanner;
import org.apache.hadoop.zebra.parser.ParseException;
-import org.apache.hadoop.zebra.types.Projection;
import org.apache.hadoop.zebra.schema.Schema;
/**
@@ -105,7 +104,6 @@
* @see Schema
* @return A TableScanner object.
*/
- @SuppressWarnings("unused")
public TableScanner getScanner(BytesWritable begin,
BytesWritable end, String projection, Configuration conf)
throws IOException {
@@ -127,7 +125,7 @@
public TableScanner getScanner(UnsortedTableSplit split, String projection,
Configuration conf) throws IOException, ParseException {
BasicTable.Reader reader =
- new BasicTable.Reader(new Path(split.getPath()), conf);
+ new BasicTable.Reader(new Path(split.getPath()), getDeletedCGs(conf), conf);
reader.setProjection(projection);
return reader.getScanner(split.getSplit(), true);
}
@@ -147,7 +145,7 @@
public TableScanner getScanner(RowTableSplit split, String projection,
Configuration conf) throws IOException, ParseException, ParseException {
BasicTable.Reader reader =
- new BasicTable.Reader(new Path(split.getPath()), conf);
+ new BasicTable.Reader(new Path(split.getPath()), getDeletedCGs(conf), conf);
reader.setProjection(projection);
return reader.getScanner(true, split.getSplit());
}
@@ -240,4 +238,31 @@
* dump table info with indent
*/
protected abstract void dumpInfo(PrintStream ps, Configuration conf, int indent) throws IOException;
+
+ /**
+ * get the deleted cg for tables in union
+ * @param conf The Configuration object
+ * @return
+ */
+ protected final String[] getDeletedCGsPerUnion(Configuration conf) {
+ return getDeletedCGs(conf, TableInputFormat.DELETED_CG_SEPARATOR_PER_UNION);
+ }
+
+ protected final String[] getDeletedCGs(Configuration conf) {
+ return getDeletedCGs(conf, BasicTable.DELETED_CG_SEPARATOR_PER_TABLE);
+ }
+
+ private final String[] getDeletedCGs(Configuration conf, String separator) {
+ String[] deletedCGs = null;
+ String fe;
+ if ((fe = conf.get(TableInputFormat.INPUT_FE)) != null && fe.equals("true"))
+ {
+ String original = conf.get(TableInputFormat.INPUT_DELETED_CGS, null);
+ if (original == null)
+ deletedCGs = new String[0]; // empty array needed to indicate it is fe checked
+ else
+ deletedCGs = original.split(separator, -1);
+ }
+ return deletedCGs;
+ }
}
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableInputFormat.java Thu Feb 18 17:01:30 2010
@@ -143,9 +143,12 @@
public class TableInputFormat implements InputFormat<BytesWritable, Tuple> {
static Log LOG = LogFactory.getLog(TableInputFormat.class);
- private static final String INPUT_EXPR = "mapred.lib.table.input.expr";
- private static final String INPUT_PROJ = "mapred.lib.table.input.projection";
- private static final String INPUT_SORT = "mapred.lib.table.input.sort";
+ public static final String INPUT_EXPR = "mapred.lib.table.input.expr";
+ public static final String INPUT_PROJ = "mapred.lib.table.input.projection";
+ public static final String INPUT_SORT = "mapred.lib.table.input.sort";
+ public static final String INPUT_FE = "mapred.lib.table.input.fe";
+ public static final String INPUT_DELETED_CGS = "mapred.lib.table.input.deleted_cgs";
+ static final String DELETED_CG_SEPARATOR_PER_UNION = ";";
/**
* Set the paths to the input table.
@@ -645,8 +648,7 @@
}
private static InputSplit[] getRowSplits(JobConf conf, int numSplits,
- TableExpr expr, List<BasicTable.Reader> readers,
- List<BasicTableStatus> status) throws IOException {
+ TableExpr expr, List<BasicTable.Reader> readers) throws IOException {
ArrayList<InputSplit> ret = new ArrayList<InputSplit>();
DummyFileInputFormat helper = new DummyFileInputFormat(getMinSplitSize(conf));
@@ -718,25 +720,40 @@
new ArrayList<BasicTableStatus>(nLeaves);
try {
+ StringBuilder sb = new StringBuilder();
+ boolean sorted = expr.sortedSplitRequired();
+ boolean first = true;
for (Iterator<LeafTableInfo> it = leaves.iterator(); it.hasNext();) {
LeafTableInfo leaf = it.next();
BasicTable.Reader reader =
new BasicTable.Reader(leaf.getPath(), conf);
reader.setProjection(leaf.getProjection());
- BasicTableStatus s = reader.getStatus();
+ if (sorted)
+ {
+ BasicTableStatus s = reader.getStatus();
+ status.add(s);
+ }
readers.add(reader);
- status.add(s);
+ if (first)
+ first = false;
+ else {
+ sb.append(TableInputFormat.DELETED_CG_SEPARATOR_PER_UNION);
+ }
+ sb.append(reader.getDeletedCGs());
}
+ conf.set(INPUT_FE, "true");
+ conf.set(INPUT_DELETED_CGS, sb.toString());
+
if (readers.isEmpty()) {
return new InputSplit[0];
}
- if (expr.sortedSplitRequired()) {
+ if (sorted) {
return getSortedSplits(conf, numSplits, expr, readers, status);
}
- return getRowSplits(conf, numSplits, expr, readers, status);
+ return getRowSplits(conf, numSplits, expr, readers);
} catch (ParseException e) {
throw new IOException("Projection parsing failed : "+e.getMessage());
}
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapred/TableUnionExpr.java Thu Feb 18 17:01:30 2010
@@ -105,14 +105,20 @@
throw new IllegalArgumentException("Union of 0 table");
}
ArrayList<BasicTable.Reader> readers = new ArrayList<BasicTable.Reader>(n);
- final ArrayList<BasicTableStatus> status =
- new ArrayList<BasicTableStatus>(n);
+ String[] deletedCGsInUnion = getDeletedCGsPerUnion(conf);
+
+ if (deletedCGsInUnion != null && deletedCGsInUnion.length != n)
+ throw new IllegalArgumentException("Invalid string of deleted column group names: expected = "+
+ n + " actual =" + deletedCGsInUnion.length);
+
for (int i = 0; i < n; ++i) {
+ String deletedCGs = (deletedCGsInUnion == null ? null : deletedCGsInUnion[i]);
+ String[] deletedCGList = (deletedCGs == null ? null :
+ deletedCGs.split(BasicTable.DELETED_CG_SEPARATOR_PER_TABLE));
BasicTableExpr expr = (BasicTableExpr) composite.get(i);
BasicTable.Reader reader =
- new BasicTable.Reader(expr.getPath(), conf);
+ new BasicTable.Reader(expr.getPath(), deletedCGList, conf);
readers.add(reader);
- status.add(reader.getStatus());
}
String actualProjection = projection;
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableExpr.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableExpr.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/BasicTableExpr.java Thu Feb 18 17:01:30 2010
@@ -106,7 +106,8 @@
@Override
public TableScanner getScanner(BytesWritable begin, BytesWritable end,
String projection, Configuration conf) throws IOException {
- BasicTable.Reader reader = new BasicTable.Reader(path, conf);
+ String[] deletedCGs = getDeletedCGs(conf);
+ BasicTable.Reader reader = new BasicTable.Reader(path, deletedCGs, conf);
try {
reader.setProjection(projection);
} catch (ParseException e) {
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableExpr.java Thu Feb 18 17:01:30 2010
@@ -27,6 +27,7 @@
import org.apache.hadoop.zebra.io.BasicTable;
import org.apache.hadoop.zebra.io.TableScanner;
import org.apache.hadoop.zebra.parser.ParseException;
+import org.apache.hadoop.zebra.types.Projection;
import org.apache.hadoop.zebra.schema.Schema;
/**
@@ -125,7 +126,7 @@
public TableScanner getScanner(UnsortedTableSplit split, String projection,
Configuration conf) throws IOException, ParseException {
BasicTable.Reader reader =
- new BasicTable.Reader(new Path(split.getPath()), conf);
+ new BasicTable.Reader(new Path(split.getPath()), getDeletedCGs(conf), conf);
reader.setProjection(projection);
return reader.getScanner(split.getSplit(), true);
}
@@ -145,7 +146,7 @@
public TableScanner getScanner(RowTableSplit split, String projection,
Configuration conf) throws IOException, ParseException, ParseException {
BasicTable.Reader reader =
- new BasicTable.Reader(new Path(split.getPath()), conf);
+ new BasicTable.Reader(new Path(split.getPath()), getDeletedCGs(conf), conf);
reader.setProjection(projection);
return reader.getScanner(true, split.getSplit());
}
@@ -238,4 +239,31 @@
* dump table info with indent
*/
protected abstract void dumpInfo(PrintStream ps, Configuration conf, int indent) throws IOException;
+
+ /**
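+ * Get the deleted column groups (CGs) recorded in conf for the tables in a union.
+ * @param conf The Configuration object
+ * @return the INPUT_DELETED_CGS value split into one string per table; an empty array if that value is unset; null if INPUT_FE is not "true" in conf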
+ */
+ protected final String[] getDeletedCGsPerUnion(Configuration conf) {
+ return getDeletedCGs(conf, TableInputFormat.DELETED_CG_SEPARATOR_PER_UNION);
+ }
+
+ protected final String[] getDeletedCGs(Configuration conf) {
+ return getDeletedCGs(conf, BasicTable.DELETED_CG_SEPARATOR_PER_TABLE);
+ }
+
+ private final String[] getDeletedCGs(Configuration conf, String separator) {
+ String[] deletedCGs = null;
+ String fe;
+ if ((fe = conf.get(TableInputFormat.INPUT_FE)) != null && fe.equals("true"))
+ {
+ String original = conf.get(TableInputFormat.INPUT_DELETED_CGS, null);
+ if (original == null)
+ deletedCGs = new String[0]; // empty array needed to indicate it is fe checked
+ else
+ deletedCGs = original.split(separator, -1);
+ }
+ return deletedCGs;
+ }
}
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableInputFormat.java Thu Feb 18 17:01:30 2010
@@ -146,6 +146,9 @@
private static final String INPUT_EXPR = "mapreduce.lib.table.input.expr";
private static final String INPUT_PROJ = "mapreduce.lib.table.input.projection";
private static final String INPUT_SORT = "mapreduce.lib.table.input.sort";
+ static final String INPUT_FE = "mapreduce.lib.table.input.fe";
+ static final String INPUT_DELETED_CGS = "mapreduce.lib.table.input.deleted_cgs";
+ static final String DELETED_CG_SEPARATOR_PER_UNION = ";";
/**
* Set the paths to the input table.
@@ -624,8 +627,7 @@
}
private static List<InputSplit> getRowSplits(Configuration conf,
- TableExpr expr, List<BasicTable.Reader> readers,
- List<BasicTableStatus> status) throws IOException {
+ TableExpr expr, List<BasicTable.Reader> readers) throws IOException {
ArrayList<InputSplit> ret = new ArrayList<InputSplit>();
Job job = new Job(conf);
DummyFileInputFormat helper = new DummyFileInputFormat(job, getMinSplitSize(conf));
@@ -708,24 +710,39 @@
new ArrayList<BasicTableStatus>(nLeaves);
try {
+ StringBuilder sb = new StringBuilder();
+ boolean sorted = expr.sortedSplitRequired();
+ boolean first = true;
for (Iterator<LeafTableInfo> it = leaves.iterator(); it.hasNext();) {
LeafTableInfo leaf = it.next();
BasicTable.Reader reader =
new BasicTable.Reader(leaf.getPath(), conf );
reader.setProjection(leaf.getProjection());
- BasicTableStatus s = reader.getStatus();
+ if (sorted)
+ {
+ BasicTableStatus s = reader.getStatus();
+ status.add(s);
+ }
readers.add(reader);
- status.add(s);
+ if (first)
+ first = false;
+ else {
+ sb.append(TableInputFormat.DELETED_CG_SEPARATOR_PER_UNION);
+ }
+ sb.append(reader.getDeletedCGs());
}
+ conf.set(INPUT_FE, "true");
+ conf.set(INPUT_DELETED_CGS, sb.toString());
+
if( readers.isEmpty() ) {
// I think we should throw exception here.
return new ArrayList<InputSplit>();
}
- return expr.sortedSplitRequired() ?
- singleSplit ? getSortedSplits( conf, 1, expr, readers, status ) : getSortedSplits(conf, -1, expr, readers, status) :
- getRowSplits( conf, expr, readers, status );
+ return sorted ?
+ singleSplit ? getSortedSplits( conf, 1, expr, readers, status) : getSortedSplits(conf, -1, expr, readers, status) :
+ getRowSplits( conf, expr, readers);
} catch (ParseException e) {
throw new IOException("Projection parsing failed : "+e.getMessage());
} finally {
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableUnionExpr.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableUnionExpr.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableUnionExpr.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/java/org/apache/hadoop/zebra/mapreduce/TableUnionExpr.java Thu Feb 18 17:01:30 2010
@@ -105,14 +105,20 @@
throw new IllegalArgumentException("Union of 0 table");
}
ArrayList<BasicTable.Reader> readers = new ArrayList<BasicTable.Reader>(n);
- final ArrayList<BasicTableStatus> status =
- new ArrayList<BasicTableStatus>(n);
+ String[] deletedCGsInUnion = getDeletedCGsPerUnion(conf);
+
+ if (deletedCGsInUnion != null && deletedCGsInUnion.length != n)
+ throw new IllegalArgumentException("Invalid string of deleted column group names: expected = "+
+ n + " actual =" + deletedCGsInUnion.length);
+
for (int i = 0; i < n; ++i) {
+ String deletedCGs = (deletedCGsInUnion == null ? null : deletedCGsInUnion[i]);
+ String[] deletedCGList = (deletedCGs == null ? null :
+ deletedCGs.split(BasicTable.DELETED_CG_SEPARATOR_PER_TABLE));
BasicTableExpr expr = (BasicTableExpr) composite.get(i);
BasicTable.Reader reader =
- new BasicTable.Reader(expr.getPath(), conf);
+ new BasicTable.Reader(expr.getPath(), deletedCGList, conf);
readers.add(reader);
- status.add(reader.getStatus());
}
String actualProjection = projection;
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestBasicTable.java Thu Feb 18 17:01:30 2010
@@ -353,7 +353,7 @@
@Test
public void testNormalCases() throws IOException, ParseException {
Path path = new Path(rootPath, "TestBasicTableNormal");
- doReadWrite(path, 2, 250, "a, b, c", "", null, "a, d, c, f", false, false);
+ doReadWrite(path, 2, 250, "a, b, c", "", null, "a, d, c, f", true, false);
doReadWrite(path, 2, 250, "a, b, c", "", null, "a, d, c, f", true, false);
doReadWrite(path, 2, 250, "a, b, c", "", "a", "a, d, c, f", true, true);
}
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroup.java Thu Feb 18 17:01:30 2010
@@ -411,7 +411,6 @@
@Test
public void testEmptyCG() throws IOException, ParseException {
Path path = new Path(rootPath, "TestColumnGroupEmptyCG");
- doReadWrite(path, 0, 0, "a, b, c", "a, d, c, f", false, false, null);
doReadWrite(path, 0, 0, "a, b, c", "a, d, c, f", true, false, null);
doReadWrite(path, 0, 0, "a, b, c", "a, d, c, f", true, true, null);
}
@@ -419,14 +418,12 @@
@Test
public void testEmptyTFiles() throws IOException, ParseException {
Path path = new Path(rootPath, "TestColumnGroupEmptyTFile");
- doReadWrite(path, 2, 0, "a, b, c", "a, d, c, f", false, false, null);
doReadWrite(path, 2, 0, "a, b, c", "a, d, c, f", true, false, null);
doReadWrite(path, 2, 0, "a, b, c", "a, d, c, f", true, true, null);
}
public void testNormalCases() throws IOException, ParseException {
Path path = new Path(rootPath, "TestColumnGroupNormal");
- doReadWrite(path, 2, 500, "a, b, c", "a, d, c, f", false, false, null);
doReadWrite(path, 2, 500, "a, b, c", "a, d, c, f", true, false, null);
doReadWrite(path, 2, 500, "a, b, c", "a, d, c, f", true, true, null);
}
@@ -435,8 +432,6 @@
public void testSomeEmptyTFiles() throws IOException, ParseException {
Path path = new Path(rootPath, "TestColumnGroupSomeEmptyTFile");
for (int[] emptyTFiles : new int[][] { { 1, 2 }}) {
- doReadWrite(path, 2, 250, "a, b, c", "a, d, c, f", false, false,
- emptyTFiles);
doReadWrite(path, 2, 250, "a, b, c", "a, d, c, f", true, false,
emptyTFiles);
doReadWrite(path, 2, 250, "a, b, c", "a, d, c, f", true, true,
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupReaders.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupReaders.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupReaders.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupReaders.java Thu Feb 18 17:01:30 2010
@@ -67,7 +67,7 @@
@AfterClass
public static void tearDownOnce() throws IOException {
- finish();
+ close();
}
@SuppressWarnings("unchecked")
@@ -175,9 +175,7 @@
ColumnGroup.Writer writer2 = writeOnePart(null, 2);
ColumnGroup.Writer writer3 = writeOnePart(null, 3);
- writer1.finish();
- writer2.finish();
- writer3.finish();
+ writer3.close();
// read in parts
readOnePart(1);
@@ -327,7 +325,7 @@
private static void finish() throws IOException {
if (writer != null) {
- writer.finish();
+ writer.close();
}
}
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupWithWorkPath.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupWithWorkPath.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupWithWorkPath.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestColumnGroupWithWorkPath.java Thu Feb 18 17:01:30 2010
@@ -412,7 +412,6 @@
@Test
public void testEmptyCG() throws IOException, ParseException {
Path path = new Path(rootPath, "TestColumnGroupEmptyCG");
- doReadWrite(path, 0, 0, "a, b, c", "a, d, c, f", false, false, null);
doReadWrite(path, 0, 0, "a, b, c", "a, d, c, f", true, false, null);
doReadWrite(path, 0, 0, "a, b, c", "a, d, c, f", true, true, null);
}
@@ -420,14 +419,12 @@
@Test
public void testEmptyTFiles() throws IOException, ParseException {
Path path = new Path(rootPath, "TestColumnGroupEmptyTFile");
- doReadWrite(path, 2, 0, "a, b, c", "a, d, c, f", false, false, null);
doReadWrite(path, 2, 0, "a, b, c", "a, d, c, f", true, false, null);
doReadWrite(path, 2, 0, "a, b, c", "a, d, c, f", true, true, null);
}
public void testNormalCases() throws IOException, ParseException {
Path path = new Path(rootPath, "TestColumnGroupNormal");
- doReadWrite(path, 2, 500, "a, b, c", "a, d, c, f", false, false, null);
doReadWrite(path, 2, 500, "a, b, c", "a, d, c, f", true, false, null);
doReadWrite(path, 2, 500, "a, b, c", "a, d, c, f", true, true, null);
}
@@ -436,8 +433,6 @@
public void testSomeEmptyTFiles() throws IOException, ParseException {
Path path = new Path(rootPath, "TestColumnGroupSomeEmptyTFile");
for (int[] emptyTFiles : new int[][] { { 1, 2 }}) {
- doReadWrite(path, 2, 250, "a, b, c", "a, d, c, f", false, false,
- emptyTFiles);
doReadWrite(path, 2, 250, "a, b, c", "a, d, c, f", true, false,
emptyTFiles);
doReadWrite(path, 2, 250, "a, b, c", "a, d, c, f", true, true,
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestBasicTableIOFormatLocalFS.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestBasicTableIOFormatLocalFS.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestBasicTableIOFormatLocalFS.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/mapred/TestBasicTableIOFormatLocalFS.java Thu Feb 18 17:01:30 2010
@@ -305,6 +305,7 @@
// set map-only job.
jobConf.setNumReduceTasks(0);
JobClient.runJob(jobConf);
+ BasicTableOutputFormat.close(jobConf);
}
/**
@@ -599,6 +600,7 @@
jobConf.setNumReduceTasks(options.numReducer);
JobClient.runJob(jobConf);
+ BasicTableOutputFormat.close(jobConf);
}
void reduce(Summary sum, Summary delta) {
@@ -950,6 +952,7 @@
jobConf.setNumReduceTasks(1);
JobClient.runJob(jobConf);
+ BasicTableOutputFormat.close(jobConf);
}
void printFreqWords() throws IOException, ParseException {
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicTableUnionLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicTableUnionLoader.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicTableUnionLoader.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicTableUnionLoader.java Thu Feb 18 17:01:30 2010
@@ -108,6 +108,7 @@
for (int i = 0; i < numsInserters; i++) {
inserters[i].close();
}
+ writer.close();
/*
* create 2nd basic table;
@@ -141,6 +142,7 @@
for (int i = 0; i < numsInserters; i++) {
inserters[i].close();
}
+ writer.close();
}
@AfterClass
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicUnion.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicUnion.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicUnion.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestBasicUnion.java Thu Feb 18 17:01:30 2010
@@ -126,7 +126,8 @@
for (int i = 0; i < numsInserters; i++) {
inserters[i].close();
}
-
+ writer.close();
+
/*
* create 2nd basic table;
*/
@@ -159,7 +160,8 @@
for (int i = 0; i < numsInserters; i++) {
inserters[i].close();
}
-
+ writer.close();
+
/*
* create 3rd basic table;
*/
@@ -192,6 +194,8 @@
for (int i = 0; i < numsInserters; i++) {
inserters[i].close();
}
+ writer.close();
+
/*
* create 4th basic table;
*/
@@ -224,6 +228,7 @@
for (int i = 0; i < numsInserters; i++) {
inserters[i].close();
}
+ writer.close();
/*
* create 5th basic table;
*/
@@ -256,7 +261,7 @@
for (int i = 0; i < numsInserters; i++) {
inserters[i].close();
}
-
+ writer.close();
}
@AfterClass
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollectionTableLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollectionTableLoader.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollectionTableLoader.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestCollectionTableLoader.java Thu Feb 18 17:01:30 2010
@@ -118,6 +118,7 @@
for (int i = 0; i < numsInserters; i++) {
inserters[i].close();
}
+ writer.close();
}
@AfterClass
@@ -137,4 +138,4 @@
System.out.println(cur);
}
}
-}
\ No newline at end of file
+}
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestGlobTableLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestGlobTableLoader.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestGlobTableLoader.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestGlobTableLoader.java Thu Feb 18 17:01:30 2010
@@ -159,6 +159,7 @@
for (int i = 0; i < numsInserters; i++) {
inserters[i].close();
}
+ writer.close();
}
@AfterClass
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapTableLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapTableLoader.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapTableLoader.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestMapTableLoader.java Thu Feb 18 17:01:30 2010
@@ -114,6 +114,7 @@
for (int i = 0; i < numsInserters; i++) {
inserters[i].close();
}
+ writer.close();
}
@AfterClass
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableLoader.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableLoader.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableLoader.java Thu Feb 18 17:01:30 2010
@@ -103,6 +103,7 @@
for (int i = 0; i < numsInserters; i++) {
inserters[i].close();
}
+ writer.close();
}
@AfterClass
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableLoaderPrune.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableLoaderPrune.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableLoaderPrune.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/pig/TestTableLoaderPrune.java Thu Feb 18 17:01:30 2010
@@ -149,6 +149,7 @@
for (int i = 0; i < numsInserters; i++) {
inserters[i].close();
}
+ writer.close();
}
@AfterClass
Modified: hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestStorageGrammar.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestStorageGrammar.java?rev=911493&r1=911492&r2=911493&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestStorageGrammar.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/zebra/src/test/org/apache/hadoop/zebra/types/TestStorageGrammar.java Thu Feb 18 17:01:30 2010
@@ -359,9 +359,10 @@
} catch (Exception e) {
e.printStackTrace();
}
- writer.finish();
+ writer.close();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(bos);
+
BasicTable.dumpInfo(path1.toString(), ps, conf);
System.out.println("start dumpinfo ===========\n" + bos.toString());
Assert.assertEquals(true, bos.toString().contains("Serializer: pig"));
@@ -382,7 +383,7 @@
} catch (Exception e) {
e.printStackTrace();
}
- writer.finish();
+ writer.close();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(bos);
BasicTable.dumpInfo(path1.toString(), ps, conf);
@@ -404,7 +405,7 @@
} catch (Exception e) {
e.printStackTrace();
}
- writer.finish();
+ writer.close();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(bos);
BasicTable.dumpInfo(path1.toString(), ps, conf);
@@ -426,7 +427,7 @@
} catch (Exception e) {
e.printStackTrace();
}
- writer.finish();
+ writer.close();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(bos);
BasicTable.dumpInfo(path1.toString(), ps, conf);
@@ -448,7 +449,7 @@
} catch (Exception e) {
e.printStackTrace();
}
- writer.finish();
+ writer.close();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(bos);
BasicTable.dumpInfo(path1.toString(), ps, conf);
@@ -470,7 +471,7 @@
} catch (Exception e) {
e.printStackTrace();
}
- writer.finish();
+ writer.close();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(bos);
BasicTable.dumpInfo(path1.toString(), ps, conf);
@@ -491,7 +492,7 @@
} catch (Exception e) {
e.printStackTrace();
}
- writer.finish();
+ writer.close();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(bos);
BasicTable.dumpInfo(path1.toString(), ps, conf);
@@ -605,7 +606,7 @@
fs = path.getFileSystem(conf);
BasicTable.Writer writer = new BasicTable.Writer(path1, schema, storage,
conf);
- writer.finish();
+ writer.close();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(bos);
System.out.println("start dumpinfo 17 ===========");
@@ -699,7 +700,7 @@
fs = path.getFileSystem(conf);
BasicTable.Writer writer = new BasicTable.Writer(path1, schema, storage,
conf);
- writer.finish();
+ writer.close();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(bos);
System.out.println("start dumpinfo 22===========" + bos.toString());