You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/07/16 20:04:55 UTC

svn commit: r556684 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/TaskLog.java

Author: cutting
Date: Mon Jul 16 11:04:54 2007
New Revision: 556684

URL: http://svn.apache.org/viewvc?view=rev&rev=556684
Log:
HADOOP-1524.  Permit user task logs to appear as they are created.  Contributed by Michael Bieniosek.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=556684&r1=556683&r2=556684
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Jul 16 11:04:54 2007
@@ -351,6 +351,9 @@
 109. HADOOP-1597.  Add status reports and post-upgrade options to HDFS
      distributed upgrade.  (Konstantin Shvachko via cutting)
 
+110. HADOOP-1524.  Permit user task logs to appear as they're
+     created.  (Michael Bieniosek via cutting)
+
 
 Release 0.13.0 - 2007-06-08
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java?view=diff&rev=556684&r1=556683&r2=556684
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java Mon Jul 16 11:04:54 2007
@@ -238,12 +238,13 @@
       throws IOException {
       currentSplit =  getLogSplit(split);
       LOG.debug("About to create the split: " + currentSplit);
+      // Record the 'split' in the index
+      writeIndexRecord();
       return new BufferedOutputStream(new FileOutputStream(currentSplit));
     }
     
     private synchronized void writeIndexRecord() throws IOException {
-      String indexRecord = currentSplit + "|" + splitOffset + "|" + 
-                               splitLength + "\n";
+      String indexRecord = currentSplit + "|" + splitOffset + "\n";
       splitIndex.write(indexRecord.getBytes());
       splitIndex.flush();
     }
@@ -253,9 +254,6 @@
       LOG.debug("About to rotate-out the split: " + noSplits);
       out.close();
       
-      // Record the 'split' in the index
-      writeIndexRecord();
-      
       // Re-initialize the state
       splitOffset += splitLength;
       splitLength = 0;
@@ -312,12 +310,10 @@
     private static class IndexRecord {
       String splitName;
       long splitOffset;
-      long splitLength;
       
-      IndexRecord(String splitName, long splitOffset, long splitLength) {
+      IndexRecord(String splitName, long splitOffset) {
         this.splitName = splitName;
         this.splitOffset = splitOffset;
-        this.splitLength = splitLength;
       }
     }
     
@@ -331,26 +327,26 @@
       String line;
       while ((line = splitIndex.readLine()) != null) {
         String[] fields = line.split("\\|");
-        if (fields.length != 3) {
+        if (fields.length != 2) {
           throw new IOException("Malformed split-index with " + 
                                 fields.length + " fields");
         }
         
         IndexRecord record = new IndexRecord(
                                              fields[0], 
-                                             Long.valueOf(fields[1]).longValue(), 
-                                             Long.valueOf(fields[2]).longValue()
+                                             Long.valueOf(fields[1]).longValue()
                                              );
-        LOG.debug("Split: <" + record.splitName + ", " + record.splitOffset + 
-                  ", " + record.splitLength + ">");
+        LOG.debug("Split: <" + record.splitName + ", " + record.splitOffset + ">");
         
         // Save 
         records.add(record);
-        logFileSize += record.splitLength;
       }
 
       indexRecords = new IndexRecord[records.size()];
       indexRecords = records.toArray(indexRecords);
+      IndexRecord lastRecord = indexRecords[records.size() - 1];
+      logFileSize = lastRecord.splitOffset
+          + new File(lastRecord.splitName).length();
       initialized = true;
       LOG.debug("Log size: " + logFileSize);
     }
@@ -384,34 +380,28 @@
       
       // Get all splits 
       Vector<InputStream> streams = new Vector<InputStream>();
-      int totalLogSize = 0;
       for (int i=0; i < indexRecords.length; ++i) {
         InputStream stream = getLogSplit(i);
         if (stream != null) {
           streams.add(stream);
-          totalLogSize += indexRecords[i].splitLength;
           LOG.debug("Added split: " + i);
         }
       }
-      LOG.debug("Total log-size on disk: " + totalLogSize + 
-                "; actual log-size: " + logFileSize);
+      LOG.debug("Total log-size on disk: " + logFileSize);
 
       // Copy log data into buffer
-      byte[] b = new byte[totalLogSize];
+      byte[] b = new byte[(int) logFileSize];
       SequenceInputStream in = new SequenceInputStream(streams.elements());
       try {
-        int bytesRead = 0, totalBytesRead = 0;
-        int off = 0, len = totalLogSize;
-        LOG.debug("Attempting to read " + len + " bytes from logs");
-        while ((bytesRead = in.read(b, off, len)) > 0) {
+        int bytesRead = 0;
+        int off = 0;
+        LOG.debug("Attempting to read " + logFileSize + " bytes from logs");
+        while ((bytesRead = in.read(b, off, (int) logFileSize - off)) > 0) {
           LOG.debug("Got " + bytesRead + " bytes");
           off += bytesRead;
-          len -= bytesRead;
-        
-          totalBytesRead += bytesRead;
         }
 
-        if (totalBytesRead != totalLogSize) {
+        if (off != logFileSize) {
          LOG.debug("Did not read all requisite data in logs!");
         }
       } finally {