Posted to hdfs-commits@hadoop.apache.org by ss...@apache.org on 2012/10/16 02:03:59 UTC

svn commit: r1398581 [3/9] - in /hadoop/common/branches/MR-3902/hadoop-hdfs-project: hadoop-hdfs-httpfs/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/ hadoop-hdfs-h...

Propchange: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1363593-1396941

Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java Tue Oct 16 00:02:55 2012
@@ -312,9 +312,6 @@ public class Hdfs extends AbstractFileSy
     return listing.toArray(new FileStatus[listing.size()]);
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public RemoteIterator<Path> listCorruptFileBlocks(Path path)
     throws IOException {
@@ -324,7 +321,7 @@ public class Hdfs extends AbstractFileSy
   @Override
   public void mkdir(Path dir, FsPermission permission, boolean createParent)
     throws IOException, UnresolvedLinkException {
-    dfs.mkdirs(getUriPath(dir), permission, createParent);
+    dfs.primitiveMkdir(getUriPath(dir), permission, createParent);
   }
 
   @SuppressWarnings("deprecation")

Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsVolumeId.java Tue Oct 16 00:02:55 2012
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.fs;
 
+import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang.builder.EqualsBuilder;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -31,10 +32,10 @@ import org.apache.hadoop.classification.
 @InterfaceAudience.Public
 public class HdfsVolumeId implements VolumeId {
 
-  private final byte id;
+  private final byte[] id;
   private final boolean isValid;
 
-  public HdfsVolumeId(byte id, boolean isValid) {
+  public HdfsVolumeId(byte[] id, boolean isValid) {
     this.id = id;
     this.isValid = isValid;
   }
@@ -69,6 +70,6 @@ public class HdfsVolumeId implements Vol
 
   @Override
   public String toString() {
-    return Byte.toString(id);
+    return Base64.encodeBase64String(id);
   }
 }
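
HdfsVolumeId now carries an opaque byte[] instead of a single byte and renders it as Base64. A minimal standalone sketch of the new constructor and toString() (the byte values below are made up for illustration):

import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.fs.HdfsVolumeId;

public class VolumeIdExample {
  public static void main(String[] args) {
    // Arbitrary bytes standing in for a datanode-reported volume id
    byte[] rawId = new byte[] { 0x0a, 0x1b, 0x2c, 0x3d };
    HdfsVolumeId id = new HdfsVolumeId(rawId, true);
    // toString() now returns the Base64 encoding of the raw bytes
    System.out.println(id);
    // The printed string round-trips back to the original bytes
    byte[] decoded = Base64.decodeBase64(id.toString());
    System.out.println(decoded.length == rawId.length);
  }
}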

Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java Tue Oct 16 00:02:55 2012
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -201,7 +202,7 @@ class BlockStorageLocationUtil {
       ArrayList<VolumeId> l = new ArrayList<VolumeId>(b.getLocations().length);
       // Start off all IDs as invalid, fill it in later with results from RPCs
       for (int i = 0; i < b.getLocations().length; i++) {
-        l.add(new HdfsVolumeId((byte)-1, false));
+        l.add(new HdfsVolumeId(null, false));
       }
       blockVolumeIds.put(b, l);
     }
@@ -234,8 +235,8 @@ class BlockStorageLocationUtil {
         }
         // Get the VolumeId by indexing into the list of VolumeIds
         // provided by the datanode
-        HdfsVolumeId id = new HdfsVolumeId(metaVolumeIds.get(volumeIndex)[0],
-            true);
+        byte[] volumeId = metaVolumeIds.get(volumeIndex);
+        HdfsVolumeId id = new HdfsVolumeId(volumeId, true);
         // Find out which index we are in the LocatedBlock's replicas
         LocatedBlock locBlock = extBlockToLocBlock.get(extBlock);
         DatanodeInfo[] dnInfos = locBlock.getLocations();
@@ -255,8 +256,8 @@ class BlockStorageLocationUtil {
         }
         // Place VolumeId at the same index as the DN's index in the list of
         // replicas
-        List<VolumeId> VolumeIds = blockVolumeIds.get(locBlock);
-        VolumeIds.set(index, id);
+        List<VolumeId> volumeIds = blockVolumeIds.get(locBlock);
+        volumeIds.set(index, id);
       }
     }
     return blockVolumeIds;

Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Tue Oct 16 00:02:55 2012
@@ -39,6 +39,8 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT;
@@ -77,6 +79,7 @@ import javax.net.SocketFactory;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.BlockStorageLocation;
@@ -89,7 +92,9 @@ import org.apache.hadoop.fs.FsServerDefa
 import org.apache.hadoop.fs.FsStatus;
 import org.apache.hadoop.fs.HdfsBlockLocation;
 import org.apache.hadoop.fs.InvalidPathException;
+import org.apache.hadoop.fs.MD5MD5CRC32CastagnoliFileChecksum;
 import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
+import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
@@ -206,6 +211,7 @@ public class DFSClient implements java.i
     final int writePacketSize;
     final int socketTimeout;
     final int socketCacheCapacity;
+    final long socketCacheExpiry;
     /** Wait time window (in msec) if BlockMissingException is caught */
     final int timeWindow;
     final int nCachedConnRetry;
@@ -254,6 +260,8 @@ public class DFSClient implements java.i
       taskId = conf.get("mapreduce.task.attempt.id", "NONMAPREDUCE");
       socketCacheCapacity = conf.getInt(DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
           DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT);
+      socketCacheExpiry = conf.getLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
+          DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT);
       prefetchSize = conf.getLong(DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
           10 * defaultBlockSize);
       timeWindow = conf
@@ -424,7 +432,7 @@ public class DFSClient implements java.i
       Joiner.on(',').join(localInterfaceAddrs) + "]");
     }
     
-    this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity);
+    this.socketCache = SocketCache.getInstance(dfsClientConf.socketCacheCapacity, dfsClientConf.socketCacheExpiry);
   }
 
   /**
@@ -531,7 +539,7 @@ public class DFSClient implements java.i
    *  until the first output stream is created. The same instance will
    *  be returned until all output streams are closed.
    */
-  public synchronized LeaseRenewer getLeaseRenewer() throws IOException {
+  public LeaseRenewer getLeaseRenewer() throws IOException {
       return LeaseRenewer.getInstance(authority, ugi, this);
   }
 
@@ -638,7 +646,6 @@ public class DFSClient implements java.i
   void abort() {
     clientRunning = false;
     closeAllFilesBeingWritten(true);
-    socketCache.clear();
 
     try {
       // remove reference to this client and stop the renewer,
@@ -685,7 +692,6 @@ public class DFSClient implements java.i
   public synchronized void close() throws IOException {
     if(clientRunning) {
       closeAllFilesBeingWritten(false);
-      socketCache.clear();
       clientRunning = false;
       getLeaseRenewer().closeClient(this);
       // close connections to the namenode
@@ -694,6 +700,17 @@ public class DFSClient implements java.i
   }
 
   /**
+   * Close all open streams, abandoning all of the leases and files being
+   * created.
+   * @param abort if true, abort the streams rather than closing them gracefully
+   */
+  public void closeOutputStreams(boolean abort) {
+    if (clientRunning) {
+      closeAllFilesBeingWritten(abort);
+    }
+  }
+
+  /**
    * Get the default block size for this cluster
    * @return the default block size in bytes
    */
@@ -1603,7 +1620,8 @@ public class DFSClient implements java.i
     }
     List<LocatedBlock> locatedblocks = blockLocations.getLocatedBlocks();
     final DataOutputBuffer md5out = new DataOutputBuffer();
-    int bytesPerCRC = 0;
+    int bytesPerCRC = -1;
+    DataChecksum.Type crcType = DataChecksum.Type.DEFAULT;
     long crcPerBlock = 0;
     boolean refetchBlocks = false;
     int lastRetriedIndex = -1;
@@ -1707,6 +1725,17 @@ public class DFSClient implements java.i
               checksumData.getMd5().toByteArray());
           md5.write(md5out);
           
+          // read crc-type
+          final DataChecksum.Type ct = HdfsProtoUtil.
+              fromProto(checksumData.getCrcType());
+          if (i == 0) { // first block
+            crcType = ct;
+          } else if (crcType != DataChecksum.Type.MIXED
+              && crcType != ct) {
+            // if crc types are mixed in a file
+            crcType = DataChecksum.Type.MIXED;
+          }
+
           done = true;
 
           if (LOG.isDebugEnabled()) {
@@ -1732,7 +1761,18 @@ public class DFSClient implements java.i
 
     //compute file MD5
     final MD5Hash fileMD5 = MD5Hash.digest(md5out.getData()); 
-    return new MD5MD5CRC32FileChecksum(bytesPerCRC, crcPerBlock, fileMD5);
+    switch (crcType) {
+      case CRC32:
+        return new MD5MD5CRC32GzipFileChecksum(bytesPerCRC,
+            crcPerBlock, fileMD5);
+      case CRC32C:
+        return new MD5MD5CRC32CastagnoliFileChecksum(bytesPerCRC,
+            crcPerBlock, fileMD5);
+      default:
+        // we should never get here since the validity was checked
+        // when getCrcType() was called above.
+        return null;
+    }
   }
 
   /**
@@ -1845,6 +1885,20 @@ public class DFSClient implements java.i
       throw re.unwrapRemoteException(AccessControlException.class);
     }
   }
+
+  /**
+   * Rolls the edit log on the active NameNode.
+   * @return the txid of the new log segment 
+   *
+   * @see ClientProtocol#rollEdits()
+   */
+  long rollEdits() throws AccessControlException, IOException {
+    try {
+      return namenode.rollEdits();
+    } catch(RemoteException re) {
+      throw re.unwrapRemoteException(AccessControlException.class);
+    }
+  }
   
   /**
    * enable/disable restore failed storage.
@@ -1917,34 +1971,29 @@ public class DFSClient implements java.i
    */
   public boolean mkdirs(String src, FsPermission permission,
       boolean createParent) throws IOException {
-    checkOpen();
     if (permission == null) {
       permission = FsPermission.getDefault();
     }
     FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
-    if(LOG.isDebugEnabled()) {
-      LOG.debug(src + ": masked=" + masked);
-    }
-    try {
-      return namenode.mkdirs(src, masked, createParent);
-    } catch(RemoteException re) {
-      throw re.unwrapRemoteException(AccessControlException.class,
-                                     InvalidPathException.class,
-                                     FileAlreadyExistsException.class,
-                                     FileNotFoundException.class,
-                                     ParentNotDirectoryException.class,
-                                     SafeModeException.class,
-                                     NSQuotaExceededException.class,
-                                     UnresolvedPathException.class);
-    }
+    return primitiveMkdir(src, masked, createParent);
   }
-  
+
   /**
    * Same {{@link #mkdirs(String, FsPermission, boolean)} except
    * that the permissions has already been masked against umask.
    */
   public boolean primitiveMkdir(String src, FsPermission absPermission)
     throws IOException {
+    return primitiveMkdir(src, absPermission, true);
+  }
+
+  /**
+   * Same as {@link #mkdirs(String, FsPermission, boolean)} except
+   * that the permissions have already been masked against the umask.
+   */
+  public boolean primitiveMkdir(String src, FsPermission absPermission, 
+    boolean createParent)
+    throws IOException {
     checkOpen();
     if (absPermission == null) {
       absPermission = 
@@ -1955,15 +2004,20 @@ public class DFSClient implements java.i
       LOG.debug(src + ": masked=" + absPermission);
     }
     try {
-      return namenode.mkdirs(src, absPermission, true);
+      return namenode.mkdirs(src, absPermission, createParent);
     } catch(RemoteException re) {
       throw re.unwrapRemoteException(AccessControlException.class,
+                                     InvalidPathException.class,
+                                     FileAlreadyExistsException.class,
+                                     FileNotFoundException.class,
+                                     ParentNotDirectoryException.class,
+                                     SafeModeException.class,
                                      NSQuotaExceededException.class,
                                      DSQuotaExceededException.class,
                                      UnresolvedPathException.class);
     }
   }
-
+  
   /**
    * Get {@link ContentSummary} rooted at the specified directory.
    * @param path The string representation of the path
@@ -2035,10 +2089,7 @@ public class DFSClient implements java.i
   }
   
   boolean shouldTryShortCircuitRead(InetSocketAddress targetAddr) {
-    if (shortCircuitLocalReads && isLocalAddress(targetAddr)) {
-      return true;
-    }
-    return false;
+    return shortCircuitLocalReads && isLocalAddress(targetAddr);
   }
 
   void reportChecksumFailure(String file, ExtendedBlock blk, DatanodeInfo dn) {
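
The socket cache is now a shared singleton with a configurable expiry. A minimal sketch (standalone, not part of the commit) of reading and overriding the new expiry key that DFSClient.Conf picks up alongside the cache capacity; the three-minute value is only an example:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class SocketCacheExpiryExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Override the 2-minute default defined in DFSConfigKeys
    conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY, 3 * 60 * 1000L);
    long expiry = conf.getLong(
        DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY,
        DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT);
    System.out.println("socket cache expiry (ms): " + expiry);
  }
}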

Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Tue Oct 16 00:02:55 2012
@@ -74,6 +74,8 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY = "dfs.client.failover.connection.retries.on.timeouts";
   public static final int     DFS_CLIENT_FAILOVER_CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT = 0;
   
+  public static final String  DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY = "dfs.client.socketcache.expiryMsec";
+  public static final long    DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT = 2 * 60 * 1000;
   public static final String  DFS_NAMENODE_BACKUP_ADDRESS_KEY = "dfs.namenode.backup.address";
   public static final String  DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT = "localhost:50100";
   public static final String  DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY = "dfs.namenode.backup.http-address";
@@ -174,6 +176,25 @@ public class DFSConfigKeys extends Commo
   public static final boolean DFS_DATANODE_SYNCONCLOSE_DEFAULT = false;
   public static final String  DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY = "dfs.datanode.socket.reuse.keepalive";
   public static final int     DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT = 1000;
+  
+  // Whether to enable datanode's stale state detection and usage
+  public static final String DFS_NAMENODE_CHECK_STALE_DATANODE_KEY = "dfs.namenode.check.stale.datanode";
+  public static final boolean DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT = false;
+  // Whether to avoid writing to datanodes that are marked as stale
+  public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY = "dfs.namenode.avoid.write.stale.datanode";
+  public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT = false;
+  // The time interval for marking datanodes as stale
+  public static final String DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY = "dfs.namenode.stale.datanode.interval";
+  public static final long DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT = 30 * 1000; // 30s
+  // The stale interval cannot be too small, since otherwise marking datanodes as
+  // stale would churn too frequently. This value is the multiple of the heartbeat
+  // interval that defines the minimum allowed stale interval.
+  public static final String DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_KEY = "dfs.namenode.stale.datanode.minimum.interval";
+  public static final int DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT = 3; // i.e. min_interval is 3 * heartbeat_interval = 9s
+  
+  // When the ratio of datanodes marked as stale reaches this value,
+  // stop avoiding writes to stale nodes so as to prevent creating hotspots.
+  public static final String DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY = "dfs.namenode.write.stale.datanode.ratio";
+  public static final float DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT = 0.5f;
 
   // Replication monitoring related keys
   public static final String DFS_NAMENODE_INVALIDATE_WORK_PCT_PER_ITERATION =
@@ -329,6 +350,10 @@ public class DFSConfigKeys extends Commo
                                            "dfs.image.transfer.bandwidthPerSec";
   public static final long DFS_IMAGE_TRANSFER_RATE_DEFAULT = 0;  //no throttling
 
+  // Image transfer timeout
+  public static final String DFS_IMAGE_TRANSFER_TIMEOUT_KEY = "dfs.image.transfer.timeout";
+  public static final int DFS_IMAGE_TRANSFER_TIMEOUT_DEFAULT = 60 * 1000;
+
   //Keys with no defaults
   public static final String  DFS_DATANODE_PLUGINS_KEY = "dfs.datanode.plugins";
   public static final String  DFS_DATANODE_FSDATASET_FACTORY_KEY = "dfs.datanode.fsdataset.factory";
@@ -382,4 +407,42 @@ public class DFSConfigKeys extends Commo
   public static final String DFS_ENCRYPT_DATA_TRANSFER_KEY = "dfs.encrypt.data.transfer";
   public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false;
   public static final String DFS_DATA_ENCRYPTION_ALGORITHM_KEY = "dfs.encrypt.data.transfer.algorithm";
+  
+  // Journal-node related configs. These are read on the JN side.
+  public static final String  DFS_JOURNALNODE_EDITS_DIR_KEY = "dfs.journalnode.edits.dir";
+  public static final String  DFS_JOURNALNODE_EDITS_DIR_DEFAULT = "/tmp/hadoop/dfs/journalnode/";
+  public static final String  DFS_JOURNALNODE_RPC_ADDRESS_KEY = "dfs.journalnode.rpc-address";
+  public static final int     DFS_JOURNALNODE_RPC_PORT_DEFAULT = 8485;
+  public static final String  DFS_JOURNALNODE_RPC_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_JOURNALNODE_RPC_PORT_DEFAULT;
+    
+  public static final String  DFS_JOURNALNODE_HTTP_ADDRESS_KEY = "dfs.journalnode.http-address";
+  public static final int     DFS_JOURNALNODE_HTTP_PORT_DEFAULT = 8480;
+  public static final String  DFS_JOURNALNODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_JOURNALNODE_HTTP_PORT_DEFAULT;
+
+  public static final String  DFS_JOURNALNODE_KEYTAB_FILE_KEY = "dfs.journalnode.keytab.file";
+  public static final String  DFS_JOURNALNODE_USER_NAME_KEY = "dfs.journalnode.kerberos.principal";
+  public static final String  DFS_JOURNALNODE_INTERNAL_SPNEGO_USER_NAME_KEY = "dfs.journalnode.kerberos.internal.spnego.principal";
+
+  // Journal-node related configs for the client side.
+  public static final String  DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY = "dfs.qjournal.queued-edits.limit.mb";
+  public static final int     DFS_QJOURNAL_QUEUE_SIZE_LIMIT_DEFAULT = 10;
+  
+  // Quorum-journal timeouts for various operations. Unlikely to need
+  // to be tweaked, but configurable just in case.
+  public static final String  DFS_QJOURNAL_START_SEGMENT_TIMEOUT_KEY = "dfs.qjournal.start-segment.timeout.ms";
+  public static final String  DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_KEY = "dfs.qjournal.prepare-recovery.timeout.ms";
+  public static final String  DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_KEY = "dfs.qjournal.accept-recovery.timeout.ms";
+  public static final String  DFS_QJOURNAL_FINALIZE_SEGMENT_TIMEOUT_KEY = "dfs.qjournal.finalize-segment.timeout.ms";
+  public static final String  DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_KEY = "dfs.qjournal.select-input-streams.timeout.ms";
+  public static final String  DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_KEY = "dfs.qjournal.get-journal-state.timeout.ms";
+  public static final String  DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_KEY = "dfs.qjournal.new-epoch.timeout.ms";
+  public static final String  DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_KEY = "dfs.qjournal.write-txns.timeout.ms";
+  public static final int     DFS_QJOURNAL_START_SEGMENT_TIMEOUT_DEFAULT = 20000;
+  public static final int     DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_DEFAULT = 120000;
+  public static final int     DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_DEFAULT = 120000;
+  public static final int     DFS_QJOURNAL_FINALIZE_SEGMENT_TIMEOUT_DEFAULT = 120000;
+  public static final int     DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_DEFAULT = 20000;
+  public static final int     DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_DEFAULT = 120000;
+  public static final int     DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_DEFAULT = 120000;
+  public static final int     DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_DEFAULT = 20000;
 }
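
The stale-datanode keys above are consumed on the NameNode side. A minimal sketch (values are illustrative only) of enabling stale-node detection and write avoidance through a Configuration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class StaleDatanodeConfExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Detect datanodes with no recent heartbeat so reads prefer fresh replicas
    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY, true);
    conf.setLong(DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY, 30 * 1000L);
    // Also avoid writes to stale nodes, but only until half of the datanodes look stale
    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, true);
    conf.setFloat(DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY, 0.5f);
  }
}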

Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Tue Oct 16 00:02:55 2012
@@ -243,6 +243,10 @@ public class DFSInputStream extends FSIn
         locatedBlocks.getFileLength() + lastBlockBeingWrittenLength;
   }
 
+  private synchronized boolean blockUnderConstruction() {
+    return locatedBlocks.isUnderConstruction();
+  }
+
   /**
    * Returns the datanode from which the stream is currently reading.
    */
@@ -878,7 +882,9 @@ public class DFSInputStream extends FSIn
                                        String clientName)
       throws IOException {
     
-    if (dfsClient.shouldTryShortCircuitRead(dnAddr)) {
+    // Can't local read a block under construction, see HDFS-2757
+    if (dfsClient.shouldTryShortCircuitRead(dnAddr) &&
+        !blockUnderConstruction()) {
       return DFSClient.getLocalBlockReader(dfsClient.conf, src, block,
           blockToken, chosenNode, dfsClient.hdfsTimeout, startOffset,
           dfsClient.connectToDnViaHostname());

Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Tue Oct 16 00:02:55 2012
@@ -56,8 +56,8 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
@@ -107,8 +107,8 @@ import com.google.common.annotations.Vis
 ****************************************************************/
 @InterfaceAudience.Private
 public class DFSOutputStream extends FSOutputSummer implements Syncable {
-  private final DFSClient dfsClient;
   private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB
+  private final DFSClient dfsClient;
   private Socket s;
   // closed is accessed by different threads under different locks.
   private volatile boolean closed = false;
@@ -138,15 +138,15 @@ public class DFSOutputStream extends FSO
   private final short blockReplication; // replication factor of file
   private boolean shouldSyncBlock = false; // force blocks to disk upon close
   
-  private class Packet {
-    long    seqno;               // sequencenumber of buffer in block
-    long    offsetInBlock;       // offset in block
-    private boolean lastPacketInBlock;   // is this the last packet in block?
-    boolean syncBlock;          // this packet forces the current block to disk
-    int     numChunks;           // number of chunks currently in packet
-    int     maxChunks;           // max chunks in packet
-
+  private static class Packet {
+    private static final long HEART_BEAT_SEQNO = -1L;
+    long seqno; // sequencenumber of buffer in block
+    final long offsetInBlock; // offset in block
+    boolean syncBlock; // this packet forces the current block to disk
+    int numChunks; // number of chunks currently in packet
+    final int maxChunks; // max chunks in packet
     byte[]  buf;
+    private boolean lastPacketInBlock; // is this the last packet in block?
 
     /**
      * buf is pointed into like follows:
@@ -164,45 +164,36 @@ public class DFSOutputStream extends FSO
      */
     int checksumStart;
     int checksumPos;
-    int dataStart;
+    final int dataStart;
     int dataPos;
 
-    private static final long HEART_BEAT_SEQNO = -1L;
-
     /**
      * Create a heartbeat packet.
      */
-    Packet() {
-      this.lastPacketInBlock = false;
-      this.numChunks = 0;
-      this.offsetInBlock = 0;
-      this.seqno = HEART_BEAT_SEQNO;
-      
-      buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN];
-      
-      checksumStart = checksumPos = dataPos = dataStart = PacketHeader.PKT_MAX_HEADER_LEN;
-      maxChunks = 0;
+    Packet(int checksumSize) {
+      this(0, 0, 0, HEART_BEAT_SEQNO, checksumSize);
     }
     
     /**
      * Create a new packet.
      * 
-     * @param pktSize maximum size of the packet, including checksum data and actual data.
+     * @param pktSize maximum size of the packet, 
+     *                including checksum data and actual data.
      * @param chunksPerPkt maximum number of chunks per packet.
      * @param offsetInBlock offset in bytes into the HDFS block.
      */
-    Packet(int pktSize, int chunksPerPkt, long offsetInBlock) {
+    Packet(int pktSize, int chunksPerPkt, long offsetInBlock, 
+                              long seqno, int checksumSize) {
       this.lastPacketInBlock = false;
       this.numChunks = 0;
       this.offsetInBlock = offsetInBlock;
-      this.seqno = currentSeqno;
-      currentSeqno++;
+      this.seqno = seqno;
       
       buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN + pktSize];
       
       checksumStart = PacketHeader.PKT_MAX_HEADER_LEN;
       checksumPos = checksumStart;
-      dataStart = checksumStart + (chunksPerPkt * checksum.getChecksumSize());
+      dataStart = checksumStart + (chunksPerPkt * checksumSize);
       dataPos = dataStart;
       maxChunks = chunksPerPkt;
     }
@@ -412,6 +403,7 @@ public class DFSOutputStream extends FSO
             response.join();
             response = null;
           } catch (InterruptedException  e) {
+            DFSClient.LOG.warn("Caught exception ", e);
           }
         }
 
@@ -439,6 +431,7 @@ public class DFSOutputStream extends FSO
               try {
                 dataQueue.wait(timeout);
               } catch (InterruptedException  e) {
+                DFSClient.LOG.warn("Caught exception ", e);
               }
               doSleep = false;
               now = Time.now();
@@ -448,7 +441,7 @@ public class DFSOutputStream extends FSO
             }
             // get packet to be sent.
             if (dataQueue.isEmpty()) {
-              one = new Packet();  // heartbeat packet
+              one = new Packet(checksum.getChecksumSize());  // heartbeat packet
             } else {
               one = dataQueue.getFirst(); // regular data packet
             }
@@ -488,6 +481,7 @@ public class DFSOutputStream extends FSO
                   // wait for acks to arrive from datanodes
                   dataQueue.wait(1000);
                 } catch (InterruptedException  e) {
+                  DFSClient.LOG.warn("Caught exception ", e);
                 }
               }
             }
@@ -518,7 +512,7 @@ public class DFSOutputStream extends FSO
             blockStream.flush();   
           } catch (IOException e) {
             // HDFS-3398 treat primary DN is down since client is unable to 
-            // write to primary DN 
+            // write to primary DN
             errorIndex = 0;
             throw e;
           }
@@ -607,6 +601,7 @@ public class DFSOutputStream extends FSO
           response.close();
           response.join();
         } catch (InterruptedException  e) {
+          DFSClient.LOG.warn("Caught exception ", e);
         } finally {
           response = null;
         }
@@ -1178,6 +1173,7 @@ public class DFSOutputStream extends FSO
                   Thread.sleep(sleeptime);
                   sleeptime *= 2;
                 } catch (InterruptedException ie) {
+                  DFSClient.LOG.warn("Caught exception ", ie);
                 }
               }
             } else {
@@ -1421,7 +1417,7 @@ public class DFSOutputStream extends FSO
 
     if (currentPacket == null) {
       currentPacket = new Packet(packetSize, chunksPerPacket, 
-          bytesCurBlock);
+          bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
       if (DFSClient.LOG.isDebugEnabled()) {
         DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + 
             currentPacket.seqno +
@@ -1468,7 +1464,8 @@ public class DFSOutputStream extends FSO
       // indicate the end of block and reset bytesCurBlock.
       //
       if (bytesCurBlock == blockSize) {
-        currentPacket = new Packet(0, 0, bytesCurBlock);
+        currentPacket = new Packet(0, 0, bytesCurBlock, 
+            currentSeqno++, this.checksum.getChecksumSize());
         currentPacket.lastPacketInBlock = true;
         currentPacket.syncBlock = shouldSyncBlock;
         waitAndQueueCurrentPacket();
@@ -1540,7 +1537,7 @@ public class DFSOutputStream extends FSO
             // but sync was requested.
             // Send an empty packet
             currentPacket = new Packet(packetSize, chunksPerPacket,
-                bytesCurBlock);
+                bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
           }
         } else {
           // We already flushed up to this offset.
@@ -1557,7 +1554,7 @@ public class DFSOutputStream extends FSO
             // and sync was requested.
             // So send an empty sync packet.
             currentPacket = new Packet(packetSize, chunksPerPacket,
-                bytesCurBlock);
+                bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
           } else {
             // just discard the current packet since it is already been sent.
             currentPacket = null;
@@ -1738,7 +1735,8 @@ public class DFSOutputStream extends FSO
 
       if (bytesCurBlock != 0) {
         // send an empty packet to mark the end of the block
-        currentPacket = new Packet(0, 0, bytesCurBlock);
+        currentPacket = new Packet(0, 0, bytesCurBlock, 
+            currentSeqno++, this.checksum.getChecksumSize());
         currentPacket.lastPacketInBlock = true;
         currentPacket.syncBlock = shouldSyncBlock;
       }
@@ -1778,6 +1776,7 @@ public class DFSOutputStream extends FSO
             DFSClient.LOG.info("Could not complete file " + src + " retrying...");
           }
         } catch (InterruptedException ie) {
+          DFSClient.LOG.warn("Caught exception ", ie);
         }
       }
     }
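
The Packet refactoring passes the sequence number and checksum size into the constructor instead of reading them from the enclosing stream. A small standalone sketch of the resulting buffer layout (the header-length constant below is an assumption, not the real PacketHeader.PKT_MAX_HEADER_LEN):

public class PacketLayoutSketch {
  public static void main(String[] args) {
    final int pktMaxHeaderLen = 33; // placeholder for PacketHeader.PKT_MAX_HEADER_LEN
    int chunksPerPkt = 127;         // example chunk count
    int checksumSize = 4;           // e.g. 4 bytes per chunk for CRC32/CRC32C
    // Mirrors the constructor: checksums sit between the header and the data region
    int checksumStart = pktMaxHeaderLen;
    int dataStart = checksumStart + chunksPerPkt * checksumSize;
    System.out.println("checksums occupy [" + checksumStart + ", " + dataStart + ")");
  }
}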

Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java Tue Oct 16 00:02:55 2012
@@ -75,6 +75,7 @@ import org.apache.hadoop.ipc.ProtobufRpc
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
@@ -127,6 +128,43 @@ public class DFSUtil {
           a.isDecommissioned() ? 1 : -1;
       }
     };
+    
+      
+  /**
+   * Comparator for sorting DatanodeInfo[] based on decommissioned/stale states.
+   * Decommissioned/stale nodes are moved to the end of the array on sorting
+   * with this comparator.
+   */ 
+  @InterfaceAudience.Private 
+  public static class DecomStaleComparator implements Comparator<DatanodeInfo> {
+    private long staleInterval;
+
+    /**
+     * Constructor of DecomStaleComparator
+     * 
+     * @param interval
+     *          The time interval for marking datanodes as stale; it is passed
+     *          in from outside since the interval may be changed dynamically
+     */
+    public DecomStaleComparator(long interval) {
+      this.staleInterval = interval;
+    }
+
+    @Override
+    public int compare(DatanodeInfo a, DatanodeInfo b) {
+      // Decommissioned nodes will still be moved to the end of the list
+      if (a.isDecommissioned()) {
+        return b.isDecommissioned() ? 0 : 1;
+      } else if (b.isDecommissioned()) {
+        return -1;
+      }
+      // Stale nodes will be moved behind the normal nodes
+      boolean aStale = a.isStale(staleInterval);
+      boolean bStale = b.isStale(staleInterval);
+      return aStale == bStale ? 0 : (aStale ? 1 : -1);
+    }
+  }    
+    
   /**
    * Address matcher for matching an address to local address
    */
@@ -451,6 +489,34 @@ public class DFSUtil {
   }
 
   /**
+   * @return a collection of all configured NN Kerberos principals.
+   */
+  public static Set<String> getAllNnPrincipals(Configuration conf) throws IOException {
+    Set<String> principals = new HashSet<String>();
+    for (String nsId : DFSUtil.getNameServiceIds(conf)) {
+      if (HAUtil.isHAEnabled(conf, nsId)) {
+        for (String nnId : DFSUtil.getNameNodeIds(conf, nsId)) {
+          Configuration confForNn = new Configuration(conf);
+          NameNode.initializeGenericKeys(confForNn, nsId, nnId);
+          String principal = SecurityUtil.getServerPrincipal(confForNn
+              .get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY),
+              NameNode.getAddress(confForNn).getHostName());
+          principals.add(principal);
+        }
+      } else {
+        Configuration confForNn = new Configuration(conf);
+        NameNode.initializeGenericKeys(confForNn, nsId, null);
+        String principal = SecurityUtil.getServerPrincipal(confForNn
+            .get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY),
+            NameNode.getAddress(confForNn).getHostName());
+        principals.add(principal);
+      }
+    }
+
+    return principals;
+  }
+
+  /**
    * Returns list of InetSocketAddress corresponding to HA NN RPC addresses from
    * the configuration.
    * 
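
A minimal sketch (assumed caller code, not part of the commit) of applying the new comparator so that decommissioned and stale datanodes sort to the end of a replica list:

import java.util.Arrays;

import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

public class StaleSortExample {
  /** Sort replica locations; decommissioned/stale nodes end up last. */
  static void sortLocations(DatanodeInfo[] locations, long staleIntervalMs) {
    Arrays.sort(locations, new DFSUtil.DecomStaleComparator(staleIntervalMs));
  }
}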

Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Tue Oct 16 00:02:55 2012
@@ -535,10 +535,10 @@ public class DistributedFileSystem exten
   @Override
   public void close() throws IOException {
     try {
-      super.processDeleteOnExit();
-      dfs.close();
-    } finally {
+      dfs.closeOutputStreams(false);
       super.close();
+    } finally {
+      dfs.close();
     }
   }
 
@@ -624,6 +624,16 @@ public class DistributedFileSystem exten
   public void saveNamespace() throws AccessControlException, IOException {
     dfs.saveNamespace();
   }
+  
+  /**
+   * Rolls the edit log on the active NameNode.
+   * Requires super-user privileges.
+   * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#rollEdits()
+   * @return the transaction ID of the newly created segment
+   */
+  public long rollEdits() throws AccessControlException, IOException {
+    return dfs.rollEdits();
+  }
 
   /**
    * enable/disable/check restoreFaileStorage
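
A minimal sketch (assuming fs.defaultFS points at an HDFS cluster and the caller has super-user privileges) of the new rollEdits() call:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class RollEditsExample {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    if (fs instanceof DistributedFileSystem) {
      long txid = ((DistributedFileSystem) fs).rollEdits();
      System.out.println("new edit log segment starts at txid " + txid);
    }
  }
}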

Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java Tue Oct 16 00:02:55 2012
@@ -23,6 +23,7 @@ import org.apache.hadoop.ha.HAServicePro
 import org.apache.hadoop.ha.ZKFCProtocol;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
@@ -46,6 +47,7 @@ public class HDFSPolicyProvider extends 
     new Service("security.inter.datanode.protocol.acl", 
                 InterDatanodeProtocol.class),
     new Service("security.namenode.protocol.acl", NamenodeProtocol.class),
+    new Service("security.qjournal.service.protocol.acl", QJournalProtocol.class),
     new Service(CommonConfigurationKeys.SECURITY_HA_SERVICE_PROTOCOL_ACL,
         HAServiceProtocol.class),
     new Service(CommonConfigurationKeys.SECURITY_ZKFC_PROTOCOL_ACL,

Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java Tue Oct 16 00:02:55 2012
@@ -105,4 +105,9 @@ public class HdfsConfiguration extends C
     deprecate("dfs.federation.nameservices", DFSConfigKeys.DFS_NAMESERVICES);
     deprecate("dfs.federation.nameservice.id", DFSConfigKeys.DFS_NAMESERVICE_ID);
   }
+
+  public static void main(String[] args) {
+    init();
+    Configuration.dumpDeprecatedKeys();
+  }
 }

Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java Tue Oct 16 00:02:55 2012
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.ConnectException;
 import java.net.HttpURLConnection;
 import java.net.InetSocketAddress;
 import java.net.URI;
@@ -247,14 +248,16 @@ public class HftpFileSystem extends File
           Credentials c;
           try {
             c = DelegationTokenFetcher.getDTfromRemote(nnHttpUrl, renewer);
-          } catch (Exception e) {
-            LOG.info("Couldn't get a delegation token from " + nnHttpUrl + 
-            " using http.");
-            if(LOG.isDebugEnabled()) {
-              LOG.debug("error was ", e);
+          } catch (IOException e) {
+            if (e.getCause() instanceof ConnectException) {
+              LOG.warn("Couldn't connect to " + nnHttpUrl +
+                  ", assuming security is disabled");
+              return null;
+            }
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Exception getting delegation token", e);
             }
-            //Maybe the server is in unsecure mode (that's bad but okay)
-            return null;
+            throw e;
           }
           for (Token<? extends TokenIdentifier> t : c.getAllTokens()) {
             if(LOG.isDebugEnabled()) {

Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HsftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HsftpFileSystem.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HsftpFileSystem.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HsftpFileSystem.java Tue Oct 16 00:02:55 2012
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.net.HttpURLConnection;
 import java.net.InetSocketAddress;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.net.URL;
 import java.security.KeyStore;
 import java.security.cert.X509Certificate;

Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java Tue Oct 16 00:02:55 2012
@@ -57,6 +57,7 @@ import org.apache.hadoop.io.retry.Failov
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.io.retry.RetryUtils;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
@@ -68,7 +69,6 @@ import org.apache.hadoop.security.author
 import org.apache.hadoop.tools.GetUserMappingsProtocol;
 
 import com.google.common.base.Preconditions;
-import com.google.protobuf.ServiceException;
 
 /**
  * Create proxy objects to communicate with a remote NN. All remote access to an
@@ -243,106 +243,20 @@ public class NameNodeProxies {
     return new NamenodeProtocolTranslatorPB(proxy);
   }
   
-  /**
-   * Return the default retry policy used in RPC.
-   * 
-   * If dfs.client.retry.policy.enabled == false, use TRY_ONCE_THEN_FAIL.
-   * 
-   * Otherwise, first unwrap ServiceException if possible, and then 
-   * (1) use multipleLinearRandomRetry for
-   *     - SafeModeException, or
-   *     - IOException other than RemoteException, or
-   *     - ServiceException; and
-   * (2) use TRY_ONCE_THEN_FAIL for
-   *     - non-SafeMode RemoteException, or
-   *     - non-IOException.
-   *     
-   * Note that dfs.client.retry.max < 0 is not allowed.
-   */
-  public static RetryPolicy getDefaultRetryPolicy(Configuration conf) {
-    final RetryPolicy multipleLinearRandomRetry = getMultipleLinearRandomRetry(conf);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("multipleLinearRandomRetry = " + multipleLinearRandomRetry);
-    }
-    if (multipleLinearRandomRetry == null) {
-      //no retry
-      return RetryPolicies.TRY_ONCE_THEN_FAIL;
-    } else {
-      return new RetryPolicy() {
-        @Override
-        public RetryAction shouldRetry(Exception e, int retries, int failovers,
-            boolean isMethodIdempotent) throws Exception {
-          if (e instanceof ServiceException) {
-            //unwrap ServiceException
-            final Throwable cause = e.getCause();
-            if (cause != null && cause instanceof Exception) {
-              e = (Exception)cause;
-            }
-          }
-
-          //see (1) and (2) in the javadoc of this method.
-          final RetryPolicy p;
-          if (e instanceof RemoteException) {
-            final RemoteException re = (RemoteException)e;
-            p = SafeModeException.class.getName().equals(re.getClassName())?
-                multipleLinearRandomRetry: RetryPolicies.TRY_ONCE_THEN_FAIL;
-          } else if (e instanceof IOException || e instanceof ServiceException) {
-            p = multipleLinearRandomRetry;
-          } else { //non-IOException
-            p = RetryPolicies.TRY_ONCE_THEN_FAIL;
-          }
-
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("RETRY " + retries + ") policy="
-                + p.getClass().getSimpleName() + ", exception=" + e);
-          }
-          LOG.info("RETRY " + retries + ") policy="
-              + p.getClass().getSimpleName() + ", exception=" + e);
-          return p.shouldRetry(e, retries, failovers, isMethodIdempotent);
-        }
-
-        @Override
-        public String toString() {
-          return "RetryPolicy[" + multipleLinearRandomRetry + ", "
-              + RetryPolicies.TRY_ONCE_THEN_FAIL.getClass().getSimpleName()
-              + "]";
-        }
-      };
-    }
-  }
-
-  /**
-   * Return the MultipleLinearRandomRetry policy specified in the conf,
-   * or null if the feature is disabled.
-   * If the policy is specified in the conf but the policy cannot be parsed,
-   * the default policy is returned.
-   * 
-   * Conf property: N pairs of sleep-time and number-of-retries
-   *   dfs.client.retry.policy = "s1,n1,s2,n2,..."
-   */
-  private static RetryPolicy getMultipleLinearRandomRetry(Configuration conf) {
-    final boolean enabled = conf.getBoolean(
-        DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY,
-        DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT);
-    if (!enabled) {
-      return null;
-    }
-
-    final String policy = conf.get(
-        DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
-        DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT);
-
-    final RetryPolicy r = RetryPolicies.MultipleLinearRandomRetry.parseCommaSeparatedString(policy);
-    return r != null? r: RetryPolicies.MultipleLinearRandomRetry.parseCommaSeparatedString(
-        DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT);
-  }
-
   private static ClientProtocol createNNProxyWithClientProtocol(
       InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
       boolean withRetries) throws IOException {
     RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
 
-    final RetryPolicy defaultPolicy = getDefaultRetryPolicy(conf);
+    final RetryPolicy defaultPolicy = 
+        RetryUtils.getDefaultRetryPolicy(
+            conf, 
+            DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, 
+            DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT, 
+            DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
+            DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT,
+            SafeModeException.class);
+    
     final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
     ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
         ClientNamenodeProtocolPB.class, version, address, ugi, conf,

Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java Tue Oct 16 00:02:55 2012
@@ -23,7 +23,6 @@ import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -35,7 +34,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
@@ -47,8 +45,6 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.SocketInputWrapper;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;

Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java Tue Oct 16 00:02:55 2012
@@ -26,51 +26,131 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 
+import java.io.IOException;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.LinkedListMultimap;
 import org.apache.commons.logging.Log;
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.StringUtils;
 
 /**
- * A cache of sockets.
+ * A cache of input stream sockets to DataNodes.
  */
 class SocketCache {
-  static final Log LOG = LogFactory.getLog(SocketCache.class);
+  private static final Log LOG = LogFactory.getLog(SocketCache.class);
 
-  private final LinkedListMultimap<SocketAddress, SocketAndStreams> multimap;
-  private final int capacity;
+  @InterfaceAudience.Private
+  static class SocketAndStreams implements Closeable {
+    public final Socket sock;
+    public final IOStreamPair ioStreams;
+    long createTime;
+    
+    public SocketAndStreams(Socket s, IOStreamPair ioStreams) {
+      this.sock = s;
+      this.ioStreams = ioStreams;
+      this.createTime = System.currentTimeMillis();
+    }
+    
+    @Override
+    public void close() {
+      if (ioStreams != null) { 
+        IOUtils.closeStream(ioStreams.in);
+        IOUtils.closeStream(ioStreams.out);
+      }
+      IOUtils.closeSocket(sock);
+    }
 
-  /**
-   * Create a SocketCache with the given capacity.
-   * @param capacity  Max cache size.
-   */
-  public SocketCache(int capacity) {
-    multimap = LinkedListMultimap.create();
-    this.capacity = capacity;
-    if (capacity <= 0) {
-      LOG.debug("SocketCache disabled in configuration.");
+    public long getCreateTime() {
+      return this.createTime;
     }
   }
 
+  private Daemon daemon;
+  /** A map from remote datanode address to cached sockets and streams. */
+  private static LinkedListMultimap<SocketAddress, SocketAndStreams> multimap =
+    LinkedListMultimap.create();
+  private static int capacity;
+  private static long expiryPeriod;
+  private static SocketCache scInstance = new SocketCache();
+  private static boolean isInitedOnce = false;
+ 
+  public static synchronized SocketCache getInstance(int c, long e) {
+    // capacity is only initialized once
+    if (!isInitedOnce) {
+      capacity = c;
+      expiryPeriod = e;
+
+      if (capacity == 0) {
+        LOG.info("SocketCache disabled.");
+      } else if (expiryPeriod == 0) {
+        throw new IllegalStateException("Cannot initialize expiryPeriod to " +
+          expiryPeriod + " when cache is enabled.");
+      }
+      isInitedOnce = true;
+    } else { // already initialized once
+      if (capacity != c || expiryPeriod != e) {
+        LOG.info("capacity and expiry period already set to " + capacity +
+          " and " + expiryPeriod + " respectively. Cannot set them to " + c +
+          " and " + e);
+      }
+    }
+
+    return scInstance;
+  }
+
+  private boolean isDaemonStarted() {
+    return daemon != null;
+  }
+
+  private synchronized void startExpiryDaemon() {
+    // start daemon only if not already started
+    if (isDaemonStarted()) {
+      return;
+    }
+    
+    daemon = new Daemon(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          SocketCache.this.run();
+        } catch(InterruptedException e) {
+          //noop
+        } finally {
+          SocketCache.this.clear();
+        }
+      }
+
+      @Override
+      public String toString() {
+        return String.valueOf(SocketCache.this);
+      }
+    });
+    daemon.start();
+  }
+
   /**
    * Get a cached socket to the given address.
    * @param remote  Remote address the socket is connected to.
    * @return  A socket with unknown state, possibly closed underneath. Or null.
    */
   public synchronized SocketAndStreams get(SocketAddress remote) {
+
     if (capacity <= 0) { // disabled
       return null;
     }
-    
-    List<SocketAndStreams> socklist = multimap.get(remote);
-    if (socklist == null) {
+
+    List<SocketAndStreams> sockStreamList = multimap.get(remote);
+    if (sockStreamList == null) {
       return null;
     }
 
-    Iterator<SocketAndStreams> iter = socklist.iterator();
+    Iterator<SocketAndStreams> iter = sockStreamList.iterator();
     while (iter.hasNext()) {
       SocketAndStreams candidate = iter.next();
       iter.remove();
@@ -86,14 +166,16 @@ class SocketCache {
    * @param sock socket not used by anyone.
    */
   public synchronized void put(Socket sock, IOStreamPair ioStreams) {
+
+    Preconditions.checkNotNull(sock);
     SocketAndStreams s = new SocketAndStreams(sock, ioStreams);
     if (capacity <= 0) {
       // Cache disabled.
       s.close();
       return;
     }
-    
-    Preconditions.checkNotNull(sock);
+ 
+    startExpiryDaemon();
 
     SocketAddress remoteAddr = sock.getRemoteSocketAddress();
     if (remoteAddr == null) {
@@ -106,7 +188,7 @@ class SocketCache {
     if (capacity == multimap.size()) {
       evictOldest();
     }
-    multimap.put(remoteAddr, new SocketAndStreams(sock, ioStreams));
+    multimap.put(remoteAddr, s);
   }
 
   public synchronized int size() {
@@ -114,13 +196,34 @@ class SocketCache {
   }
 
   /**
+   * Evict and close sockets older than expiry period from the cache.
+   */
+  private synchronized void evictExpired(long expiryPeriod) {
+    while (multimap.size() != 0) {
+      Iterator<Entry<SocketAddress, SocketAndStreams>> iter =
+        multimap.entries().iterator();
+      Entry<SocketAddress, SocketAndStreams> entry = iter.next();
+      // stop if the oldest socket has not expired yet; otherwise remove it
+      if (entry == null || 
+        System.currentTimeMillis() - entry.getValue().getCreateTime() < 
+        expiryPeriod) {
+        break;
+      }
+      iter.remove();
+      SocketAndStreams s = entry.getValue();
+      s.close();
+    }
+  }
+
+  /**
    * Evict the oldest entry in the cache.
    */
   private synchronized void evictOldest() {
     Iterator<Entry<SocketAddress, SocketAndStreams>> iter =
       multimap.entries().iterator();
     if (!iter.hasNext()) {
-      throw new IllegalStateException("Cannot evict from empty cache!");
+      throw new IllegalStateException("Cannot evict from empty cache! " +
+        "capacity: " + capacity);
     }
     Entry<SocketAddress, SocketAndStreams> entry = iter.next();
     iter.remove();
@@ -129,38 +232,31 @@ class SocketCache {
   }
 
   /**
-   * Empty the cache, and close all sockets.
+   * Periodically check the cache and expire entries
+   * older than expiryPeriod milliseconds.
    */
-  public synchronized void clear() {
-    for (SocketAndStreams s : multimap.values()) {
-      s.close();
+  private void run() throws InterruptedException {
+    for(long lastExpiryTime = System.currentTimeMillis();
+        !Thread.interrupted();
+        Thread.sleep(expiryPeriod)) {
+      final long elapsed = System.currentTimeMillis() - lastExpiryTime;
+      if (elapsed >= expiryPeriod) {
+        evictExpired(expiryPeriod);
+        lastExpiryTime = System.currentTimeMillis();
+      }
     }
-    multimap.clear();
-  }
-
-  @Override
-  protected void finalize() {
     clear();
+    throw new InterruptedException("Daemon Interrupted");
   }
-  
-  @InterfaceAudience.Private
-  static class SocketAndStreams implements Closeable {
-    public final Socket sock;
-    public final IOStreamPair ioStreams;
-    
-    public SocketAndStreams(Socket s, IOStreamPair ioStreams) {
-      this.sock = s;
-      this.ioStreams = ioStreams;
-    }
-    
-    @Override
-    public void close() {
-      if (ioStreams != null) { 
-        IOUtils.closeStream(ioStreams.in);
-        IOUtils.closeStream(ioStreams.out);
-      }
-      IOUtils.closeSocket(sock);
+
+  /**
+   * Empty the cache, and close all sockets.
+   */
+  private synchronized void clear() {
+    for (SocketAndStreams sockAndStream : multimap.values()) {
+      sockAndStream.close();
     }
+    multimap.clear();
   }
 
 }

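For readers following this change, a minimal sketch of how the new singleton API is meant to be exercised from the client side; the capacity/expiry values and the dnAddr, sock and ioStreams variables are illustrative assumptions, not part of this patch:

    // Illustrative only: obtain the shared cache (capacity and expiry period in
    // milliseconds would normally come from client configuration).
    SocketCache cache = SocketCache.getInstance(16, 3000);

    // Try to reuse a cached connection to the datanode; null means no entry.
    SocketCache.SocketAndStreams sas = cache.get(dnAddr);
    if (sas == null) {
      // fall back to opening a fresh connection (not shown)
    }

    // Return a healthy, idle connection so later readers can reuse it.
    cache.put(sock, ioStreams);
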
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Tue Oct 16 00:02:55 2012
@@ -667,6 +667,18 @@ public interface ClientProtocol {
    */
   public void saveNamespace() throws AccessControlException, IOException;
 
+  
+  /**
+   * Roll the edit log.
+   * Requires superuser privileges.
+   * 
+   * @throws AccessControlException if the superuser privilege is violated
+   * @throws IOException if log roll fails
+   * @return the txid of the new segment
+   */
+  @Idempotent
+  public long rollEdits() throws AccessControlException, IOException;
+
   /**
    * Enable/Disable restore failed storage.
    * <p>

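A hedged sketch of the new interface method in use, assuming namenode is an existing ClientProtocol proxy held by a superuser client and LOG is the caller's logger:

    // Illustrative only: roll the active edit log segment and record where the
    // new segment begins. Requires superuser privileges on the namenode.
    long newSegmentTxId = namenode.rollEdits();
    LOG.info("Edit log rolled; new segment starts at txid " + newSegmentTxId);
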
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java Tue Oct 16 00:02:55 2012
@@ -27,6 +27,7 @@ import org.apache.hadoop.net.NetworkTopo
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
 
 /** 
  * This class extends the primary identifier of a Datanode with ephemeral
@@ -321,7 +322,24 @@ public class DatanodeInfo extends Datano
     }
     return adminState;
   }
-
+ 
+  /**
+   * Check if the datanode is in a stale state: if the namenode has not
+   * received a heartbeat from the datanode for more than staleInterval
+   * milliseconds (default
+   * {@link DFSConfigKeys#DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT}),
+   * the datanode is treated as stale.
+   * 
+   * @param staleInterval
+   *          the time interval for marking the node as stale. If the last
+   *          update time is beyond the given time interval, the node will be
+   *          marked as stale.
+   * @return true if the node is stale
+   */
+  public boolean isStale(long staleInterval) {
+    return (Time.now() - lastUpdate) >= staleInterval;
+  }
+  
   /**
    * Sets the admin state of this node.
    */

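A short sketch of the intended use of isStale(), assuming the caller already holds a list of DatanodeInfo and a staleInterval in milliseconds:

    // Illustrative only: partition datanodes by staleness so stale nodes can be
    // de-prioritized, e.g. when ordering block locations for reads.
    List<DatanodeInfo> fresh = new ArrayList<DatanodeInfo>();
    List<DatanodeInfo> stale = new ArrayList<DatanodeInfo>();
    for (DatanodeInfo dn : nodes) {
      if (dn.isStale(staleInterval)) {
        stale.add(dn);
      } else {
        fresh.add(dn);
      }
    }
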
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java Tue Oct 16 00:02:55 2012
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.util.ExactSizeInputStream;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.security.token.Token;
 
 import com.google.common.collect.Lists;
@@ -155,6 +156,14 @@ public abstract class HdfsProtoUtil {
     return ret;
   }
 
+  public static DataChecksum.Type fromProto(HdfsProtos.ChecksumTypeProto type) {
+    return DataChecksum.Type.valueOf(type.name());
+  }
+
+  public static HdfsProtos.ChecksumTypeProto toProto(DataChecksum.Type type) {
+    return HdfsProtos.ChecksumTypeProto.valueOf(type.name());
+  }
+
   public static InputStream vintPrefixed(final InputStream input)
   throws IOException {
     final int firstByte = input.read();
@@ -167,4 +176,4 @@ public abstract class HdfsProtoUtil {
   
     return new ExactSizeInputStream(input, size);
   }
-}
\ No newline at end of file
+}

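The two new helpers are a name-based mapping between DataChecksum.Type and the shared proto enum, so a round trip should be lossless; a hedged sketch (CRC32C is assumed to be one of the shared enum names):

    // Illustrative only: convert a checksum type to its protobuf form and back.
    HdfsProtos.ChecksumTypeProto proto = HdfsProtoUtil.toProto(DataChecksum.Type.CRC32C);
    DataChecksum.Type roundTripped = HdfsProtoUtil.fromProto(proto);
    assert roundTripped == DataChecksum.Type.CRC32C;
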
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java Tue Oct 16 00:02:55 2012
@@ -24,9 +24,9 @@ import org.apache.hadoop.hdfs.protocol.E
 import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto.ChecksumType;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ChecksumTypeProto;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
@@ -52,7 +52,7 @@ public abstract class DataTransferProtoU
   }
 
   public static ChecksumProto toProto(DataChecksum checksum) {
-    ChecksumType type = ChecksumType.valueOf(checksum.getChecksumType().name());
+    ChecksumTypeProto type = ChecksumTypeProto.valueOf(checksum.getChecksumType().name());
     if (type == null) {
       throw new IllegalArgumentException(
           "Can't convert checksum to protobuf: " + checksum);

Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java Tue Oct 16 00:02:55 2012
@@ -30,6 +30,8 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 
+import com.google.protobuf.TextFormat;
+
 /** Pipeline Acknowledgment **/
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
@@ -120,6 +122,6 @@ public class PipelineAck {
   
   @Override //Object
   public String toString() {
-    return proto.toString();
+    return TextFormat.shortDebugString(proto);
   }
 }

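The switch to TextFormat.shortDebugString keeps ack logging on a single line; a hedged illustration (the field names in the comment are assumptions about the ack proto, and LOG/ack belong to a hypothetical caller):

    // Illustrative only: logging an ack now yields a compact one-line form such
    // as "seqno: 100 status: SUCCESS" instead of protobuf's multi-line toString().
    LOG.debug("PipelineAck received: " + ack);
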
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java Tue Oct 16 00:02:55 2012
@@ -103,6 +103,8 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ReportBadBlocksResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RestoreFailedStorageRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RestoreFailedStorageResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollEditsRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollEditsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SaveNamespaceRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SaveNamespaceResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetBalancerBandwidthRequestProto;
@@ -537,6 +539,20 @@ public class ClientNamenodeProtocolServe
     }
 
   }
+  
+  @Override
+  public RollEditsResponseProto rollEdits(RpcController controller,
+      RollEditsRequestProto request) throws ServiceException {
+    try {
+      long txid = server.rollEdits();
+      return RollEditsResponseProto.newBuilder()
+          .setNewSegmentTxId(txid)
+          .build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
 
   static final RefreshNodesResponseProto VOID_REFRESHNODES_RESPONSE = 
       RefreshNodesResponseProto.newBuilder().build();

Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java Tue Oct 16 00:02:55 2012
@@ -87,6 +87,8 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenewLeaseRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ReportBadBlocksRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RestoreFailedStorageRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollEditsRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollEditsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SaveNamespaceRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetBalancerBandwidthRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetOwnerRequestProto;
@@ -525,6 +527,17 @@ public class ClientNamenodeProtocolTrans
       throw ProtobufHelper.getRemoteException(e);
     }
   }
+  
+  @Override
+  public long rollEdits() throws AccessControlException, IOException {
+    RollEditsRequestProto req = RollEditsRequestProto.getDefaultInstance();
+    try {
+      RollEditsResponseProto resp = rpcProxy.rollEdits(null, req);
+      return resp.getNewSegmentTxId();
+    } catch (ServiceException se) {
+      throw ProtobufHelper.getRemoteException(se);
+    }
+  }
 
   @Override
   public boolean restoreFailedStorage(String arg) 

Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Tue Oct 16 00:02:55 2012
@@ -326,12 +326,15 @@ public class PBHelper {
   }
 
   public static RemoteEditLogProto convert(RemoteEditLog log) {
-    return RemoteEditLogProto.newBuilder().setEndTxId(log.getEndTxId())
-        .setStartTxId(log.getStartTxId()).build();
+    return RemoteEditLogProto.newBuilder()
+        .setStartTxId(log.getStartTxId())
+        .setEndTxId(log.getEndTxId())
+        .setIsInProgress(log.isInProgress()).build();
   }
 
   public static RemoteEditLog convert(RemoteEditLogProto l) {
-    return new RemoteEditLog(l.getStartTxId(), l.getEndTxId());
+    return new RemoteEditLog(l.getStartTxId(), l.getEndTxId(),
+        l.getIsInProgress());
   }
 
   public static RemoteEditLogManifestProto convert(

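Since the in-progress flag is now carried in both directions, the two convert() overloads should round-trip; a hedged sketch (the txids are arbitrary):

    // Illustrative only: an in-progress segment keeps its flag across the
    // RemoteEditLog <-> RemoteEditLogProto conversion.
    RemoteEditLog log = new RemoteEditLog(101L, 200L, true);
    RemoteEditLogProto proto = PBHelper.convert(log);
    RemoteEditLog back = PBHelper.convert(proto);
    assert back.isInProgress();
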
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java Tue Oct 16 00:02:55 2012
@@ -118,7 +118,6 @@ public class BlockTokenIdentifier extend
     return a == null ? b == null : a.equals(b);
   }
 
-  /** {@inheritDoc} */
   @Override
   public boolean equals(Object obj) {
     if (obj == this) {
@@ -135,7 +134,6 @@ public class BlockTokenIdentifier extend
     return false;
   }
 
-  /** {@inheritDoc} */
   @Override
   public int hashCode() {
     return (int) expiryDate ^ keyId ^ (int) blockId ^ modes.hashCode()

Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java Tue Oct 16 00:02:55 2012
@@ -19,12 +19,14 @@ package org.apache.hadoop.hdfs.server.bl
 
 import java.io.IOException;
 
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.ContentSummary;
 
 /** 
  * This interface is used by the block manager to expose a
  * few characteristics of a collection of Block/BlockUnderConstruction.
  */
+@InterfaceAudience.Private
 public interface BlockCollection {
   /**
    * Get the last block of the collection.
@@ -56,7 +58,7 @@ public interface BlockCollection {
    * Get block replication for the collection 
    * @return block replication value
    */
-  public short getReplication();
+  public short getBlockReplication();
 
   /**
    * Get the name of the collection.

Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java Tue Oct 16 00:02:55 2012
@@ -73,7 +73,7 @@ public class BlockInfo extends Block imp
    * @param from BlockInfo to copy from.
    */
   protected BlockInfo(BlockInfo from) {
-    this(from, from.bc.getReplication());
+    this(from, from.bc.getBlockReplication());
     this.bc = from.bc;
   }
 
@@ -335,7 +335,7 @@ public class BlockInfo extends Block imp
       BlockUCState s, DatanodeDescriptor[] targets) {
     if(isComplete()) {
       return new BlockInfoUnderConstruction(
-          this, getBlockCollection().getReplication(), s, targets);
+          this, getBlockCollection().getBlockReplication(), s, targets);
     }
     // the block is already under construction
     BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)this;

Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Tue Oct 16 00:02:55 2012
@@ -59,7 +59,6 @@ import static org.apache.hadoop.util.Exi
 
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
-import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
@@ -364,11 +363,10 @@ public class BlockManager {
         replicationThread.join(3000);
       }
     } catch (InterruptedException ie) {
-    } finally {
-      if (pendingReplications != null) pendingReplications.stop();
-      blocksMap.close();
-      datanodeManager.close();
     }
+    datanodeManager.close();
+    pendingReplications.stop();
+    blocksMap.close();
   }
 
   /** @return the datanodeManager */
@@ -999,7 +997,7 @@ public class BlockManager {
 
     // Add this replica to corruptReplicas Map
     corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason);
-    if (countNodes(b.stored).liveReplicas() >= bc.getReplication()) {
+    if (countNodes(b.stored).liveReplicas() >= bc.getBlockReplication()) {
       // the block is over-replicated so invalidate the replicas immediately
       invalidateBlock(b, node);
     } else if (namesystem.isPopulatingReplQueues()) {
@@ -1137,7 +1135,7 @@ public class BlockManager {
               continue;
             }
 
-            requiredReplication = bc.getReplication();
+            requiredReplication = bc.getBlockReplication();
 
             // get a source data-node
             containingNodes = new ArrayList<DatanodeDescriptor>();
@@ -1223,7 +1221,7 @@ public class BlockManager {
             neededReplications.decrementReplicationIndex(priority);
             continue;
           }
-          requiredReplication = bc.getReplication();
+          requiredReplication = bc.getBlockReplication();
 
           // do not schedule more if enough replicas is already pending
           NumberReplicas numReplicas = countNodes(block);
@@ -1316,8 +1314,9 @@ public class BlockManager {
       final HashMap<Node, Node> excludedNodes,
       final long blocksize) throws IOException {
     // choose targets for the new block to be allocated.
-    final DatanodeDescriptor targets[] = blockplacement.chooseTarget(
-        src, numOfReplicas, client, excludedNodes, blocksize);
+    final DatanodeDescriptor targets[] = blockplacement.chooseTarget(src,
+        numOfReplicas, client, new ArrayList<DatanodeDescriptor>(), false,
+        excludedNodes, blocksize);
     if (targets.length < minReplication) {
       throw new IOException("File " + src + " could only be replicated to "
           + targets.length + " nodes instead of minReplication (="
@@ -2090,7 +2089,7 @@ assert storedBlock.findDatanode(dn) < 0 
     }
 
     // handle underReplication/overReplication
-    short fileReplication = bc.getReplication();
+    short fileReplication = bc.getBlockReplication();
     if (!isNeededReplication(storedBlock, fileReplication, numCurrentReplica)) {
       neededReplications.remove(storedBlock, numCurrentReplica,
           num.decommissionedReplicas(), fileReplication);
@@ -2229,7 +2228,7 @@ assert storedBlock.findDatanode(dn) < 0 
       return MisReplicationResult.UNDER_CONSTRUCTION;
     }
     // calculate current replication
-    short expectedReplication = bc.getReplication();
+    short expectedReplication = bc.getBlockReplication();
     NumberReplicas num = countNodes(block);
     int numCurrentReplica = num.liveReplicas();
     // add to under-replicated queue if need to be
@@ -2700,7 +2699,7 @@ assert storedBlock.findDatanode(dn) < 0 
     while(it.hasNext()) {
       final Block block = it.next();
       BlockCollection bc = blocksMap.getBlockCollection(block);
-      short expectedReplication = bc.getReplication();
+      short expectedReplication = bc.getBlockReplication();
       NumberReplicas num = countNodes(block);
       int numCurrentReplica = num.liveReplicas();
       if (numCurrentReplica > expectedReplication) {
@@ -2846,7 +2845,7 @@ assert storedBlock.findDatanode(dn) < 0 
     if (bc == null) { // block does not belong to any file
       return 0;
     }
-    return bc.getReplication();
+    return bc.getBlockReplication();
   }