You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/07/16 20:04:55 UTC
svn commit: r556684 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/TaskLog.java
Author: cutting
Date: Mon Jul 16 11:04:54 2007
New Revision: 556684
URL: http://svn.apache.org/viewvc?view=rev&rev=556684
Log:
HADOOP-1524. Permit user task logs to appear as they are created. Contributed by Michael Bieniosek.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=556684&r1=556683&r2=556684
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Jul 16 11:04:54 2007
@@ -351,6 +351,9 @@
109. HADOOP-1597. Add status reports and post-upgrade options to HDFS
distributed upgrade. (Konstantin Shvachko via cutting)
+110. HADOOP-1524. Permit user task logs to appear as they're
+ created. (Michael Bieniosek via cutting)
+
Release 0.13.0 - 2007-06-08
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java?view=diff&rev=556684&r1=556683&r2=556684
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java Mon Jul 16 11:04:54 2007
@@ -238,12 +238,13 @@
throws IOException {
currentSplit = getLogSplit(split);
LOG.debug("About to create the split: " + currentSplit);
+ // Record the 'split' in the index
+ writeIndexRecord();
return new BufferedOutputStream(new FileOutputStream(currentSplit));
}
private synchronized void writeIndexRecord() throws IOException {
- String indexRecord = currentSplit + "|" + splitOffset + "|" +
- splitLength + "\n";
+ String indexRecord = currentSplit + "|" + splitOffset + "\n";
splitIndex.write(indexRecord.getBytes());
splitIndex.flush();
}
@@ -253,9 +254,6 @@
LOG.debug("About to rotate-out the split: " + noSplits);
out.close();
- // Record the 'split' in the index
- writeIndexRecord();
-
// Re-initialize the state
splitOffset += splitLength;
splitLength = 0;
@@ -312,12 +310,10 @@
private static class IndexRecord {
String splitName;
long splitOffset;
- long splitLength;
- IndexRecord(String splitName, long splitOffset, long splitLength) {
+ IndexRecord(String splitName, long splitOffset) {
this.splitName = splitName;
this.splitOffset = splitOffset;
- this.splitLength = splitLength;
}
}
@@ -331,26 +327,26 @@
String line;
while ((line = splitIndex.readLine()) != null) {
String[] fields = line.split("\\|");
- if (fields.length != 3) {
+ if (fields.length != 2) {
throw new IOException("Malformed split-index with " +
fields.length + " fields");
}
IndexRecord record = new IndexRecord(
fields[0],
- Long.valueOf(fields[1]).longValue(),
- Long.valueOf(fields[2]).longValue()
+ Long.valueOf(fields[1]).longValue()
);
- LOG.debug("Split: <" + record.splitName + ", " + record.splitOffset +
- ", " + record.splitLength + ">");
+ LOG.debug("Split: <" + record.splitName + ", " + record.splitOffset + ">");
// Save
records.add(record);
- logFileSize += record.splitLength;
}
indexRecords = new IndexRecord[records.size()];
indexRecords = records.toArray(indexRecords);
+ IndexRecord lastRecord = indexRecords[records.size() - 1];
+ logFileSize = lastRecord.splitOffset
+ + new File(lastRecord.splitName).length();
initialized = true;
LOG.debug("Log size: " + logFileSize);
}
@@ -384,34 +380,28 @@
// Get all splits
Vector<InputStream> streams = new Vector<InputStream>();
- int totalLogSize = 0;
for (int i=0; i < indexRecords.length; ++i) {
InputStream stream = getLogSplit(i);
if (stream != null) {
streams.add(stream);
- totalLogSize += indexRecords[i].splitLength;
LOG.debug("Added split: " + i);
}
}
- LOG.debug("Total log-size on disk: " + totalLogSize +
- "; actual log-size: " + logFileSize);
+ LOG.debug("Total log-size on disk: " + logFileSize);
// Copy log data into buffer
- byte[] b = new byte[totalLogSize];
+ byte[] b = new byte[(int) logFileSize];
SequenceInputStream in = new SequenceInputStream(streams.elements());
try {
- int bytesRead = 0, totalBytesRead = 0;
- int off = 0, len = totalLogSize;
- LOG.debug("Attempting to read " + len + " bytes from logs");
- while ((bytesRead = in.read(b, off, len)) > 0) {
+ int bytesRead = 0;
+ int off = 0;
+ LOG.debug("Attempting to read " + logFileSize + " bytes from logs");
+ while ((bytesRead = in.read(b, off, (int) logFileSize - off)) > 0) {
LOG.debug("Got " + bytesRead + " bytes");
off += bytesRead;
- len -= bytesRead;
-
- totalBytesRead += bytesRead;
}
- if (totalBytesRead != totalLogSize) {
+ if (off != logFileSize) {
LOG.debug("Didn't not read all requisite data in logs!");
}
} finally {