You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by jg...@apache.org on 2010/11/16 23:23:53 UTC

svn commit: r1035841 - in /hadoop/hdfs/trunk: CHANGES.txt src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java

Author: jghoman
Date: Tue Nov 16 22:23:52 2010
New Revision: 1035841

URL: http://svn.apache.org/viewvc?rev=1035841&view=rev
Log:
HDFS-1071. savenamespace should write the fsimage to all configured fs.name.dir in parallel.  Contributed by Dmytro Molkov.

Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1035841&r1=1035840&r2=1035841&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Nov 16 22:23:52 2010
@@ -200,6 +200,9 @@ Trunk (unreleased changes)
     HDFS-455. Make NN and DN handle in a intuitive way comma-separated 
     configuration strings. (Michele Catasta via eli)
 
+    HDFS-1071. savenamespace should write the fsimage to all configured 
+    fs.name.dir in parallel (Dmytro Molkov via jghoman)
+
   OPTIMIZATIONS
 
     HDFS-1140. Speedup INode.getPathComponents. (Dmytro Molkov via shv)

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1035841&r1=1035840&r2=1035841&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Tue Nov 16 22:23:52 2010
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -268,7 +269,7 @@ public class FSEditLog {
    *  except fsimage.processIOError)
    */
   synchronized void processIOError(
-      ArrayList<EditLogOutputStream> errorStreams,
+      List<EditLogOutputStream> errorStreams,
       boolean propagate) {
     
     if (errorStreams == null || errorStreams.size() == 0) {

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1035841&r1=1035840&r2=1035841&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Tue Nov 16 22:23:52 2010
@@ -37,6 +37,7 @@ import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -172,7 +173,13 @@ public class FSImage extends Storage {
   /**
    * Used for saving the image to disk
    */
-  static private final FsPermission FILE_PERM = new FsPermission((short)0);
+  static private final ThreadLocal<FsPermission> FILE_PERM =
+                          new ThreadLocal<FsPermission>() {
+                            @Override
+                            protected FsPermission initialValue() {
+                              return new FsPermission((short) 0);
+                            }
+                          };
   static private final byte[] PATH_SEPARATOR = DFSUtil.string2Bytes(Path.SEPARATOR);
 
   /**
@@ -850,30 +857,34 @@ public class FSImage extends Storage {
    * @param propagate - flag, if set - then call corresponding EditLog stream's 
    * processIOError function.
    */
-  void processIOError(ArrayList<StorageDirectory> sds, boolean propagate) {
+  void processIOError(List<StorageDirectory> sds, boolean propagate) {
     ArrayList<EditLogOutputStream> al = null;
-    for(StorageDirectory sd:sds) {
-      // if has a stream assosiated with it - remove it too..
-      if (propagate && sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
-        EditLogOutputStream eStream = editLog.getEditsStream(sd);
-        if(al == null) al = new ArrayList<EditLogOutputStream>(1);
-        al.add(eStream);
-      }
-      
-      for (Iterator<StorageDirectory> it = dirIterator(); it.hasNext();) {
-        StorageDirectory sd1 = it.next();
-        if (sd.equals(sd1)) {
-          //add storage to the removed list
-          LOG.warn("FSImage:processIOError: removing storage: "
-              + sd.getRoot().getPath());
-          try {
-            sd1.unlock(); //unlock before removing (in case it will be restored)
-          } catch (Exception e) {
-            // nothing
+    synchronized (sds) {
+      for (StorageDirectory sd : sds) {
+        // if it has a stream associated with it, remove it too.
+        if (propagate && sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
+          EditLogOutputStream eStream = editLog.getEditsStream(sd);
+          if (al == null)
+            al = new ArrayList<EditLogOutputStream>(1);
+          al.add(eStream);
+        }
+
+        for (Iterator<StorageDirectory> it = dirIterator(); it.hasNext();) {
+          StorageDirectory sd1 = it.next();
+          if (sd.equals(sd1)) {
+            // add storage to the removed list
+            LOG.warn("FSImage:processIOError: removing storage: "
+                + sd.getRoot().getPath());
+            try {
+              sd1.unlock(); // unlock before removing (in case it will be
+                            // restored)
+            } catch (Exception e) {
+              // nothing
+            }
+            removedStorageDirs.add(sd1);
+            it.remove();
+            break;
           }
-          removedStorageDirs.add(sd1);
-          it.remove();
-          break;
         }
       }
     }
@@ -1409,6 +1420,53 @@ public class FSImage extends Storage {
     this.imageDigest = digest;
   }
   /**
+   * FSImageSaver runs in a separate thread when saving the
+   * FSImage. There is one thread for each copy of the image.
+   *
+   * FSImageSaver assumes that it was launched from a thread that holds
+   * the FSNamesystem lock and waits for the FSImageSaver thread
+   * to finish.
+   * This guarantees that the namespace is not updated
+   * while multiple instances of FSImageSaver are traversing it
+   * and writing it out.
+   */
+  private class FSImageSaver implements Runnable {
+    private StorageDirectory sd;
+    private List<StorageDirectory> errorSDs;
+    
+    FSImageSaver(StorageDirectory sd, List<StorageDirectory> errorSDs) {
+      this.sd = sd;
+      this.errorSDs = errorSDs;
+    }
+    
+    public void run() {
+      try {
+        saveCurrent(sd);
+      } catch (IOException ie) {
+        LOG.error("Unable to save image for " + sd.getRoot(), ie);
+        errorSDs.add(sd);              
+      }
+    }
+    
+    public String toString() {
+      return "FSImageSaver for " + sd.getRoot() +
+             " of type " + sd.getStorageDirType();
+    }
+  }
+  
+  private void waitForThreads(List<Thread> threads) {
+    for (Thread thread : threads) {
+      while (thread.isAlive()) {
+        try {
+          thread.join();
+        } catch (InterruptedException iex) {
+          LOG.error("Caught exception while waiting for thread " +
+                    thread.getName() + " to finish. Retrying join");
+        }        
+      }
+    }
+  }
+  /**
    * Save the contents of the FS image and create empty edits.
    * 
    * In order to minimize the recovery effort in case of failure during
@@ -1428,7 +1486,8 @@ public class FSImage extends Storage {
     editLog.close();
     if(renewCheckpointTime)
       this.checkpointTime = now();
-    ArrayList<StorageDirectory> errorSDs = new ArrayList<StorageDirectory>();
+    List<StorageDirectory> errorSDs =
+      Collections.synchronizedList(new ArrayList<StorageDirectory>());
 
     // mv current -> lastcheckpoint.tmp
     for (Iterator<StorageDirectory> it = dirIterator(); it.hasNext();) {
@@ -1441,17 +1500,18 @@ public class FSImage extends Storage {
       }
     }
 
+    List<Thread> saveThreads = new ArrayList<Thread>();
     // save images into current
     for (Iterator<StorageDirectory> it = dirIterator(NameNodeDirType.IMAGE);
                                                               it.hasNext();) {
       StorageDirectory sd = it.next();
-      try {
-        saveCurrent(sd);
-      } catch(IOException ie) {
-        LOG.error("Unable to save image for " + sd.getRoot(), ie);
-        errorSDs.add(sd);
-      }
+      FSImageSaver saver = new FSImageSaver(sd, errorSDs);
+      Thread saveThread = new Thread(saver, saver.toString());
+      saveThreads.add(saveThread);
+      saveThread.start();
     }
+    waitForThreads(saveThreads);
+    saveThreads.clear();
 
     // -NOTE-
     // If NN has image-only and edits-only storage directories and fails here 
@@ -1462,18 +1522,17 @@ public class FSImage extends Storage {
     // to the old state contained in their lastcheckpoint.tmp.
     // The edits directories should be discarded during startup because their
     // checkpointTime is older than that of image directories.
-
     // recreate edits in current
     for (Iterator<StorageDirectory> it = dirIterator(NameNodeDirType.EDITS);
                                                               it.hasNext();) {
-      StorageDirectory sd = it.next();
-      try {
-        saveCurrent(sd);
-      } catch(IOException ie) {
-        LOG.error("Unable to save edits for " + sd.getRoot(), ie);
-        errorSDs.add(sd);
-      }
+      final StorageDirectory sd = it.next();
+      FSImageSaver saver = new FSImageSaver(sd, errorSDs);
+      Thread saveThread = new Thread(saver, saver.toString());
+      saveThreads.add(saveThread);
+      saveThread.start();
     }
+    waitForThreads(saveThreads);
+
     // mv lastcheckpoint.tmp -> previous.checkpoint
     for (Iterator<StorageDirectory> it = dirIterator(); it.hasNext();) {
       StorageDirectory sd = it.next();
@@ -1605,6 +1664,7 @@ public class FSImage extends Storage {
     int nameLen = name.position();
     out.writeShort(nameLen);
     out.write(name.array(), name.arrayOffset(), nameLen);
+    FsPermission filePerm = FILE_PERM.get();
     if (node.isDirectory()) {
       out.writeShort(0);  // replication
       out.writeLong(node.getModificationTime());
@@ -1613,10 +1673,10 @@ public class FSImage extends Storage {
       out.writeInt(-1);   // # of blocks
       out.writeLong(node.getNsQuota());
       out.writeLong(node.getDsQuota());
-      FILE_PERM.fromShort(node.getFsPermissionShort());
+      filePerm.fromShort(node.getFsPermissionShort());
       PermissionStatus.write(out, node.getUserName(),
                              node.getGroupName(),
-                             FILE_PERM);
+                             filePerm);
     } else if (node.isLink()) {
       out.writeShort(0);  // replication
       out.writeLong(0);   // modification time
@@ -1624,10 +1684,10 @@ public class FSImage extends Storage {
       out.writeLong(0);   // preferred block size
       out.writeInt(-2);   // # of blocks
       Text.writeString(out, ((INodeSymlink)node).getLinkValue());
-      FILE_PERM.fromShort(node.getFsPermissionShort());
+      filePerm.fromShort(node.getFsPermissionShort());
       PermissionStatus.write(out, node.getUserName(),
                              node.getGroupName(),
-                             FILE_PERM);      
+                             filePerm);      
     } else {
       INodeFile fileINode = (INodeFile)node;
       out.writeShort(fileINode.getReplication());
@@ -1638,10 +1698,10 @@ public class FSImage extends Storage {
       out.writeInt(blocks.length);
       for (Block blk : blocks)
         blk.write(out);
-      FILE_PERM.fromShort(fileINode.getFsPermissionShort());
+      filePerm.fromShort(fileINode.getFsPermissionShort());
       PermissionStatus.write(out, fileINode.getUserName(),
                              fileINode.getGroupName(),
-                             FILE_PERM);
+                             filePerm);
     }
   }