You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2012/11/05 01:40:55 UTC

svn commit: r1405683 - in /hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/ s...

Author: szetszwo
Date: Mon Nov  5 00:40:54 2012
New Revision: 1405683

URL: http://svn.apache.org/viewvc?rev=1405683&view=rev
Log:
HDFS-4149. Implement the disallowSnapshot(..) in FSNamesystem and add resetSnapshottable(..) to SnapshotManager.

Added:
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java
Modified:
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSDirectory.java
    hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSnapshotPathINodes.java

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt?rev=1405683&r1=1405682&r2=1405683&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt Mon Nov  5 00:40:54 2012
@@ -50,3 +50,6 @@ Branch-2802 Snapshot (Unreleased)
 
   HDFS-4146. Use getter and setter in INodeFileWithLink to access blocks and
   initialize root directory as snapshottable. (szetszwo)
+
+  HDFS-4149. Implement the disallowSnapshot(..) in FSNamesystem and add
+  resetSnapshottable(..) to SnapshotManager. (szetszwo)

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1405683&r1=1405682&r2=1405683&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Mon Nov  5 00:40:54 2012
@@ -872,28 +872,26 @@ public class DistributedFileSystem exten
   /**
    * Allow snapshot on a directory.
    * 
-   * @param snapshotRoot the directory to be snapped
+   * @param path the directory on which snapshots are to be allowed
    * @throws IOException
    */
-  public void allowSnapshot(String snapshotRoot)
-      throws IOException {
-    dfs.allowSnapshot(snapshotRoot);
+  public void allowSnapshot(String path) throws IOException {
+    dfs.allowSnapshot(path);
   }
   
   /**
    * Disallow snapshot on a directory.
    * 
-   * @param snapshotRoot the directory to be snapped
+   * @param path the snapshottable directory.
    * @throws IOException
    */
-  public void disallowSnapshot(String snapshotRoot)
-      throws IOException {
-    dfs.disallowSnapshot(snapshotRoot);
+  public void disallowSnapshot(String path) throws IOException {
+    dfs.disallowSnapshot(path);
   }
   
   @Override
-  public void createSnapshot(String snapshotName, String snapshotRoot)
+  public void createSnapshot(String snapshotName, String path)
       throws IOException {
-    dfs.createSnapshot(snapshotName, snapshotRoot);
+    dfs.createSnapshot(snapshotName, path);
   }
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1405683&r1=1405682&r2=1405683&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Mon Nov  5 00:40:54 2012
@@ -5543,22 +5543,38 @@ public class FSNamesystem implements Nam
     }
     getEditLog().logSync();
     
+    //TODO: need to update metrics in corresponding SnapshotManager method 
+
     if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(), getRemoteIp(),
           "allowSnapshot", path, null, null);
     }
   }
   
-  // Disallow snapshot on a directory.
-  @VisibleForTesting
-  public void disallowSnapshot(String snapshotRoot)
+  /** Disallow snapshot on a directory. */
+  public void disallowSnapshot(String path)
       throws SafeModeException, IOException {
-    // TODO: implement, also need to update metrics in corresponding
-    // SnapshotManager method 
+    writeLock();
+    try {
+      checkOperation(OperationCategory.WRITE);
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot disallow snapshot for " + path,
+            safeMode);
+      }
+      checkOwner(path);
+
+      snapshotManager.resetSnapshottable(path);
+      getEditLog().logDisallowSnapshot(path);
+    } finally {
+      writeUnlock();
+    }
+    getEditLog().logSync();
+
+    //TODO: need to update metrics in corresponding SnapshotManager method 
     
     if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(), getRemoteIp(),
-          "disallowSnapshot", snapshotRoot, null, null);
+          "disallowSnapshot", path, null, null);
     }
   }
   

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java?rev=1405683&r1=1405682&r2=1405683&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java Mon Nov  5 00:40:54 2012
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.hadoop.fs.UnresolvedLinkException;
@@ -30,8 +31,8 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -248,7 +249,7 @@ public class INodeDirectory extends INod
         }
         // Resolve snapshot root
         curNode = ((INodeDirectorySnapshottable) parentDir)
-            .getSnapshotINode(components[count + 1]);
+            .getSnapshotRoot(components[count + 1]);
         if (index >= -1) {
           existing.snapshotRootIndex = existing.size;
         }
@@ -601,20 +602,14 @@ public class INodeDirectory extends INod
    */
   @VisibleForTesting
   protected static void dumpTreeRecursively(PrintWriter out,
-      StringBuilder prefix, List<? extends INode> subs) {
-    prefix.append(DUMPTREE_EXCEPT_LAST_ITEM);
-    if (subs != null && subs.size() != 0) {
-      int i = 0;
-      for(; i < subs.size() - 1; i++) {
-        subs.get(i).dumpTreeRecursively(out, prefix);
+      StringBuilder prefix, Iterable<? extends INode> subs) {
+    if (subs != null) {
+      for(final Iterator<? extends INode> i = subs.iterator(); i.hasNext();) {
+        final INode inode = i.next();
+        prefix.append(i.hasNext()? DUMPTREE_EXCEPT_LAST_ITEM: DUMPTREE_LAST_ITEM);
+        inode.dumpTreeRecursively(out, prefix);
         prefix.setLength(prefix.length() - 2);
-        prefix.append(DUMPTREE_EXCEPT_LAST_ITEM);
       }
-
-      prefix.setLength(prefix.length() - 2);
-      prefix.append(DUMPTREE_LAST_ITEM);
-      subs.get(i).dumpTreeRecursively(out, prefix);
     }
-    prefix.setLength(prefix.length() - 2);
   }
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java?rev=1405683&r1=1405682&r2=1405683&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java Mon Nov  5 00:40:54 2012
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -30,7 +31,12 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectoryWithQuota;
 import org.apache.hadoop.util.Time;
 
-/** Directories where taking snapshots is allowed. */
+/**
+ * Directories where taking snapshots is allowed.
+ * 
+ * Like other {@link INode} subclasses, this class is synchronized externally
+ * by the namesystem and FSDirectory locks.
+ */
 @InterfaceAudience.Private
 public class INodeDirectorySnapshottable extends INodeDirectoryWithQuota {
   static public INodeDirectorySnapshottable newInstance(
@@ -51,43 +57,48 @@ public class INodeDirectorySnapshottable
       INode inode, String src) throws IOException {
     final INodeDirectory dir = INodeDirectory.valueOf(inode, src);
     if (!dir.isSnapshottable()) {
-      throw new SnapshotException(src + " is not a snapshottable directory.");
+      throw new SnapshotException(
+          "Directory is not a snapshottable directory: " + src);
     }
     return (INodeDirectorySnapshottable)dir;
   }
 
-  /** A list of snapshots of this directory. */
-  private final List<INodeDirectoryWithSnapshot> snapshots
-      = new ArrayList<INodeDirectoryWithSnapshot>();
+  /** Snapshots of this directory in ascending order of snapshot id. */
+  private final List<Snapshot> snapshots = new ArrayList<Snapshot>();
+
+  /** Number of snapshots allowed. */
+  private int snapshotQuota;
+
+  private INodeDirectorySnapshottable(long nsQuota, long dsQuota,
+      INodeDirectory dir, final int snapshotQuota) {
+    super(nsQuota, dsQuota, dir);
+    setSnapshotQuota(snapshotQuota);
+  }
+  
+  int getNumSnapshots() {
+    return snapshots.size();
+  }
   
-  public INode getSnapshotINode(byte[] name) {
+  /** @return the root directory of a snapshot. */
+  public INodeDirectory getSnapshotRoot(byte[] snapshotName) {
     if (snapshots == null || snapshots.size() == 0) {
       return null;
     }
-    int low = Collections.binarySearch(snapshots, name);
+    int low = Collections.binarySearch(snapshots, snapshotName);
     if (low >= 0) {
-      return snapshots.get(low);
+      return snapshots.get(low).getRoot();
     }
     return null;
   }
-  
-  /** Number of snapshots is allowed. */
-  private int snapshotQuota;
-
-  private INodeDirectorySnapshottable(long nsQuota, long dsQuota,
-      INodeDirectory dir, final int snapshotQuota) {
-    super(nsQuota, dsQuota, dir);
-    setSnapshotQuota(snapshotQuota);
-  }
 
   public int getSnapshotQuota() {
     return snapshotQuota;
   }
 
   public void setSnapshotQuota(int snapshotQuota) {
-    if (snapshotQuota <= 0) {
+    if (snapshotQuota < 0) {
       throw new HadoopIllegalArgumentException(
-          "Cannot set snapshot quota to " + snapshotQuota + " <= 0");
+          "Cannot set snapshot quota to " + snapshotQuota + " < 0");
     }
     this.snapshotQuota = snapshotQuota;
   }
@@ -98,8 +109,7 @@ public class INodeDirectorySnapshottable
   }
 
   /** Add a snapshot root under this directory. */
-  INodeDirectoryWithSnapshot addSnapshotRoot(final String name
-      ) throws SnapshotException {
+  void addSnapshot(final Snapshot s) throws SnapshotException {
     //check snapshot quota
     if (snapshots.size() + 1 > snapshotQuota) {
       throw new SnapshotException("Failed to add snapshot: there are already "
@@ -107,14 +117,12 @@ public class INodeDirectorySnapshottable
           + snapshotQuota);
     }
 
-    final INodeDirectoryWithSnapshot r = new INodeDirectoryWithSnapshot(name, this);
-    snapshots.add(r);
+    snapshots.add(s);
 
     //set modification time
     final long timestamp = Time.now();
-    r.setModificationTime(timestamp);
+    s.getRoot().setModificationTime(timestamp);
     setModificationTime(timestamp);
-    return r;
   }
   
   @Override
@@ -126,6 +134,28 @@ public class INodeDirectorySnapshottable
     out.print(snapshots.size() <= 1 ? " snapshot of " : " snapshots of ");
     out.println(getLocalName());
 
-    dumpTreeRecursively(out, prefix, snapshots);
+    dumpTreeRecursively(out, prefix, new Iterable<INodeDirectoryWithSnapshot>() {
+      @Override
+      public Iterator<INodeDirectoryWithSnapshot> iterator() {
+        return new Iterator<INodeDirectoryWithSnapshot>() {
+          final Iterator<Snapshot> i = snapshots.iterator();
+
+          @Override
+          public boolean hasNext() {
+            return i.hasNext();
+          }
+
+          @Override
+          public INodeDirectoryWithSnapshot next() {
+            return i.next().getRoot();
+          }
+
+          @Override
+          public void remove() {
+            throw new UnsupportedOperationException();
+          }
+        };
+      }
+    });
   }
 }

Added: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java?rev=1405683&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java (added)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java Mon Nov  5 00:40:54 2012
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.snapshot;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/** Snapshot of a sub-tree in the namesystem. */
+@InterfaceAudience.Private
+public class Snapshot implements Comparable<byte[]> {
+  /** Snapshot ID. */
+  private final int id;
+  /** The root directory of the snapshot. */
+  private final INodeDirectoryWithSnapshot root;
+
+  Snapshot(int id, String name, INodeDirectorySnapshottable dir) {
+    this.id = id;
+    this.root = new INodeDirectoryWithSnapshot(name, dir);
+  }
+
+  INodeDirectoryWithSnapshot getRoot() {
+    return root;
+  }
+
+  @Override
+  public int compareTo(byte[] bytes) {
+    return root.compareTo(bytes);
+  }
+}

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java?rev=1405683&r1=1405682&r2=1405683&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java Mon Nov  5 00:40:54 2012
@@ -20,7 +20,7 @@ package org.apache.hadoop.hdfs.server.na
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -34,8 +34,11 @@ import org.apache.hadoop.hdfs.server.nam
 public class SnapshotManager implements SnapshotStats {
   private final FSNamesystem namesystem;
   private final FSDirectory fsdir;
-  private AtomicLong numSnapshottableDirs = new AtomicLong();
-  private AtomicLong numSnapshots = new AtomicLong();
+
+  private final AtomicInteger numSnapshottableDirs = new AtomicInteger();
+  private final AtomicInteger numSnapshots = new AtomicInteger();
+
+  private int snapshotID = 0;
   
   /** All snapshottable directories in the namesystem. */
   private final List<INodeDirectorySnapshottable> snapshottables
@@ -49,31 +52,47 @@ public class SnapshotManager implements 
 
   /**
    * Set the given directory as a snapshottable directory.
-   * If the path is already a snapshottable directory, this is a no-op.
-   * Otherwise, the {@link INodeDirectory} of the path is replaced by an 
-   * {@link INodeDirectorySnapshottable}.
+   * If the path is already a snapshottable directory, update the quota.
    */
   public void setSnapshottable(final String path, final int snapshotQuota
       ) throws IOException {
-    namesystem.writeLock();
-    try {
-      final INodeDirectory d = INodeDirectory.valueOf(fsdir.getINode(path), path);
-      if (d.isSnapshottable()) {
-        //The directory is already a snapshottable directory. 
-        return;
-      }
-
-      final INodeDirectorySnapshottable s
-          = INodeDirectorySnapshottable.newInstance(d, snapshotQuota);
-      fsdir.replaceINodeDirectory(path, d, s);
-      snapshottables.add(s);
-    } finally {
-      namesystem.writeUnlock();
+    final INodeDirectory d = INodeDirectory.valueOf(fsdir.getINode(path), path);
+    if (d.isSnapshottable()) {
+      //The directory is already a snapshottable directory.
+      ((INodeDirectorySnapshottable)d).setSnapshotQuota(snapshotQuota);
+      return;
     }
+
+    final INodeDirectorySnapshottable s
+        = INodeDirectorySnapshottable.newInstance(d, snapshotQuota);
+    fsdir.replaceINodeDirectory(path, d, s);
+    snapshottables.add(s);
+
     numSnapshottableDirs.getAndIncrement();
   }
 
   /**
+   * Set the given snapshottable directory to non-snapshottable.
+   * 
+   * @throws SnapshotException if there are snapshots in the directory.
+   */
+  public void resetSnapshottable(final String path
+      ) throws IOException {
+    final INodeDirectorySnapshottable s = INodeDirectorySnapshottable.valueOf(
+        fsdir.getINode(path), path);
+    if (s.getNumSnapshots() > 0) {
+      throw new SnapshotException("The directory " + path + " has snapshot(s). "
+          + "Please redo the operation after removing all the snapshots.");
+    }
+
+    final INodeDirectory d = new INodeDirectory(s);
+    fsdir.replaceINodeDirectory(path, s, d);
+    snapshottables.remove(s);
+
+    numSnapshottableDirs.getAndDecrement();
+  }
+
+  /**
    * Create a snapshot of the given path.
    * 
    * @param snapshotName The name of the snapshot.
@@ -81,7 +100,18 @@ public class SnapshotManager implements 
    */
   public void createSnapshot(final String snapshotName, final String path
       ) throws IOException {
-    new SnapshotCreation(path).run(snapshotName);
+    // Find the source root directory path where the snapshot is taken.
+    final INodeDirectorySnapshottable srcRoot
+        = INodeDirectorySnapshottable.valueOf(fsdir.getINode(path), path);
+
+    synchronized(this) {
+      final Snapshot s = new Snapshot(snapshotID, snapshotName, srcRoot); 
+      srcRoot.addSnapshot(s);
+      new SnapshotCreation().processRecursively(srcRoot, s.getRoot());
+      
+      // creation succeeded; advance the snapshot id
+      snapshotID++;
+    }
     numSnapshots.getAndIncrement();
   }
   
@@ -92,22 +122,6 @@ public class SnapshotManager implements 
    * where N = # files + # directories + # symlinks. 
    */
   class SnapshotCreation {
-    /** The source root directory path where the snapshot is taken. */
-    final INodeDirectorySnapshottable srcRoot;
-    
-    /** 
-     * Constructor.
-     * @param path The path must be a snapshottable directory.
-     */
-    private SnapshotCreation(final String path) throws IOException {
-      srcRoot = INodeDirectorySnapshottable.valueOf(fsdir.getINode(path), path);
-    }
-    
-    void run(final String name) throws IOException {
-      final INodeDirectoryWithSnapshot root = srcRoot.addSnapshotRoot(name);
-      processRecursively(srcRoot, root);
-    }
-
     /** Process snapshot creation recursively. */
     private void processRecursively(final INodeDirectory srcDir,
         final INodeDirectory dstDir) throws IOException {

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSDirectory.java?rev=1405683&r1=1405682&r2=1405683&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSDirectory.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSDirectory.java Mon Nov  5 00:40:54 2012
@@ -107,9 +107,11 @@ public class TestFSDirectory {
 
     for(; (line = in.readLine()) != null; ) {
       line = line.trim();
-      Assert.assertTrue(line.startsWith(INodeDirectory.DUMPTREE_LAST_ITEM)
-          || line.startsWith(INodeDirectory.DUMPTREE_EXCEPT_LAST_ITEM));
-      checkClassName(line);
+      if (!line.contains("snapshot")) {
+        Assert.assertTrue(line.startsWith(INodeDirectory.DUMPTREE_LAST_ITEM)
+            || line.startsWith(INodeDirectory.DUMPTREE_EXCEPT_LAST_ITEM));
+        checkClassName(line);
+      }
     }
 
     LOG.info("Create a new file " + file4);
@@ -134,8 +136,7 @@ public class TestFSDirectory {
     int i = line.lastIndexOf('(');
     int j = line.lastIndexOf('@');
     final String classname = line.substring(i+1, j);
-    Assert.assertTrue(classname.equals(INodeFile.class.getSimpleName())
-        || classname.equals(INodeDirectory.class.getSimpleName())
-        || classname.equals(INodeDirectoryWithQuota.class.getSimpleName()));
+    Assert.assertTrue(classname.startsWith(INodeFile.class.getSimpleName())
+        || classname.startsWith(INodeDirectory.class.getSimpleName()));
   }
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSnapshotPathINodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSnapshotPathINodes.java?rev=1405683&r1=1405682&r2=1405683&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSnapshotPathINodes.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSnapshotPathINodes.java Mon Nov  5 00:40:54 2012
@@ -89,8 +89,17 @@ public class TestSnapshotPathINodes {
 
     // After a directory is snapshottable
     hdfs.allowSnapshot(path);
-    final INode after = fsdir.getINode(path);
-    Assert.assertTrue(after instanceof INodeDirectorySnapshottable);
+    {
+      final INode after = fsdir.getINode(path);
+      Assert.assertTrue(after instanceof INodeDirectorySnapshottable);
+    }
+    
+    hdfs.disallowSnapshot(path);
+    {
+      final INode after = fsdir.getINode(path);
+      Assert.assertTrue(after instanceof INodeDirectory);
+      Assert.assertFalse(after instanceof INodeDirectorySnapshottable);
+    }
   }
   
   /**