You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:50:25 UTC

svn commit: r1619012 [20/35] - in /hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project: hadoop-hdfs-httpfs/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/ hadoop...

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java Tue Aug 19 23:49:39 2014
@@ -108,6 +108,16 @@ public class NameNodeHttpServer {
         DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT);
     InetSocketAddress httpsAddr = NetUtils.createSocketAddr(httpsAddrString);
 
+    if (httpsAddr != null) {
+      // If DFS_NAMENODE_HTTPS_BIND_HOST_KEY exists then it overrides the
+      // host name portion of DFS_NAMENODE_HTTPS_ADDRESS_KEY.
+      final String bindHost =
+          conf.getTrimmed(DFSConfigKeys.DFS_NAMENODE_HTTPS_BIND_HOST_KEY);
+      if (bindHost != null && !bindHost.isEmpty()) {
+        httpsAddr = new InetSocketAddress(bindHost, httpsAddr.getPort());
+      }
+    }
+
     HttpServer2.Builder builder = DFSUtil.httpServerTemplateForNNAndJN(conf,
         httpAddr, httpsAddr, "hdfs",
         DFSConfigKeys.DFS_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY,

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java Tue Aug 19 23:49:39 2014
@@ -64,7 +64,8 @@ public class NameNodeLayoutVersion { 
    */
   public static enum Feature implements LayoutFeature {
     ROLLING_UPGRADE(-55, -53, "Support rolling upgrade", false),
-    EDITLOG_LENGTH(-56, "Add length field to every edit log op");
+    EDITLOG_LENGTH(-56, "Add length field to every edit log op"),
+    XATTRS(-57, "Extended attributes");
     
     private final FeatureInfo info;
 

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Tue Aug 19 23:49:39 2014
@@ -49,9 +49,12 @@ import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.XAttr;
+import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.ha.HAServiceStatus;
 import org.apache.hadoop.ha.HealthCheckFailedException;
@@ -113,6 +116,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
@@ -130,6 +134,8 @@ import org.apache.hadoop.ipc.ProtobufRpc
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.WritableRpcEngine;
+import org.apache.hadoop.ipc.RefreshRegistry;
+import org.apache.hadoop.ipc.RefreshResponse;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.Groups;
@@ -145,6 +151,9 @@ import org.apache.hadoop.security.protoc
 import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolPB;
 import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolServerSideTranslatorPB;
 import org.apache.hadoop.ipc.proto.RefreshCallQueueProtocolProtos.RefreshCallQueueProtocolService;
+import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB;
+import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolServerSideTranslatorPB;
+import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos.GenericRefreshProtocolService;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos.GetUserMappingsProtocolService;
@@ -227,6 +236,11 @@ class NameNodeRpcServer implements Namen
     BlockingService refreshCallQueueService = RefreshCallQueueProtocolService
         .newReflectiveBlockingService(refreshCallQueueXlator);
 
+    GenericRefreshProtocolServerSideTranslatorPB genericRefreshXlator =
+        new GenericRefreshProtocolServerSideTranslatorPB(this);
+    BlockingService genericRefreshService = GenericRefreshProtocolService
+        .newReflectiveBlockingService(genericRefreshXlator);
+
     GetUserMappingsProtocolServerSideTranslatorPB getUserMappingXlator = 
         new GetUserMappingsProtocolServerSideTranslatorPB(this);
     BlockingService getUserMappingService = GetUserMappingsProtocolService
@@ -276,6 +290,8 @@ class NameNodeRpcServer implements Namen
       // We support Refreshing call queue here in case the client RPC queue is full
       DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class,
           refreshCallQueueService, serviceRpcServer);
+      DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class,
+          genericRefreshService, serviceRpcServer);
       DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class, 
           getUserMappingService, serviceRpcServer);
   
@@ -320,6 +336,8 @@ class NameNodeRpcServer implements Namen
         refreshUserMappingService, clientRpcServer);
     DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class,
         refreshCallQueueService, clientRpcServer);
+    DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class,
+        genericRefreshService, clientRpcServer);
     DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class, 
         getUserMappingService, clientRpcServer);
 
@@ -370,6 +388,12 @@ class NameNodeRpcServer implements Namen
     return clientRpcServer;
   }
   
+  /** Allow access to the service RPC server for testing */
+  @VisibleForTesting
+  RPC.Server getServiceRpcServer() {
+    return serviceRpcServer;
+  }
+  
   /**
    * Start client and service RPC servers.
    */
@@ -594,13 +618,15 @@ class NameNodeRpcServer implements Namen
   }
 
   @Override // ClientProtocol
-  public LocatedBlock getAdditionalDatanode(final String src, final ExtendedBlock blk,
+  public LocatedBlock getAdditionalDatanode(final String src,
+      final long fileId, final ExtendedBlock blk,
       final DatanodeInfo[] existings, final String[] existingStorageIDs,
       final DatanodeInfo[] excludes,
       final int numAdditionalNodes, final String clientName
       ) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("getAdditionalDatanode: src=" + src
+          + ", fileId=" + fileId
           + ", blk=" + blk
           + ", existings=" + Arrays.asList(existings)
           + ", excludes=" + Arrays.asList(excludes)
@@ -617,20 +643,20 @@ class NameNodeRpcServer implements Namen
         excludeSet.add(node);
       }
     }
-    return namesystem.getAdditionalDatanode(src, blk, existings,
+    return namesystem.getAdditionalDatanode(src, fileId, blk, existings,
         existingStorageIDs, excludeSet, numAdditionalNodes, clientName);
   }
   /**
    * The client needs to give up on the block.
    */
   @Override // ClientProtocol
-  public void abandonBlock(ExtendedBlock b, String src, String holder)
-      throws IOException {
+  public void abandonBlock(ExtendedBlock b, long fileId, String src,
+        String holder) throws IOException {
     if(stateChangeLog.isDebugEnabled()) {
       stateChangeLog.debug("*BLOCK* NameNode.abandonBlock: "
           +b+" of file "+src);
     }
-    if (!namesystem.abandonBlock(b, src, holder)) {
+    if (!namesystem.abandonBlock(b, fileId, src, holder)) {
       throw new IOException("Cannot abandon block during write to " + src);
     }
   }
@@ -806,12 +832,24 @@ class NameNodeRpcServer implements Namen
   throws IOException {
     DatanodeInfo results[] = namesystem.datanodeReport(type);
     if (results == null ) {
-      throw new IOException("Cannot find datanode report");
+      throw new IOException("Failed to get datanode report for " + type
+          + " datanodes.");
     }
     return results;
   }
     
   @Override // ClientProtocol
+  public DatanodeStorageReport[] getDatanodeStorageReport(
+      DatanodeReportType type) throws IOException {
+    final DatanodeStorageReport[] reports = namesystem.getDatanodeStorageReport(type);
+    if (reports == null ) {
+      throw new IOException("Failed to get datanode storage report for " + type
+          + " datanodes.");
+    }
+    return reports;
+  }
+
+  @Override // ClientProtocol
   public boolean setSafeMode(SafeModeAction action, boolean isChecked)
       throws IOException {
     OperationCategory opCategory = OperationCategory.UNCHECKED;
@@ -938,9 +976,10 @@ class NameNodeRpcServer implements Namen
   }
   
   @Override // ClientProtocol
-  public void fsync(String src, String clientName, long lastBlockLength)
+  public void fsync(String src, long fileId, String clientName,
+                    long lastBlockLength)
       throws IOException {
-    namesystem.fsync(src, clientName, lastBlockLength);
+    namesystem.fsync(src, fileId, clientName, lastBlockLength);
   }
 
   @Override // ClientProtocol
@@ -1018,16 +1057,21 @@ class NameNodeRpcServer implements Namen
            + "from " + nodeReg + ", reports.length=" + reports.length);
     }
     final BlockManager bm = namesystem.getBlockManager(); 
-    boolean hasStaleStorages = true;
+    boolean noStaleStorages = false;
     for(StorageBlockReport r : reports) {
       final BlockListAsLongs blocks = new BlockListAsLongs(r.getBlocks());
-      hasStaleStorages = bm.processReport(nodeReg, r.getStorage(), poolId, blocks);
+      //
+      // BlockManager.processReport accumulates information of prior calls
+      // for the same node and storage, so the value returned by the last
+      // call of this loop is the final updated value for noStaleStorages.
+      //
+      noStaleStorages = bm.processReport(nodeReg, r.getStorage(), blocks);
       metrics.incrStorageBlockReportOps();
     }
 
     if (nn.getFSImage().isUpgradeFinalized() &&
         !nn.isStandbyState() &&
-        !hasStaleStorages) {
+        noStaleStorages) {
       return new FinalizeCommand(poolId);
     }
 
@@ -1057,7 +1101,7 @@ class NameNodeRpcServer implements Namen
           +" blocks.");
     }
     for(StorageReceivedDeletedBlocks r : receivedAndDeletedBlocks) {
-      namesystem.processIncrementalBlockReport(nodeReg, poolId, r);
+      namesystem.processIncrementalBlockReport(nodeReg, r);
     }
   }
 
@@ -1143,6 +1187,12 @@ class NameNodeRpcServer implements Namen
       serviceRpcServer.refreshCallQueue(conf);
     }
   }
+
+  @Override // GenericRefreshProtocol
+  public Collection<RefreshResponse> refresh(String identifier, String[] args) {
+    // Let the registry handle as needed
+    return RefreshRegistry.defaultRegistry().dispatch(identifier, args);
+  }
   
   @Override // GetUserMappingsProtocol
   public String[] getGroupsForUser(String user) throws IOException {
@@ -1180,9 +1230,8 @@ class NameNodeRpcServer implements Namen
 
   /**
    * Verify version.
-   * 
-   * @param version
-   * @throws IOException
+   * @param version layout version
+   * @throws IOException on layout version mismatch
    */
   void verifyLayoutVersion(int version) throws IOException {
     if (version != HdfsConstants.NAMENODE_LAYOUT_VERSION)
@@ -1373,5 +1422,32 @@ class NameNodeRpcServer implements Namen
   public AclStatus getAclStatus(String src) throws IOException {
     return namesystem.getAclStatus(src);
   }
+  
+  @Override
+  public void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag)
+      throws IOException {
+    namesystem.setXAttr(src, xAttr, flag);
+  }
+  
+  @Override
+  public List<XAttr> getXAttrs(String src, List<XAttr> xAttrs) 
+      throws IOException {
+    return namesystem.getXAttrs(src, xAttrs);
+  }
+
+  @Override
+  public List<XAttr> listXAttrs(String src) throws IOException {
+    return namesystem.listXAttrs(src);
+  }
+  
+  @Override
+  public void removeXAttr(String src, XAttr xAttr) throws IOException {
+    namesystem.removeXAttr(src, xAttr);
+  }
+
+  @Override
+  public void checkAccess(String path, FsAction mode) throws IOException {
+    namesystem.checkAccess(path, mode);
+  }
 }
 

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Tue Aug 19 23:49:39 2014
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
+
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -47,6 +50,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.RemotePeerFactory;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -55,6 +59,12 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
+import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
 import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
@@ -65,6 +75,7 @@ import org.apache.hadoop.net.NetworkTopo
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -92,7 +103,7 @@ import com.google.common.annotations.Vis
  *  factors of each file.
  */
 @InterfaceAudience.Private
-public class NamenodeFsck {
+public class NamenodeFsck implements DataEncryptionKeyFactory {
   public static final Log LOG = LogFactory.getLog(NameNode.class.getName());
   
   // return string marking fsck status
@@ -115,6 +126,7 @@ public class NamenodeFsck {
   private boolean showBlocks = false;
   private boolean showLocations = false;
   private boolean showRacks = false;
+  private boolean showprogress = false;
   private boolean showCorruptFileBlocks = false;
 
   /**
@@ -149,6 +161,7 @@ public class NamenodeFsck {
   private List<String> snapshottableDirs = null;
 
   private final BlockPlacementPolicy bpPolicy;
+  private final SaslDataTransferClient saslClient;
 
   /**
    * Filesystem checker.
@@ -159,7 +172,6 @@ public class NamenodeFsck {
    * @param totalDatanodes number of live datanodes
    * @param minReplication minimum replication
    * @param remoteAddress source address of the fsck request
-   * @throws IOException
    */
   NamenodeFsck(Configuration conf, NameNode namenode,
       NetworkTopology networktopology, 
@@ -173,8 +185,16 @@ public class NamenodeFsck {
     this.minReplication = minReplication;
     this.remoteAddress = remoteAddress;
     this.bpPolicy = BlockPlacementPolicy.getInstance(conf, null,
-        networktopology);
-
+        networktopology,
+        namenode.getNamesystem().getBlockManager().getDatanodeManager()
+        .getHost2DatanodeMap());
+    this.saslClient = new SaslDataTransferClient(
+      DataTransferSaslUtil.getSaslPropertiesResolver(conf),
+      TrustedChannelResolver.getInstance(conf),
+      conf.getBoolean(
+        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
+        IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
+    
     for (Iterator<String> it = pmap.keySet().iterator(); it.hasNext();) {
       String key = it.next();
       if (key.equals("path")) { this.path = pmap.get("path")[0]; }
@@ -184,6 +204,7 @@ public class NamenodeFsck {
       else if (key.equals("blocks")) { this.showBlocks = true; }
       else if (key.equals("locations")) { this.showLocations = true; }
       else if (key.equals("racks")) { this.showRacks = true; }
+      else if (key.equals("showprogress")) { this.showprogress = true; }
       else if (key.equals("openforwrite")) {this.showOpenFiles = true; }
       else if (key.equals("listcorruptfileblocks")) {
         this.showCorruptFileBlocks = true;
@@ -362,10 +383,13 @@ public class NamenodeFsck {
     } else if (showFiles) {
       out.print(path + " " + fileLen + " bytes, " +
         blocks.locatedBlockCount() + " block(s): ");
-    } else {
+    } else if (showprogress) {
       out.print('.');
     }
-    if (res.totalFiles % 100 == 0) { out.println(); out.flush(); }
+    if ((showprogress) && res.totalFiles % 100 == 0) {
+      out.println();
+      out.flush();
+    }
     int missing = 0;
     int corrupt = 0;
     long missize = 0;
@@ -615,15 +639,16 @@ public class NamenodeFsck {
             setConfiguration(namenode.conf).
             setRemotePeerFactory(new RemotePeerFactory() {
               @Override
-              public Peer newConnectedPeer(InetSocketAddress addr)
+              public Peer newConnectedPeer(InetSocketAddress addr,
+                  Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId)
                   throws IOException {
                 Peer peer = null;
                 Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket();
                 try {
                   s.connect(addr, HdfsServerConstants.READ_TIMEOUT);
                   s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
-                  peer = TcpPeerServer.peerFromSocketAndKey(s, namenode.getRpcServer().
-                        getDataEncryptionKey());
+                  peer = TcpPeerServer.peerFromSocketAndKey(saslClient, s,
+                        NamenodeFsck.this, blockToken, datanodeId);
                 } finally {
                   if (peer == null) {
                     IOUtils.closeQuietly(s);
@@ -662,7 +687,12 @@ public class NamenodeFsck {
       throw new Exception("Could not copy block data for " + lblock.getBlock());
     }
   }
-      
+
+  @Override
+  public DataEncryptionKey newDataEncryptionKey() throws IOException {
+    return namenode.getRpcServer().getDataEncryptionKey();
+  }
+
   /*
    * XXX (ab) See comment above for copyBlock().
    *

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceCancelledException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceCancelledException.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceCancelledException.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SaveNamespaceCancelledException.java Tue Aug 19 23:49:39 2014
@@ -18,7 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
-import org.apache.hadoop.classification.InterfaceAudience;;
+import org.apache.hadoop.classification.InterfaceAudience;
 
 @InterfaceAudience.Private
 public class SaveNamespaceCancelledException extends IOException {

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java Tue Aug 19 23:49:39 2014
@@ -27,11 +27,9 @@ import java.net.URI;
 import java.net.URL;
 import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
-import java.util.Collection;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
+import java.util.*;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -64,12 +62,14 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.hdfs.util.Canceler;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.http.HttpServer2;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.source.JvmMetrics;
+import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -79,7 +79,9 @@ import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.util.VersionInfo;
+
+import javax.management.ObjectName;
 
 /**********************************************************
  * The Secondary NameNode is a helper to the primary NameNode.
@@ -95,7 +97,8 @@ import com.google.common.collect.Immutab
  *
  **********************************************************/
 @InterfaceAudience.Private
-public class SecondaryNameNode implements Runnable {
+public class SecondaryNameNode implements Runnable,
+        SecondaryNameNodeInfoMXBean {
     
   static{
     HdfsConfiguration.init();
@@ -122,21 +125,21 @@ public class SecondaryNameNode implement
   private FSNamesystem namesystem;
 
   private Thread checkpointThread;
-
+  private ObjectName nameNodeStatusBeanName;
+  private String legacyOivImageDir;
 
   @Override
   public String toString() {
     return getClass().getSimpleName() + " Status" 
-      + "\nName Node Address    : " + nameNodeAddr   
-      + "\nStart Time           : " + new Date(starttime)
-      + "\nLast Checkpoint      : " + (lastCheckpointTime == 0? "--":
+      + "\nName Node Address      : " + nameNodeAddr
+      + "\nStart Time             : " + new Date(starttime)
+      + "\nLast Checkpoint        : " + (lastCheckpointTime == 0? "--":
 				       ((Time.monotonicNow() - lastCheckpointTime) / 1000))
 	                            + " seconds ago"
-      + "\nCheckpoint Period    : " + checkpointConf.getPeriod() + " seconds"
-      + "\nCheckpoint Size      : " + StringUtils.byteDesc(checkpointConf.getTxnCount())
-                                    + " (= " + checkpointConf.getTxnCount() + " bytes)" 
-      + "\nCheckpoint Dirs      : " + checkpointDirs
-      + "\nCheckpoint Edits Dirs: " + checkpointEditsDirs;
+      + "\nCheckpoint Period      : " + checkpointConf.getPeriod() + " seconds"
+      + "\nCheckpoint Transactions: " + checkpointConf.getTxnCount()
+      + "\nCheckpoint Dirs        : " + checkpointDirs
+      + "\nCheckpoint Edits Dirs  : " + checkpointEditsDirs;
   }
 
   @VisibleForTesting
@@ -169,11 +172,6 @@ public class SecondaryNameNode implement
     this.namenode = namenode;
   }
 
-  @VisibleForTesting
-  List<URI> getCheckpointDirs() {
-    return ImmutableList.copyOf(checkpointDirs);
-  }
-  
   /**
    * Create a connection to the primary namenode.
    */
@@ -209,7 +207,6 @@ public class SecondaryNameNode implement
   
   /**
    * Initialize SecondaryNameNode.
-   * @param commandLineOpts
    */
   private void initialize(final Configuration conf,
       CommandLineOpts commandLineOpts) throws IOException {
@@ -266,6 +263,9 @@ public class SecondaryNameNode implement
         DFSConfigKeys.DFS_SECONDARY_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY,
         DFSConfigKeys.DFS_SECONDARY_NAMENODE_KEYTAB_FILE_KEY);
 
+    nameNodeStatusBeanName = MBeans.register("SecondaryNameNode",
+            "SecondaryNameNodeInfo", this);
+
     infoServer = builder.build();
 
     infoServer.setAttribute("secondary.name.node", this);
@@ -291,6 +291,9 @@ public class SecondaryNameNode implement
           NetUtils.getHostPortString(httpsAddress));
     }
 
+    legacyOivImageDir = conf.get(
+        DFSConfigKeys.DFS_NAMENODE_LEGACY_OIV_IMAGE_DIR_KEY);
+
     LOG.info("Checkpoint Period   :" + checkpointConf.getPeriod() + " secs "
         + "(" + checkpointConf.getPeriod() / 60 + " min)");
     LOG.info("Log Size Trigger    :" + checkpointConf.getTxnCount() + " txns");
@@ -331,6 +334,10 @@ public class SecondaryNameNode implement
     } catch (Exception e) {
       LOG.warn("Exception shutting down SecondaryNameNode", e);
     }
+    if (nameNodeStatusBeanName != null) {
+      MBeans.unregister(nameNodeStatusBeanName);
+      nameNodeStatusBeanName = null;
+    }
     try {
       if (checkpointImage != null) {
         checkpointImage.close();
@@ -495,6 +502,7 @@ public class SecondaryNameNode implement
    * @return if the image is fetched from primary or not
    */
   @VisibleForTesting
+  @SuppressWarnings("deprecated")
   public boolean doCheckpoint() throws IOException {
     checkpointImage.ensureCurrentDirExists();
     NNStorage dstStorage = checkpointImage.getStorage();
@@ -557,11 +565,18 @@ public class SecondaryNameNode implement
 
     LOG.warn("Checkpoint done. New Image Size: " 
              + dstStorage.getFsImageName(txid).length());
-    
+
+    if (legacyOivImageDir != null && !legacyOivImageDir.isEmpty()) {
+      try {
+        checkpointImage.saveLegacyOIVImage(namesystem, legacyOivImageDir,
+            new Canceler());
+      } catch (IOException e) {
+        LOG.warn("Failed to write legacy OIV image: ", e);
+      }
+    }
     return loadImage;
   }
-  
-  
+
   /**
    * @param opts The parameters passed to this program.
    * @exception Exception if the filesystem does not exist.
@@ -680,6 +695,50 @@ public class SecondaryNameNode implement
     checkpointThread.start();
   }
 
+  @Override // SecondaryNameNodeInfoMXBean
+  public String getHostAndPort() {
+    return NetUtils.getHostPortString(nameNodeAddr);
+  }
+
+  @Override // SecondaryNameNodeInfoMXBean
+  public long getStartTime() {
+    return starttime;
+  }
+
+  @Override // SecondaryNameNodeInfoMXBean
+  public long getLastCheckpointTime() {
+    return lastCheckpointTime;
+  }
+
+  @Override // SecondaryNameNodeInfoMXBean
+  public String[] getCheckpointDirectories() {
+    ArrayList<String> r = Lists.newArrayListWithCapacity(checkpointDirs.size());
+    for (URI d : checkpointDirs) {
+      r.add(d.toString());
+    }
+    return r.toArray(new String[r.size()]);
+  }
+
+  @Override // SecondaryNameNodeInfoMXBean
+  public String[] getCheckpointEditlogDirectories() {
+    ArrayList<String> r = Lists.newArrayListWithCapacity(checkpointEditsDirs.size());
+    for (URI d : checkpointEditsDirs) {
+      r.add(d.toString());
+    }
+    return r.toArray(new String[r.size()]);
+  }
+
+  @Override // VersionInfoMXBean
+  public String getCompileInfo() {
+    return VersionInfo.getDate() + " by " + VersionInfo.getUser() +
+            " from " + VersionInfo.getBranch();
+  }
+
+  @Override // VersionInfoMXBean
+  public String getSoftwareVersion() {
+    return VersionInfo.getVersion();
+  }
+
 
   /**
    * Container for parsed command-line options.
@@ -1005,7 +1064,7 @@ public class SecondaryNameNode implement
       } finally {
         dstNamesystem.writeUnlock();
       }
-      dstNamesystem.dir.imageLoadComplete();
+      dstNamesystem.imageLoadComplete();
     }
     // error simulation code for junit test
     CheckpointFaultInjector.getInstance().duringMerge();   

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java Tue Aug 19 23:49:39 2014
@@ -159,7 +159,7 @@ public class TransferFsImage {
       }
     }
 
-    final long milliTime = System.currentTimeMillis();
+    final long milliTime = Time.monotonicNow();
     String tmpFileName = NNStorage.getTemporaryEditsFileName(
         log.getStartTxId(), log.getEndTxId(), milliTime);
     List<File> tmpFiles = dstStorage.getFiles(NameNodeDirType.EDITS,

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java Tue Aug 19 23:49:39 2014
@@ -81,6 +81,7 @@ public class BootstrapStandby implements
   
   private boolean force = false;
   private boolean interactive = true;
+  private boolean skipSharedEditsCheck = false;
 
   // Exit/return codes.
   static final int ERR_CODE_FAILED_CONNECT = 2;
@@ -117,6 +118,8 @@ public class BootstrapStandby implements
         force = true;
       } else if ("-nonInteractive".equals(arg)) {
         interactive = false;
+      } else if ("-skipSharedEditsCheck".equals(arg)) {
+        skipSharedEditsCheck = true;
       } else {
         printUsage();
         throw new HadoopIllegalArgumentException(
@@ -127,7 +130,7 @@ public class BootstrapStandby implements
 
   private void printUsage() {
     System.err.println("Usage: " + this.getClass().getSimpleName() +
-        "[-force] [-nonInteractive]");
+        " [-force] [-nonInteractive] [-skipSharedEditsCheck]");
   }
   
   private NamenodeProtocol createNNProtocolProxy()
@@ -200,7 +203,7 @@ public class BootstrapStandby implements
 
       // Ensure that we have enough edits already in the shared directory to
       // start up from the last checkpoint on the active.
-      if (!checkLogsAvailableForRead(image, imageTxId, curTxId)) {
+      if (!skipSharedEditsCheck && !checkLogsAvailableForRead(image, imageTxId, curTxId)) {
         return ERR_CODE_LOGS_UNAVAILABLE;
       }
 

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java Tue Aug 19 23:49:39 2014
@@ -34,8 +34,8 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.NameNodeProxies;
+import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
-import org.apache.hadoop.io.retry.FailoverProxyProvider;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.UserGroupInformation;
 
@@ -46,8 +46,8 @@ import com.google.common.base.Preconditi
  * to connect to during fail-over. The first configured address is tried first,
  * and on a fail-over event the other address is tried.
  */
-public class ConfiguredFailoverProxyProvider<T> implements
-    FailoverProxyProvider<T> {
+public class ConfiguredFailoverProxyProvider<T> extends
+    AbstractNNFailoverProxyProvider<T> {
   
   private static final Log LOG =
       LogFactory.getLog(ConfiguredFailoverProxyProvider.class);
@@ -165,4 +165,12 @@ public class ConfiguredFailoverProxyProv
       }
     }
   }
+
+  /**
+   * Logical URI is required for this failover proxy provider.
+   */
+  @Override
+  public boolean useLogicalURI() {
+    return true;
+  }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAState.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAState.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAState.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/HAState.java Tue Aug 19 23:49:39 2014
@@ -130,8 +130,8 @@ abstract public class HAState {
    * Check if an operation is supported in a given state.
    * @param context HA context
    * @param op Type of the operation.
-   * @throws UnsupportedActionException if a given type of operation is not
-   *           supported in this state.
+   * @throws StandbyException if a given type of operation is not
+   *           supported in standby state
    */
   public abstract void checkOperation(final HAContext context, final OperationCategory op)
       throws StandbyException;

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java Tue Aug 19 23:49:39 2014
@@ -183,6 +183,12 @@ public class StandbyCheckpointer {
       txid = img.getStorage().getMostRecentCheckpointTxId();
       assert txid == thisCheckpointTxId : "expected to save checkpoint at txid=" +
         thisCheckpointTxId + " but instead saved at txid=" + txid;
+
+      // Save the legacy OIV image, if the output dir is defined.
+      String outputDir = checkpointConf.getLegacyOivImageDir();
+      if (outputDir != null && !outputDir.isEmpty()) {
+        img.saveLegacyOIVImage(namesystem, outputDir, canceler);
+      }
     } finally {
       namesystem.longReadUnlock();
     }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java Tue Aug 19 23:49:39 2014
@@ -151,4 +151,11 @@ public interface FSNamesystemMBean {
    * @return number of blocks pending deletion
    */
   long getPendingDeletionBlocks();
+
+  /**
+   * Number of content stale storages.
+   * @return number of content stale storages
+   */
+  public int getNumStaleStorages();
+
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java Tue Aug 19 23:49:39 2014
@@ -98,7 +98,11 @@ public class NameNodeMetrics {
   @Metric("GetImageServlet putImage")
   MutableRate putImage;
 
-  NameNodeMetrics(String processName, String sessionId, int[] intervals) {
+  JvmMetrics jvmMetrics = null;
+  
+  NameNodeMetrics(String processName, String sessionId, int[] intervals,
+      final JvmMetrics jvmMetrics) {
+    this.jvmMetrics = jvmMetrics;
     registry.tag(ProcessName, processName).tag(SessionId, sessionId);
     
     final int len = intervals.length;
@@ -124,14 +128,19 @@ public class NameNodeMetrics {
     String sessionId = conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY);
     String processName = r.toString();
     MetricsSystem ms = DefaultMetricsSystem.instance();
-    JvmMetrics.create(processName, sessionId, ms);
+    JvmMetrics jm = JvmMetrics.create(processName, sessionId, ms);
     
     // Percentile measurement is off by default, by watching no intervals
     int[] intervals = 
         conf.getInts(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY);
-    return ms.register(new NameNodeMetrics(processName, sessionId, intervals));
+    return ms.register(new NameNodeMetrics(processName, sessionId,
+        intervals, jm));
   }
 
+  public JvmMetrics getJvmMetrics() {
+    return jvmMetrics;
+  }
+  
   public void shutdown() {
     DefaultMetricsSystem.shutdown();
   }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiff.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiff.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiff.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiff.java Tue Aug 19 23:49:39 2014
@@ -17,13 +17,17 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.snapshot;
 
-import com.google.common.base.Preconditions;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.INodeAttributes;
 import org.apache.hadoop.hdfs.server.namenode.Quota;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat.ReferenceMap;
 
-import java.util.List;
+import com.google.common.base.Preconditions;
 
 /**
  * The difference of an inode between in two snapshots.
@@ -86,7 +90,6 @@ abstract class AbstractINodeDiff<N exten
     return posteriorDiff;
   }
 
-  /** @return the posterior diff. */
   final void setPosterior(D posterior) {
     posteriorDiff = posterior;
   }
@@ -129,4 +132,11 @@ abstract class AbstractINodeDiff<N exten
     return getClass().getSimpleName() + ": " + this.getSnapshotId() + " (post="
         + (posteriorDiff == null? null: posteriorDiff.getSnapshotId()) + ")";
   }
+
+  void writeSnapshot(DataOutput out) throws IOException {
+    out.writeInt(snapshotId);
+  }
+
+  abstract void write(DataOutput out, ReferenceMap referenceMap
+      ) throws IOException;
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java Tue Aug 19 23:49:39 2014
@@ -227,32 +227,34 @@ abstract class AbstractINodeDiffList<N e
     D diff = getDiffById(snapshotId);
     return diff == null ? Snapshot.CURRENT_STATE_ID : diff.getSnapshotId();
   }
-  
-  /**
-   * Check if changes have happened between two snapshots.
-   * @param earlier The snapshot taken earlier
-   * @param later The snapshot taken later
-   * @return Whether or not modifications (including diretory/file metadata
-   *         change, file creation/deletion under the directory) have happened
-   *         between snapshots.
-   */
-  final boolean changedBetweenSnapshots(Snapshot earlier, Snapshot later) {
+
+  final int[] changedBetweenSnapshots(Snapshot from, Snapshot to) {
+    Snapshot earlier = from;
+    Snapshot later = to;
+    if (Snapshot.ID_COMPARATOR.compare(from, to) > 0) {
+      earlier = to;
+      later = from;
+    }
+
     final int size = diffs.size();
     int earlierDiffIndex = Collections.binarySearch(diffs, earlier.getId());
+    int laterDiffIndex = later == null ? size : Collections
+        .binarySearch(diffs, later.getId());
     if (-earlierDiffIndex - 1 == size) {
       // if the earlierSnapshot is after the latest SnapshotDiff stored in
       // diffs, no modification happened after the earlierSnapshot
-      return false;
+      return null;
     }
-    if (later != null) {
-      int laterDiffIndex = Collections.binarySearch(diffs, later.getId());
-      if (laterDiffIndex == -1 || laterDiffIndex == 0) {
-        // if the laterSnapshot is the earliest SnapshotDiff stored in diffs, or
-        // before it, no modification happened before the laterSnapshot
-        return false;
-      }
+    if (laterDiffIndex == -1 || laterDiffIndex == 0) {
+      // if the laterSnapshot is the earliest SnapshotDiff stored in diffs, or
+      // before it, no modification happened before the laterSnapshot
+      return null;
     }
-    return true;
+    earlierDiffIndex = earlierDiffIndex < 0 ? (-earlierDiffIndex - 1)
+        : earlierDiffIndex;
+    laterDiffIndex = laterDiffIndex < 0 ? (-laterDiffIndex - 1)
+        : laterDiffIndex;
+    return new int[]{earlierDiffIndex, laterDiffIndex};
   }
 
   /**

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/DirectoryWithSnapshotFeature.java Tue Aug 19 23:49:39 2014
@@ -17,9 +17,9 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.snapshot;
 
+import java.io.DataOutput;
+import java.io.IOException;
 import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -28,10 +28,9 @@ import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
-import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
-import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType;
 import org.apache.hadoop.hdfs.server.namenode.Content;
 import org.apache.hadoop.hdfs.server.namenode.ContentSummaryComputationContext;
+import org.apache.hadoop.hdfs.server.namenode.FSImageSerialization;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
@@ -39,6 +38,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.INodeReference;
 import org.apache.hadoop.hdfs.server.namenode.Quota;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat.ReferenceMap;
 import org.apache.hadoop.hdfs.util.Diff;
 import org.apache.hadoop.hdfs.util.Diff.Container;
 import org.apache.hadoop.hdfs.util.Diff.ListType;
@@ -48,7 +48,9 @@ import org.apache.hadoop.hdfs.util.ReadO
 import com.google.common.base.Preconditions;
 
 /**
- * Feature for directory with snapshot-related information.
+ * Feature used to store and process the snapshot diff information for a
+ * directory. In particular, it contains a directory diff list recording changes
+ * made to the directory and its children for each snapshot.
  */
 @InterfaceAudience.Private
 public class DirectoryWithSnapshotFeature implements INode.Feature {
@@ -120,6 +122,35 @@ public class DirectoryWithSnapshotFeatur
       return counts;
     }
 
+    /** Serialize {@link #created}: the entry count, then each node's local name. */
+    private void writeCreated(DataOutput out) throws IOException {
+      final List<INode> created = getList(ListType.CREATED);
+      out.writeInt(created.size());
+      for (INode node : created) {
+        // For an INode in the created list, only its local name is recorded (short length + bytes)
+        byte[] name = node.getLocalNameBytes();
+        out.writeShort(name.length);
+        out.write(name);
+      }
+    }
+
+    /** Serialize {@link #deleted}: the entry count, then each node via saveINode2Image. */
+    private void writeDeleted(DataOutput out,
+        ReferenceMap referenceMap) throws IOException {
+      final List<INode> deleted = getList(ListType.DELETED);
+      out.writeInt(deleted.size());
+      for (INode node : deleted) {
+        FSImageSerialization.saveINode2Image(node, out, true, referenceMap);
+      }
+    }
+
+    /** Serialize to out: the created list first, then the deleted list. */
+    private void write(DataOutput out, ReferenceMap referenceMap
+        ) throws IOException {
+      writeCreated(out);
+      writeDeleted(out, referenceMap);
+    }
+
     /** Get the list of INodeDirectory contained in the deleted list */
     private void getDirsInDeleted(List<INodeDirectory> dirList) {
       for (INode node : getList(ListType.DELETED)) {
@@ -128,59 +159,6 @@ public class DirectoryWithSnapshotFeatur
         }
       }
     }
-
-    /**
-     * Interpret the diff and generate a list of {@link DiffReportEntry}.
-     * @param parentPath The relative path of the parent.
-     * @param fromEarlier True indicates {@code diff=later-earlier},
-     *                    False indicates {@code diff=earlier-later}
-     * @return A list of {@link DiffReportEntry} as the diff report.
-     */
-    public List<DiffReportEntry> generateReport(byte[][] parentPath,
-        boolean fromEarlier) {
-      List<DiffReportEntry> cList = new ArrayList<DiffReportEntry>();
-      List<DiffReportEntry> dList = new ArrayList<DiffReportEntry>();
-      int c = 0, d = 0;
-      List<INode> created = getList(ListType.CREATED);
-      List<INode> deleted = getList(ListType.DELETED);
-      byte[][] fullPath = new byte[parentPath.length + 1][];
-      System.arraycopy(parentPath, 0, fullPath, 0, parentPath.length);
-      for (; c < created.size() && d < deleted.size(); ) {
-        INode cnode = created.get(c);
-        INode dnode = deleted.get(d);
-        if (cnode.compareTo(dnode.getLocalNameBytes()) == 0) {
-          fullPath[fullPath.length - 1] = cnode.getLocalNameBytes();
-          // must be the case: delete first and then create an inode with the
-          // same name
-          cList.add(new DiffReportEntry(DiffType.CREATE, fullPath));
-          dList.add(new DiffReportEntry(DiffType.DELETE, fullPath));
-          c++;
-          d++;
-        } else if (cnode.compareTo(dnode.getLocalNameBytes()) < 0) {
-          fullPath[fullPath.length - 1] = cnode.getLocalNameBytes();
-          cList.add(new DiffReportEntry(fromEarlier ? DiffType.CREATE
-              : DiffType.DELETE, fullPath));
-          c++;
-        } else {
-          fullPath[fullPath.length - 1] = dnode.getLocalNameBytes();
-          dList.add(new DiffReportEntry(fromEarlier ? DiffType.DELETE
-              : DiffType.CREATE, fullPath));
-          d++;
-        }
-      }
-      for (; d < deleted.size(); d++) {
-        fullPath[fullPath.length - 1] = deleted.get(d).getLocalNameBytes();
-        dList.add(new DiffReportEntry(fromEarlier ? DiffType.DELETE
-            : DiffType.CREATE, fullPath));
-      }
-      for (; c < created.size(); c++) {
-        fullPath[fullPath.length - 1] = created.get(c).getLocalNameBytes();
-        cList.add(new DiffReportEntry(fromEarlier ? DiffType.CREATE
-            : DiffType.DELETE, fullPath));
-      }
-      dList.addAll(cList);
-      return dList;
-    }
   }
 
   /**
@@ -315,6 +293,25 @@ public class DirectoryWithSnapshotFeatur
     }
 
     @Override
+    void write(DataOutput out, ReferenceMap referenceMap) throws IOException {
+      writeSnapshot(out);
+      out.writeInt(childrenSize);
+
+      // Write snapshotINode
+      out.writeBoolean(isSnapshotRoot);
+      if (!isSnapshotRoot) {
+        if (snapshotINode != null) {
+          out.writeBoolean(true);
+          FSImageSerialization.writeINodeDirectoryAttributes(snapshotINode, out);
+        } else {
+          out.writeBoolean(false);
+        }
+      }
+      // Write diff. No need to write posteriorDiff, since diffs is a list.
+      diff.write(out, referenceMap);
+    }
+
+    @Override
     Quota.Counts destroyDiffAndCollectBlocks(INodeDirectory currentINode,
         BlocksMapUpdateInfo collectedBlocks, final List<INode> removedINodes) {
       // this diff has been deleted
@@ -364,6 +361,25 @@ public class DirectoryWithSnapshotFeatur
       }
       return false;
     }
+
+    /**
+     * Find the corresponding snapshot whose deleted list contains the given
+     * inode.
+     * @return the id of the snapshot, or {@link Snapshot#NO_SNAPSHOT_ID} if the
+     * given inode is not in the deleted list of any diff.
+     */
+    public int findSnapshotDeleted(final INode child) {
+      final List<DirectoryDiff> diffList = asList();
+      for(int i = diffList.size() - 1; i >= 0; i--) {
+        final ChildrenDiff diff = diffList.get(i).diff;
+        final int d = diff.searchIndex(ListType.DELETED,
+            child.getLocalNameBytes());
+        if (d >= 0 && diff.getList(ListType.DELETED).get(d) == child) {
+          return diffList.get(i).getSnapshotId();
+        }
+      }
+      return Snapshot.NO_SNAPSHOT_ID;
+    }
   }
   
   private static Map<INode, INode> cloneDiffList(List<INode> diffList) {
@@ -653,34 +669,21 @@ public class DirectoryWithSnapshotFeatur
    */
   boolean computeDiffBetweenSnapshots(Snapshot fromSnapshot,
       Snapshot toSnapshot, ChildrenDiff diff, INodeDirectory currentINode) {
-    Snapshot earlier = fromSnapshot;
-    Snapshot later = toSnapshot;
-    if (Snapshot.ID_COMPARATOR.compare(fromSnapshot, toSnapshot) > 0) {
-      earlier = toSnapshot;
-      later = fromSnapshot;
-    }
-
-    boolean modified = diffs.changedBetweenSnapshots(earlier, later);
-    if (!modified) {
+    int[] diffIndexPair = diffs.changedBetweenSnapshots(fromSnapshot,
+        toSnapshot);
+    if (diffIndexPair == null) {
       return false;
     }
-
-    final List<DirectoryDiff> difflist = diffs.asList();
-    final int size = difflist.size();
-    int earlierDiffIndex = Collections.binarySearch(difflist, earlier.getId());
-    int laterDiffIndex = later == null ? size : Collections
-        .binarySearch(difflist, later.getId());
-    earlierDiffIndex = earlierDiffIndex < 0 ? (-earlierDiffIndex - 1)
-        : earlierDiffIndex;
-    laterDiffIndex = laterDiffIndex < 0 ? (-laterDiffIndex - 1)
-        : laterDiffIndex;
+    int earlierDiffIndex = diffIndexPair[0];
+    int laterDiffIndex = diffIndexPair[1];
 
     boolean dirMetadataChanged = false;
     INodeDirectoryAttributes dirCopy = null;
+    List<DirectoryDiff> difflist = diffs.asList();
     for (int i = earlierDiffIndex; i < laterDiffIndex; i++) {
       DirectoryDiff sdiff = difflist.get(i);
       diff.combinePosterior(sdiff.diff, null);
-      if (dirMetadataChanged == false && sdiff.snapshotINode != null) {
+      if (!dirMetadataChanged && sdiff.snapshotINode != null) {
         if (dirCopy == null) {
           dirCopy = sdiff.snapshotINode;
         } else if (!dirCopy.metadataEquals(sdiff.snapshotINode)) {
@@ -692,7 +695,7 @@ public class DirectoryWithSnapshotFeatur
     if (!diff.isEmpty() || dirMetadataChanged) {
       return true;
     } else if (dirCopy != null) {
-      for (int i = laterDiffIndex; i < size; i++) {
+      for (int i = laterDiffIndex; i < difflist.size(); i++) {
         if (!dirCopy.metadataEquals(difflist.get(i).snapshotINode)) {
           return true;
         }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java Tue Aug 19 23:49:39 2014
@@ -65,6 +65,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature.DirectoryDiff;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature.DirectoryDiffList;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.Root;
+import org.apache.hadoop.hdfs.server.namenode.XAttrFeature;
 import org.apache.hadoop.hdfs.util.Diff.ListType;
 
 import com.google.common.base.Preconditions;
@@ -126,9 +127,8 @@ public class FSImageFormatPBSnapshot {
     }
 
     /**
-     * Load the snapshots section from fsimage. Also convert snapshottable
-     * directories into {@link INodeDirectorySnapshottable}.
-     *
+     * Load the snapshots section from fsimage. Also add snapshottable feature
+     * to snapshottable directories.
      */
     public void loadSnapshotSection(InputStream in) throws IOException {
       SnapshotManager sm = fsn.getSnapshotManager();
@@ -138,16 +138,13 @@ public class FSImageFormatPBSnapshot {
       sm.setSnapshotCounter(section.getSnapshotCounter());
       for (long sdirId : section.getSnapshottableDirList()) {
         INodeDirectory dir = fsDir.getInode(sdirId).asDirectory();
-        final INodeDirectorySnapshottable sdir;
         if (!dir.isSnapshottable()) {
-          sdir = new INodeDirectorySnapshottable(dir);
-          fsDir.addToInodeMap(sdir);
+          dir.addSnapshottableFeature();
         } else {
           // dir is root, and admin set root to snapshottable before
-          sdir = (INodeDirectorySnapshottable) dir;
-          sdir.setSnapshotQuota(INodeDirectorySnapshottable.SNAPSHOT_LIMIT);
+          dir.setSnapshotQuota(DirectorySnapshottableFeature.SNAPSHOT_LIMIT);
         }
-        sm.addSnapshottable(sdir);
+        sm.addSnapshottable(dir);
       }
       loadSnapshots(in, snum);
     }
@@ -159,12 +156,11 @@ public class FSImageFormatPBSnapshot {
         INodeDirectory root = loadINodeDirectory(pbs.getRoot(),
             parent.getLoaderContext());
         int sid = pbs.getSnapshotId();
-        INodeDirectorySnapshottable parent = (INodeDirectorySnapshottable) fsDir
-            .getInode(root.getId()).asDirectory();
+        INodeDirectory parent = fsDir.getInode(root.getId()).asDirectory();
         Snapshot snapshot = new Snapshot(sid, root, parent);
         // add the snapshot to parent, since we follow the sequence of
         // snapshotsByNames when saving, we do not need to sort when loading
-        parent.addSnapshot(snapshot);
+        parent.getDirectorySnapshottableFeature().addSnapshot(snapshot);
         snapshotMap.put(sid, snapshot);
       }
     }
@@ -215,11 +211,16 @@ public class FSImageFormatPBSnapshot {
             acl = new AclFeature(FSImageFormatPBINode.Loader.loadAclEntries(
                 fileInPb.getAcl(), state.getStringTable()));
           }
+          XAttrFeature xAttrs = null;
+          if (fileInPb.hasXAttrs()) {
+            xAttrs = new XAttrFeature(FSImageFormatPBINode.Loader.loadXAttrs(
+                fileInPb.getXAttrs(), state.getStringTable()));
+          }
 
           copy = new INodeFileAttributes.SnapshotCopy(pbf.getName()
               .toByteArray(), permission, acl, fileInPb.getModificationTime(),
               fileInPb.getAccessTime(), (short) fileInPb.getReplication(),
-              fileInPb.getPreferredBlockSize());
+              fileInPb.getPreferredBlockSize(), xAttrs);
         }
 
         FileDiff diff = new FileDiff(pbf.getSnapshotId(), copy, null,
@@ -310,16 +311,21 @@ public class FSImageFormatPBSnapshot {
             acl = new AclFeature(FSImageFormatPBINode.Loader.loadAclEntries(
                 dirCopyInPb.getAcl(), state.getStringTable()));
           }
+          XAttrFeature xAttrs = null;
+          if (dirCopyInPb.hasXAttrs()) {
+            xAttrs = new XAttrFeature(FSImageFormatPBINode.Loader.loadXAttrs(
+                dirCopyInPb.getXAttrs(), state.getStringTable()));
+          }
 
           long modTime = dirCopyInPb.getModificationTime();
           boolean noQuota = dirCopyInPb.getNsQuota() == -1
               && dirCopyInPb.getDsQuota() == -1;
 
           copy = noQuota ? new INodeDirectoryAttributes.SnapshotCopy(name,
-              permission, acl, modTime)
+              permission, acl, modTime, xAttrs)
               : new INodeDirectoryAttributes.CopyWithQuota(name, permission,
                   acl, modTime, dirCopyInPb.getNsQuota(),
-                  dirCopyInPb.getDsQuota());
+                  dirCopyInPb.getDsQuota(), xAttrs);
         }
         // load created list
         List<INode> clist = loadCreatedList(in, dir,
@@ -362,14 +368,15 @@ public class FSImageFormatPBSnapshot {
           .setSnapshotCounter(sm.getSnapshotCounter())
           .setNumSnapshots(sm.getNumSnapshots());
 
-      INodeDirectorySnapshottable[] snapshottables = sm.getSnapshottableDirs();
-      for (INodeDirectorySnapshottable sdir : snapshottables) {
+      INodeDirectory[] snapshottables = sm.getSnapshottableDirs();
+      for (INodeDirectory sdir : snapshottables) {
         b.addSnapshottableDir(sdir.getId());
       }
       b.build().writeDelimitedTo(out);
       int i = 0;
-      for(INodeDirectorySnapshottable sdir : snapshottables) {
-        for(Snapshot s : sdir.getSnapshotsByNames()) {
+      for(INodeDirectory sdir : snapshottables) {
+        for (Snapshot s : sdir.getDirectorySnapshottableFeature()
+            .getSnapshotList()) {
           Root sroot = s.getRoot();
           SnapshotSection.Snapshot.Builder sb = SnapshotSection.Snapshot
               .newBuilder().setSnapshotId(s.getId());

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiff.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiff.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiff.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiff.java Tue Aug 19 23:49:39 2014
@@ -17,13 +17,17 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.snapshot;
 
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.server.namenode.FSImageSerialization;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.INodeFileAttributes;
 import org.apache.hadoop.hdfs.server.namenode.Quota;
-
-import java.util.List;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat.ReferenceMap;
 
 /**
  * The difference of an {@link INodeFile} between two snapshots.
@@ -67,6 +71,20 @@ public class FileDiff extends
   }
 
   @Override
+  void write(DataOutput out, ReferenceMap referenceMap) throws IOException {
+    writeSnapshot(out);
+    out.writeLong(fileSize);
+
+    // write snapshotINode
+    if (snapshotINode != null) {
+      out.writeBoolean(true);
+      FSImageSerialization.writeINodeFileAttributes(snapshotINode, out);
+    } else {
+      out.writeBoolean(false);
+    }
+  }
+
+  @Override
   Quota.Counts destroyDiffAndCollectBlocks(INodeFile currentINode,
       BlocksMapUpdateInfo collectedBlocks, final List<INode> removedINodes) {
     return currentINode.getFileWithSnapshotFeature()

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java Tue Aug 19 23:49:39 2014
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.server.blo
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
+import org.apache.hadoop.hdfs.server.namenode.INodeFileAttributes;
 import org.apache.hadoop.hdfs.server.namenode.Quota;
 
 /**
@@ -73,7 +74,41 @@ public class FileWithSnapshotFeature imp
     }
     return max;
   }
-  
+
+  boolean changedBetweenSnapshots(INodeFile file, Snapshot from, Snapshot to) {
+    int[] diffIndexPair = diffs.changedBetweenSnapshots(from, to);
+    if (diffIndexPair == null) {
+      return false;
+    }
+    int earlierDiffIndex = diffIndexPair[0];
+    int laterDiffIndex = diffIndexPair[1];
+
+    final List<FileDiff> diffList = diffs.asList();
+    final long earlierLength = diffList.get(earlierDiffIndex).getFileSize();
+    final long laterLength = laterDiffIndex == diffList.size() ? file
+        .computeFileSize(true, false) : diffList.get(laterDiffIndex)
+        .getFileSize();
+    if (earlierLength != laterLength) { // file length has been changed
+      return true;
+    }
+
+    INodeFileAttributes earlierAttr = null; // check the metadata
+    for (int i = earlierDiffIndex; i < laterDiffIndex; i++) {
+      FileDiff diff = diffList.get(i);
+      if (diff.snapshotINode != null) {
+        earlierAttr = diff.snapshotINode;
+        break;
+      }
+    }
+    if (earlierAttr == null) { // no meta-change at all, return false
+      return false;
+    }
+    INodeFileAttributes laterAttr = diffs.getSnapshotINode(
+        Math.max(Snapshot.getSnapshotId(from), Snapshot.getSnapshotId(to)),
+        file);
+    return !earlierAttr.metadataEquals(laterAttr);
+  }
+
   public String getDetailedString() {
     return (isCurrentFileDeleted()? "(DELETED), ": ", ") + diffs;
   }
@@ -159,7 +194,7 @@ public class FileWithSnapshotFeature imp
         // resize the array.  
         final BlockInfo[] newBlocks;
         if (n == 0) {
-          newBlocks = null;
+          newBlocks = BlockInfo.EMPTY_ARRAY;
         } else {
           newBlocks = new BlockInfo[n];
           System.arraycopy(oldBlocks, 0, newBlocks, 0, n);

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java Tue Aug 19 23:49:39 2014
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode.snapshot;
 
 import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.text.SimpleDateFormat;
 import java.util.Arrays;
@@ -30,10 +31,13 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.namenode.AclFeature;
 import org.apache.hadoop.hdfs.server.namenode.FSImageFormat;
+import org.apache.hadoop.hdfs.server.namenode.FSImageSerialization;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
+import org.apache.hadoop.hdfs.server.namenode.XAttrFeature;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
 
+import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
@@ -142,9 +146,20 @@ public class Snapshot implements Compara
   /** The root directory of the snapshot. */
   static public class Root extends INodeDirectory {
     Root(INodeDirectory other) {
-      // Always preserve ACL.
+      // Always preserve the ACL and XAttr features.
       super(other, false, Lists.newArrayList(
-        Iterables.filter(Arrays.asList(other.getFeatures()), AclFeature.class))
+        Iterables.filter(Arrays.asList(other.getFeatures()), new Predicate<Feature>() {
+
+          @Override
+          public boolean apply(Feature input) {
+            if (AclFeature.class.isInstance(input) 
+                || XAttrFeature.class.isInstance(input)) {
+              return true;
+            }
+            return false;
+          }
+          
+        }))
         .toArray(new Feature[0]));
     }
 
@@ -169,15 +184,14 @@ public class Snapshot implements Compara
   /** The root directory of the snapshot. */
   private final Root root;
 
-  Snapshot(int id, String name, INodeDirectorySnapshottable dir) {
+  Snapshot(int id, String name, INodeDirectory dir) {
     this(id, dir, dir);
     this.root.setLocalName(DFSUtil.string2Bytes(name));
   }
 
-  Snapshot(int id, INodeDirectory dir, INodeDirectorySnapshottable parent) {
+  Snapshot(int id, INodeDirectory dir, INodeDirectory parent) {
     this.id = id;
     this.root = new Root(dir);
-
     this.root.setParent(parent);
   }
   
@@ -214,4 +228,11 @@ public class Snapshot implements Compara
   public String toString() {
     return getClass().getSimpleName() + "." + root.getLocalName() + "(id=" + id + ")";
   }
+
+  /** Serialize the fields to out */
+  void write(DataOutput out) throws IOException {
+    out.writeInt(id);
+    // write root
+    FSImageSerialization.writeINodeDirectory(root, out);
+  }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java Tue Aug 19 23:49:39 2014
@@ -29,21 +29,79 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.server.namenode.FSImageFormat;
 import org.apache.hadoop.hdfs.server.namenode.FSImageSerialization;
 import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.namenode.INodeAttributes;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectoryAttributes;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.INodeFileAttributes;
 import org.apache.hadoop.hdfs.server.namenode.INodeReference;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature.DirectoryDiff;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectoryWithSnapshotFeature.DirectoryDiffList;
 import org.apache.hadoop.hdfs.tools.snapshot.SnapshotDiff;
 import org.apache.hadoop.hdfs.util.Diff.ListType;
-import org.apache.hadoop.hdfs.server.namenode.FSImageFormat.Loader;
+import org.apache.hadoop.hdfs.util.ReadOnlyList;
+
+import com.google.common.base.Preconditions;
 
 /**
  * A helper class defining static methods for reading/writing snapshot related
  * information from/to FSImage.
  */
 public class SnapshotFSImageFormat {
+  /**
+   * Save snapshots and snapshot quota for a snapshottable directory.
+   * @param current The directory that the snapshots belong to.
+   * @param out The {@link DataOutput} to write.
+   * @throws IOException
+   */
+  public static void saveSnapshots(INodeDirectory current, DataOutput out)
+      throws IOException {
+    DirectorySnapshottableFeature sf = current.getDirectorySnapshottableFeature();
+    Preconditions.checkArgument(sf != null);
+    // list of snapshots in snapshotsByNames
+    ReadOnlyList<Snapshot> snapshots = sf.getSnapshotList();
+    out.writeInt(snapshots.size());
+    for (Snapshot s : snapshots) {
+      // write the snapshot id
+      out.writeInt(s.getId());
+    }
+    // snapshot quota
+    out.writeInt(sf.getSnapshotQuota());
+  }
+
+  /**
+   * Save the diff list of a directory or file; -1 is written if it is null.
+   * @param diffs The diff list to save; null means there are no diffs.
+   * @param out The {@link DataOutput} to write.
+   */
+  private static <N extends INode, A extends INodeAttributes, D extends AbstractINodeDiff<N, A, D>>
+      void saveINodeDiffs(final AbstractINodeDiffList<N, A, D> diffs,
+      final DataOutput out, ReferenceMap referenceMap) throws IOException {
+    // Record the diffs in reversed order, so that we can find the correct
+    // reference for INodes in the created list when loading the FSImage
+    if (diffs == null) {
+      out.writeInt(-1); // no diffs
+    } else {
+      final List<D> list = diffs.asList();
+      final int size = list.size();
+      out.writeInt(size);
+      for (int i = size - 1; i >= 0; i--) {
+        list.get(i).write(out, referenceMap);
+      }
+    }
+  }
+
+  public static void saveDirectoryDiffList(final INodeDirectory dir,
+      final DataOutput out, final ReferenceMap referenceMap
+      ) throws IOException {
+    saveINodeDiffs(dir.getDiffs(), out, referenceMap);
+  }
+
+  public static void saveFileDiffList(final INodeFile file,
+      final DataOutput out) throws IOException {
+    saveINodeDiffs(file.getDiffs(), out, null);
+  }
+
   public static FileDiffList loadFileDiffList(DataInput in,
       FSImageFormat.Loader loader) throws IOException {
     final int size = in.readInt();
@@ -162,19 +220,22 @@ public class SnapshotFSImageFormat {
    * @param loader
    *          The loader
    */
-  public static void loadSnapshotList(
-      INodeDirectorySnapshottable snapshottableParent, int numSnapshots,
-      DataInput in, FSImageFormat.Loader loader) throws IOException {
+  public static void loadSnapshotList(INodeDirectory snapshottableParent,
+      int numSnapshots, DataInput in, FSImageFormat.Loader loader)
+      throws IOException {
+    DirectorySnapshottableFeature sf = snapshottableParent
+        .getDirectorySnapshottableFeature();
+    Preconditions.checkArgument(sf != null);
     for (int i = 0; i < numSnapshots; i++) {
       // read snapshots
       final Snapshot s = loader.getSnapshot(in);
       s.getRoot().setParent(snapshottableParent);
-      snapshottableParent.addSnapshot(s);
+      sf.addSnapshot(s);
     }
     int snapshotQuota = in.readInt();
     snapshottableParent.setSnapshotQuota(snapshotQuota);
   }
-  
+
   /**
    * Load the {@link SnapshotDiff} list for the INodeDirectoryWithSnapshot
    * directory.
@@ -264,6 +325,23 @@ public class SnapshotFSImageFormat {
      * Used to record whether the subtree of the reference node has been saved 
      */
     private final Map<Long, Long> dirMap = new HashMap<Long, Long>();
+
+    public void writeINodeReferenceWithCount(
+        INodeReference.WithCount withCount, DataOutput out,
+        boolean writeUnderConstruction) throws IOException {
+      final INode referred = withCount.getReferredINode();
+      final long id = withCount.getId();
+      final boolean firstReferred = !referenceMap.containsKey(id);
+      out.writeBoolean(firstReferred);
+
+      if (firstReferred) {
+        FSImageSerialization.saveINode2Image(referred, out,
+            writeUnderConstruction, this);
+        referenceMap.put(id, withCount);
+      } else {
+        out.writeLong(id);
+      }
+    }
     
     public boolean toProcessSubtree(long id) {
       if (dirMap.containsKey(id)) {