You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/05/31 21:31:58 UTC

svn commit: r543225 - in /lucene/hadoop/branches/branch-0.13: ./ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/dfs/

Author: cutting
Date: Thu May 31 12:31:57 2007
New Revision: 543225

URL: http://svn.apache.org/viewvc?view=rev&rev=543225
Log:
Merge -r 543206:543222 from trunk to 0.13 branch.  Fixes: HADOOP-1242 and HADOOP-1332.

Modified:
    lucene/hadoop/branches/branch-0.13/CHANGES.txt
    lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/DataStorage.java
    lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/FSImage.java
    lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/Storage.java
    lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/mapred/TaskRunner.java
    lucene/hadoop/branches/branch-0.13/src/test/org/apache/hadoop/dfs/TestDFSFinalize.java
    lucene/hadoop/branches/branch-0.13/src/test/org/apache/hadoop/dfs/UpgradeUtilities.java

Modified: lucene/hadoop/branches/branch-0.13/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/CHANGES.txt?view=diff&rev=543225&r1=543224&r2=543225
==============================================================================
--- lucene/hadoop/branches/branch-0.13/CHANGES.txt (original)
+++ lucene/hadoop/branches/branch-0.13/CHANGES.txt Thu May 31 12:31:57 2007
@@ -430,6 +430,12 @@
      AlreadyBeingCreatedException when wrapped as a RemoteException.
      (Hairong Kuang via tomwhite)
 
+129. HADOOP-1242.  Improve handling of DFS upgrades.
+     (Konstantin Shvachko via cutting)
+
+130. HADOOP-1332.  Fix so that TaskTracker exits reliably during unit
+     tests on Windows.  (omalley via cutting)
+
 
 Release 0.12.3 - 2007-04-06
 

Modified: lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/DataStorage.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/DataStorage.java?view=diff&rev=543225&r1=543224&r2=543225
==============================================================================
--- lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/DataStorage.java (original)
+++ lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/DataStorage.java Thu May 31 12:31:57 2007
@@ -11,7 +11,6 @@
 
 import org.apache.hadoop.dfs.FSConstants.StartupOption;
 import org.apache.hadoop.dfs.FSConstants.NodeType;
-import org.apache.hadoop.dfs.FSImage.NameNodeFile;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.fs.FileUtil.HardLink;
 
@@ -164,6 +163,21 @@
     File oldF = new File(sd.root, "storage");
     if (!oldF.exists())
       return false;
+    // check the layout version inside the storage file
+    // Lock and Read old storage file
+    RandomAccessFile oldFile = new RandomAccessFile(oldF, "rws");
+    if (oldFile == null)
+      throw new IOException("Cannot read file: " + oldF);
+    FileLock oldLock = oldFile.getChannel().tryLock();
+    try {
+      oldFile.seek(0);
+      int odlVersion = oldFile.readInt();
+      if (odlVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION)
+        return false;
+    } finally {
+      oldLock.release();
+      oldFile.close();
+    }
     // check consistency of the old storage
     File oldDataDir = new File(sd.root, "data");
     if (!oldDataDir.exists()) 
@@ -206,13 +220,14 @@
     FileLock oldLock = oldFile.getChannel().tryLock();
     if (oldLock == null)
       throw new IOException("Cannot lock file: " + oldF);
+    String odlStorageID = "";
     try {
       oldFile.seek(0);
       int odlVersion = oldFile.readInt();
       if (odlVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION)
         throw new IncorrectVersionException(odlVersion, "file " + oldF,
                                             LAST_PRE_UPGRADE_LAYOUT_VERSION);
-      String odlStorageID = org.apache.hadoop.io.UTF8.readString(oldFile);
+      odlStorageID = org.apache.hadoop.io.UTF8.readString(oldFile);
   
       // check new storage
       File newDataDir = sd.getCurrentDir();
@@ -221,14 +236,8 @@
         throw new IOException("Version file already exists: " + versionF);
       if (newDataDir.exists()) // somebody created current dir manually
         deleteDir(newDataDir);
-      // Write new layout
+      // move "data" to "current"
       rename(oldDataDir, newDataDir);
-  
-      this.layoutVersion = FSConstants.LAYOUT_VERSION;
-      this.namespaceID = nsInfo.getNamespaceID();
-      this.cTime = 0;
-      this.storageID = odlStorageID;
-      sd.write();
       // close and unlock old file
     } finally {
       oldLock.release();
@@ -236,6 +245,13 @@
     }
     // move old storage file into current dir
     rename(oldF, new File(sd.getCurrentDir(), "storage"));
+
+    // Write new version file
+    this.layoutVersion = FSConstants.LAYOUT_VERSION;
+    this.namespaceID = nsInfo.getNamespaceID();
+    this.cTime = 0;
+    this.storageID = odlStorageID;
+    sd.write();
     LOG.info("Conversion of " + oldF + " is complete.");
   }
 
@@ -408,5 +424,23 @@
     
     for(int i = 0; i < blockNames.length; i++)
       linkBlocks(new File(from, blockNames[i]), new File(to, blockNames[i]));
+  }
+
+  protected void corruptPreUpgradeStorage(File rootDir) throws IOException {
+    File oldF = new File(rootDir, "storage");
+    if (oldF.exists())
+      return;
+    // recreate old storage file to let pre-upgrade versions fail
+    if (!oldF.createNewFile())
+      throw new IOException("Cannot create file " + oldF);
+    RandomAccessFile oldFile = new RandomAccessFile(oldF, "rws");
+    if (oldFile == null)
+      throw new IOException("Cannot read file: " + oldF);
+    // write new version into old storage file
+    try {
+      writeCorruptedData(oldFile);
+    } finally {
+      oldFile.close();
+    }
   }
 }

Modified: lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/FSImage.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/FSImage.java?view=diff&rev=543225&r1=543224&r2=543225
==============================================================================
--- lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/FSImage.java (original)
+++ lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/FSImage.java Thu May 31 12:31:57 2007
@@ -27,6 +27,7 @@
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.util.AbstractList;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -435,7 +436,21 @@
   boolean isConversionNeeded(StorageDirectory sd) throws IOException {
     File oldImageDir = new File(sd.root, "image");
     if (!oldImageDir.exists())
-      return false;
+      throw new InconsistentFSStateException(sd.root,
+          oldImageDir + " does not exist.");
+    // check the layout version inside the image file
+    File oldF = new File(oldImageDir, "fsimage");
+    RandomAccessFile oldFile = new RandomAccessFile(oldF, "rws");
+    if (oldFile == null)
+      throw new IOException("Cannot read file: " + oldF);
+    try {
+      oldFile.seek(0);
+      int odlVersion = oldFile.readInt();
+      if (odlVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION)
+        return false;
+    } finally {
+      oldFile.close();
+    }
     // check consistency of the old storage
     if (!oldImageDir.isDirectory())
       throw new InconsistentFSStateException(sd.root,
@@ -492,8 +507,8 @@
       needReformat = true;
     } else {
       sd.write();
-      LOG.info("Conversion of " + oldImage + " is complete.");
     }
+    LOG.info("Conversion of " + oldImage + " is complete.");
     return needReformat;
   }
 
@@ -958,6 +973,27 @@
       node.setRemaining(remaining);
       node.setLastUpdate(lastUpdate);
       node.setXceiverCount(xceiverCount);
+    }
+  }
+
+  protected void corruptPreUpgradeStorage(File rootDir) throws IOException {
+    File oldImageDir = new File(rootDir, "image");
+    if (!oldImageDir.exists())
+      if (!oldImageDir.mkdir())
+        throw new IOException("Cannot create directory " + oldImageDir);
+    File oldImage = new File(oldImageDir, "fsimage");
+    if (!oldImage.exists())
+      // recreate old image file to let pre-upgrade versions fail
+      if (!oldImage.createNewFile())
+        throw new IOException("Cannot create file " + oldImage);
+    RandomAccessFile oldFile = new RandomAccessFile(oldImage, "rws");
+    if (oldFile == null)
+      throw new IOException("Cannot read file: " + oldImage);
+    // write new version into old image file
+    try {
+      writeCorruptedData(oldFile);
+    } finally {
+      oldFile.close();
     }
   }
 }

Modified: lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/Storage.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/Storage.java?view=diff&rev=543225&r1=543224&r2=543225
==============================================================================
--- lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/Storage.java (original)
+++ lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/dfs/Storage.java Thu May 31 12:31:57 2007
@@ -157,6 +157,7 @@
      * @throws IOException
      */
     void write() throws IOException {
+      corruptPreUpgradeStorage(root);
       write(getVersionFile());
     }
 
@@ -520,5 +521,21 @@
     return "NS-" + Integer.toString(storage.getNamespaceID())
       + "-" + Integer.toString(storage.getLayoutVersion())
       + "-" + Long.toString(storage.getCTime());
+  }
+
+  // Pre-upgrade version compatibility
+  protected abstract void corruptPreUpgradeStorage(File rootDir) throws IOException;
+
+  protected void writeCorruptedData(RandomAccessFile file) throws IOException {
+    final String messageForPreUpgradeVersion =
+      "\nThis file is INTENTIONALLY CORRUPTED so that versions\n"
+      + "of Hadoop prior to 0.13 (which are incompatible\n"
+      + "with this directory layout) will fail to start.\n";
+  
+    file.seek(0);
+    file.writeInt(FSConstants.LAYOUT_VERSION);
+    org.apache.hadoop.io.UTF8.writeString(file, "");
+    file.writeBytes(messageForPreUpgradeVersion);
+    file.getFD().sync();
   }
 }

Modified: lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/mapred/TaskRunner.java?view=diff&rev=543225&r1=543224&r2=543225
==============================================================================
--- lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ lucene/hadoop/branches/branch-0.13/src/java/org/apache/hadoop/mapred/TaskRunner.java Thu May 31 12:31:57 2007
@@ -388,16 +388,19 @@
    */
   private void runChild(String[] args, File dir) throws IOException {
     this.process = Runtime.getRuntime().exec(args, null, dir);
+    
+    Thread logStdErrThread = null;
+    Thread logStdOutThread = null;
     try {
-      new Thread() {
-        public void run() {
-          // Copy stderr of the process
-          logStream(process.getErrorStream(), taskStdErrLogWriter); 
-        }
-      }.start();
-        
-      // Copy stderr of the process; normally empty
-      logStream(process.getInputStream(), taskStdOutLogWriter);		  
+      // Copy stderr of the child-process via a thread
+      logStdErrThread = logStream((t.getTaskId() + " - " + "stderr"), 
+                                   process.getErrorStream(), 
+                                   taskStdErrLogWriter);
+      
+      // Copy stdout of the child-process via a thread
+      logStdOutThread = logStream((t.getTaskId() + " - " + "stdout"), 
+                                  process.getInputStream(), 
+                                  taskStdOutLogWriter); 
       
       int exit_code = process.waitFor();
      
@@ -410,8 +413,21 @@
       throw new IOException(e.toString());
     } finally {
       kill();
-      taskStdOutLogWriter.close();
-      taskStdErrLogWriter.close();
+      
+      // Kill both stdout/stderr copying threads 
+      if (logStdErrThread != null) {
+        logStdErrThread.interrupt();
+        try {
+          logStdErrThread.join();
+        } catch (InterruptedException ie) {}
+      }
+      
+      if (logStdOutThread != null) {
+        logStdOutThread.interrupt();
+        try {
+          logStdOutThread.join();
+        } catch (InterruptedException ie) {}
+      }
     }
   }
 
@@ -426,24 +442,47 @@
   }
 
   /**
+   * Spawn a new thread to copy the child-jvm's stdout/stderr streams
+   * via a {@link TaskLog.Writer}
+   * 
+   * @param threadName thread name
+   * @param stream child-jvm's stdout/stderr stream
+   * @param writer {@link TaskLog.Writer} used to copy the child-jvm's data
+   * @return Return the newly created thread
    */
-  private void logStream(InputStream output, TaskLog.Writer taskLog) {
-    try {
-      byte[] buf = new byte[512];
-      int n = 0;
-      while ((n = output.read(buf, 0, buf.length)) != -1) {
-        // Write out to the task's log
-        taskLog.write(buf, 0, n);
-      }
-    } catch (IOException e) {
-      LOG.warn(t.getTaskId()+" Error reading child output", e);
-    } finally {
-      try {
-        output.close();
-      } catch (IOException e) {
-        LOG.warn(t.getTaskId()+" Error closing child output", e);
+  private Thread logStream(String threadName, 
+                           final InputStream stream, 
+                           final TaskLog.Writer taskLog) {
+    Thread loggerThread = new Thread() {
+      public void run() {
+        try {
+          byte[] buf = new byte[512];
+          while (!Thread.interrupted()) {
+            while (stream.available() > 0) {
+              int n = stream.read(buf, 0, buf.length);
+              taskLog.write(buf, 0, n);
+            }
+            Thread.sleep(1000);
+          }
+        } catch (IOException e) {
+          LOG.warn(t.getTaskId()+" Error reading child output", e);
+        } catch (InterruptedException e) {
+          // expected
+        } finally {
+          try {
+            stream.close();
+            taskLog.close();
+          } catch (IOException e) {
+            LOG.warn(t.getTaskId()+" Error closing child output", e);
+          }
+        }
       }
-    }
+    };
+    loggerThread.setName(threadName);
+    loggerThread.setDaemon(true);
+    loggerThread.start();
+    
+    return loggerThread;
   }
   
 }

Modified: lucene/hadoop/branches/branch-0.13/src/test/org/apache/hadoop/dfs/TestDFSFinalize.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/src/test/org/apache/hadoop/dfs/TestDFSFinalize.java?view=diff&rev=543225&r1=543224&r2=543225
==============================================================================
--- lucene/hadoop/branches/branch-0.13/src/test/org/apache/hadoop/dfs/TestDFSFinalize.java (original)
+++ lucene/hadoop/branches/branch-0.13/src/test/org/apache/hadoop/dfs/TestDFSFinalize.java Thu May 31 12:31:57 2007
@@ -23,11 +23,9 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.dfs.FSConstants.NodeType;
 import static org.apache.hadoop.dfs.FSConstants.NodeType.NAME_NODE;
 import static org.apache.hadoop.dfs.FSConstants.NodeType.DATA_NODE;
 import org.apache.hadoop.dfs.FSConstants.StartupOption;
-import org.apache.hadoop.fs.Path;
 
 /**
  * This test ensures the appropriate response from the system when 
@@ -82,7 +80,6 @@
    * This test attempts to finalize the NameNode and DataNode.
    */
   public void testFinalize() throws Exception {
-    File[] baseDirs;
     UpgradeUtilities.initialize();
     
     for (int numDirs = 1; numDirs <= 2; numDirs++) {

Modified: lucene/hadoop/branches/branch-0.13/src/test/org/apache/hadoop/dfs/UpgradeUtilities.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.13/src/test/org/apache/hadoop/dfs/UpgradeUtilities.java?view=diff&rev=543225&r1=543224&r2=543225
==============================================================================
--- lucene/hadoop/branches/branch-0.13/src/test/org/apache/hadoop/dfs/UpgradeUtilities.java (original)
+++ lucene/hadoop/branches/branch-0.13/src/test/org/apache/hadoop/dfs/UpgradeUtilities.java Thu May 31 12:31:57 2007
@@ -250,16 +250,26 @@
       LocalFileSystem localFS = FileSystem.getLocal(new Configuration());
       switch (nodeType) {
       case NAME_NODE:
-        localFS.copyToLocalFile(
-                                new Path(namenodeStorage.toString(), "current"),
+        localFS.copyToLocalFile(new Path(namenodeStorage.toString(), "current"),
                                 new Path(newDir.toString()),
                                 false);
+        Path newImgDir = new Path(newDir.getParent(), "image");
+        if (!localFS.exists(newImgDir))
+          localFS.copyToLocalFile(
+              new Path(namenodeStorage.toString(), "image"),
+              newImgDir,
+              false);
         break;
       case DATA_NODE:
-        localFS.copyToLocalFile(
-                                new Path(datanodeStorage.toString(), "current"),
+        localFS.copyToLocalFile(new Path(datanodeStorage.toString(), "current"),
                                 new Path(newDir.toString()),
                                 false);
+        Path newStorageFile = new Path(newDir.getParent(), "storage");
+        if (!localFS.exists(newStorageFile))
+          localFS.copyToLocalFile(
+              new Path(datanodeStorage.toString(), "storage"),
+              newStorageFile,
+              false);
         break;
       }
       retVal[i] = newDir;