Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2012/06/28 09:00:25 UTC

svn commit: r1354832 [3/5] - in /hadoop/common/branches/HDFS-3092/hadoop-hdfs-project: ./ hadoop-hdfs-httpfs/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/ hadoop-h...

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java?rev=1354832&r1=1354831&r2=1354832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java Thu Jun 28 06:59:38 2012
@@ -47,10 +47,12 @@ import org.apache.hadoop.hdfs.protocolPB
 import org.apache.hadoop.hdfs.protocolPB.RefreshUserMappingsProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.RefreshUserMappingsProtocolPB;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.DefaultFailoverProxyProvider;
 import org.apache.hadoop.io.retry.FailoverProxyProvider;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
@@ -66,6 +68,7 @@ import org.apache.hadoop.security.author
 import org.apache.hadoop.tools.GetUserMappingsProtocol;
 
 import com.google.common.base.Preconditions;
+import com.google.protobuf.ServiceException;
 
 /**
  * Create proxy objects to communicate with a remote NN. All remote access to an
@@ -240,12 +243,106 @@ public class NameNodeProxies {
     return new NamenodeProtocolTranslatorPB(proxy);
   }
   
+  /**
+   * Return the default retry policy used in RPC.
+   * 
+   * If dfs.client.retry.policy.enabled == false, use TRY_ONCE_THEN_FAIL.
+   * 
+   * Otherwise, first unwrap ServiceException if possible, and then 
+   * (1) use multipleLinearRandomRetry for
+   *     - SafeModeException, or
+   *     - IOException other than RemoteException, or
+   *     - ServiceException; and
+   * (2) use TRY_ONCE_THEN_FAIL for
+   *     - non-SafeMode RemoteException, or
+   *     - non-IOException.
+   *     
+   * Note that dfs.client.retry.max < 0 is not allowed.
+   */
+  private static RetryPolicy getDefaultRpcRetryPolicy(Configuration conf) {
+    final RetryPolicy multipleLinearRandomRetry = getMultipleLinearRandomRetry(conf);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("multipleLinearRandomRetry = " + multipleLinearRandomRetry);
+    }
+    if (multipleLinearRandomRetry == null) {
+      //no retry
+      return RetryPolicies.TRY_ONCE_THEN_FAIL;
+    } else {
+      return new RetryPolicy() {
+        @Override
+        public RetryAction shouldRetry(Exception e, int retries, int failovers,
+            boolean isMethodIdempotent) throws Exception {
+          if (e instanceof ServiceException) {
+            //unwrap ServiceException
+            final Throwable cause = e.getCause();
+            if (cause != null && cause instanceof Exception) {
+              e = (Exception)cause;
+            }
+          }
+
+          //see (1) and (2) in the javadoc of this method.
+          final RetryPolicy p;
+          if (e instanceof RemoteException) {
+            final RemoteException re = (RemoteException)e;
+            p = SafeModeException.class.getName().equals(re.getClassName())?
+                multipleLinearRandomRetry: RetryPolicies.TRY_ONCE_THEN_FAIL;
+          } else if (e instanceof IOException || e instanceof ServiceException) {
+            p = multipleLinearRandomRetry;
+          } else { //non-IOException
+            p = RetryPolicies.TRY_ONCE_THEN_FAIL;
+          }
+
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("RETRY " + retries + ") policy="
+                + p.getClass().getSimpleName() + ", exception=" + e);
+          }
+          return p.shouldRetry(e, retries, failovers, isMethodIdempotent);
+        }
+      };
+    }
+  }
+
+  /**
+   * Return the MultipleLinearRandomRetry policy specified in the conf,
+   * or null if the feature is disabled.
+   * If the policy is specified in the conf but the policy cannot be parsed,
+   * the default policy is returned.
+   * 
+   * Conf property: N pairs of sleep-time and number-of-retries
+   *   dfs.client.retry.policy.spec = "s1,n1,s2,n2,..."
+   */
+  private static RetryPolicy getMultipleLinearRandomRetry(Configuration conf) {
+    final boolean enabled = conf.getBoolean(
+        DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY,
+        DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT);
+    if (!enabled) {
+      return null;
+    }
+
+    final String policy = conf.get(
+        DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
+        DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT);
+
+    final RetryPolicy r = RetryPolicies.MultipleLinearRandomRetry.parseCommaSeparatedString(policy);
+    return r != null? r: RetryPolicies.MultipleLinearRandomRetry.parseCommaSeparatedString(
+        DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT);
+  }
+
   private static ClientProtocol createNNProxyWithClientProtocol(
       InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
       boolean withRetries) throws IOException {
-    ClientNamenodeProtocolPB proxy = (ClientNamenodeProtocolPB) NameNodeProxies
-        .createNameNodeProxy(address, conf, ugi, ClientNamenodeProtocolPB.class, 0);
+    RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
+
+    final RetryPolicy defaultPolicy = getDefaultRpcRetryPolicy(conf);
+    final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
+    ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
+        ClientNamenodeProtocolPB.class, version, address, ugi, conf,
+        NetUtils.getDefaultSocketFactory(conf), 0, defaultPolicy).getProxy();
+
     if (withRetries) { // create the proxy with retries
+
       RetryPolicy createPolicy = RetryPolicies
           .retryUpToMaximumCountWithFixedSleep(5,
               HdfsConstants.LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);
@@ -258,17 +355,21 @@ public class NameNodeProxies {
       Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap
                  = new HashMap<Class<? extends Exception>, RetryPolicy>();
       exceptionToPolicyMap.put(RemoteException.class, RetryPolicies
-          .retryByRemoteException(RetryPolicies.TRY_ONCE_THEN_FAIL,
+          .retryByRemoteException(defaultPolicy,
               remoteExceptionToPolicyMap));
       RetryPolicy methodPolicy = RetryPolicies.retryByException(
-          RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
+          defaultPolicy, exceptionToPolicyMap);
       Map<String, RetryPolicy> methodNameToPolicyMap 
                  = new HashMap<String, RetryPolicy>();
     
       methodNameToPolicyMap.put("create", methodPolicy);
     
-      proxy = (ClientNamenodeProtocolPB) RetryProxy
-          .create(ClientNamenodeProtocolPB.class, proxy, methodNameToPolicyMap);
+      proxy = (ClientNamenodeProtocolPB) RetryProxy.create(
+          ClientNamenodeProtocolPB.class,
+          new DefaultFailoverProxyProvider<ClientNamenodeProtocolPB>(
+              ClientNamenodeProtocolPB.class, proxy),
+          methodNameToPolicyMap,
+          defaultPolicy);
     }
     return new ClientNamenodeProtocolTranslatorPB(proxy);
   }

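A minimal sketch (not part of this commit) of how a client would turn on the
retry policy that getDefaultRpcRetryPolicy() builds, using the DFSConfigKeys
constants referenced in the new code. The spec value is only an example; any
"s1,n1,s2,n2,..." pairs of sleep-time (ms) and retry count are accepted:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    Configuration conf = new HdfsConfiguration();
    // enable retries instead of TRY_ONCE_THEN_FAIL
    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, true);
    // sleep 10s between each of the first 6 retries, then 60s between each
    // of the next 10 (example values, not defaults from this commit)
    conf.set(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY, "10000,6,60000,10");
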
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java?rev=1354832&r1=1354831&r2=1354832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java Thu Jun 28 06:59:38 2012
@@ -199,10 +199,10 @@ public class DatanodeInfo extends Datano
     this.xceiverCount = xceiverCount; 
   }
 
-  /** rack name */
+  /** network location */
   public synchronized String getNetworkLocation() {return location;}
     
-  /** Sets the rack name */
+  /** Sets the network location */
   public synchronized void setNetworkLocation(String location) {
     this.location = NodeBase.normalize(location);
   }

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java?rev=1354832&r1=1354831&r2=1354832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java Thu Jun 28 06:59:38 2012
@@ -30,8 +30,9 @@ import org.apache.hadoop.hdfs.util.Light
  * the block are stored.
  */
 @InterfaceAudience.Private
-public class BlockInfo extends Block implements
-    LightWeightGSet.LinkedElement {
+public class BlockInfo extends Block implements LightWeightGSet.LinkedElement {
+  public static final BlockInfo[] EMPTY_ARRAY = {}; 
+
   private BlockCollection bc;
 
   /** For implementing {@link LightWeightGSet.LinkedElement} interface */

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1354832&r1=1354831&r2=1354832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Thu Jun 28 06:59:38 2012
@@ -2259,30 +2259,14 @@ assert storedBlock.findDatanode(dn) < 0 
     BlockCollection bc = getBlockCollection(b);
     final Map<String, List<DatanodeDescriptor>> rackMap
         = new HashMap<String, List<DatanodeDescriptor>>();
-    for(final Iterator<DatanodeDescriptor> iter = nonExcess.iterator();
-        iter.hasNext(); ) {
-      final DatanodeDescriptor node = iter.next();
-      final String rackName = node.getNetworkLocation();
-      List<DatanodeDescriptor> datanodeList = rackMap.get(rackName);
-      if (datanodeList == null) {
-        datanodeList = new ArrayList<DatanodeDescriptor>();
-        rackMap.put(rackName, datanodeList);
-      }
-      datanodeList.add(node);
-    }
+    final List<DatanodeDescriptor> moreThanOne = new ArrayList<DatanodeDescriptor>();
+    final List<DatanodeDescriptor> exactlyOne = new ArrayList<DatanodeDescriptor>();
     
     // split nodes into two sets
-    // priSet contains nodes on rack with more than one replica
-    // remains contains the remaining nodes
-    final List<DatanodeDescriptor> priSet = new ArrayList<DatanodeDescriptor>();
-    final List<DatanodeDescriptor> remains = new ArrayList<DatanodeDescriptor>();
-    for(List<DatanodeDescriptor> datanodeList : rackMap.values()) {
-      if (datanodeList.size() == 1 ) {
-        remains.add(datanodeList.get(0));
-      } else {
-        priSet.addAll(datanodeList);
-      }
-    }
+    // moreThanOne contains nodes on rack with more than one replica
+    // exactlyOne contains the remaining nodes
+    replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne,
+        exactlyOne);
     
     // pick one node to delete that favors the delete hint
     // otherwise pick one with least space from priSet if it is not empty
@@ -2292,30 +2276,18 @@ assert storedBlock.findDatanode(dn) < 0 
       // check if we can delete delNodeHint
       final DatanodeInfo cur;
       if (firstOne && delNodeHint !=null && nonExcess.contains(delNodeHint)
-          && (priSet.contains(delNodeHint)
-              || (addedNode != null && !priSet.contains(addedNode))) ) {
+          && (moreThanOne.contains(delNodeHint)
+              || (addedNode != null && !moreThanOne.contains(addedNode))) ) {
         cur = delNodeHint;
       } else { // regular excessive replica removal
         cur = replicator.chooseReplicaToDelete(bc, b, replication,
-            priSet, remains);
+            moreThanOne, exactlyOne);
       }
       firstOne = false;
 
-      // adjust rackmap, priSet, and remains
-      String rack = cur.getNetworkLocation();
-      final List<DatanodeDescriptor> datanodes = rackMap.get(rack);
-      datanodes.remove(cur);
-      if (datanodes.isEmpty()) {
-        rackMap.remove(rack);
-      }
-      if (priSet.remove(cur)) {
-        if (datanodes.size() == 1) {
-          priSet.remove(datanodes.get(0));
-          remains.add(datanodes.get(0));
-        }
-      } else {
-        remains.remove(cur);
-      }
+      // adjust rackmap, moreThanOne, and exactlyOne
+      replicator.adjustSetsWithChosenReplica(rackMap, moreThanOne,
+          exactlyOne, cur);
 
       nonExcess.remove(cur);
       addToExcessReplicate(cur, b);

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java?rev=1354832&r1=1354831&r2=1354832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java Thu Jun 28 06:59:38 2012
@@ -21,12 +21,14 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
 import org.apache.hadoop.net.NetworkTopology;
@@ -241,5 +243,80 @@ public abstract class BlockPlacementPoli
                         excludedNodes,
                         blocksize);
   }
+  
+  /**
+   * Adjust rackmap, moreThanOne, and exactlyOne after removing replica on cur.
+   *
+   * @param rackMap a map from rack to replica nodes
+   * @param moreThanOne the list of replica nodes on racks that have more
+   *        than one replica
+   * @param exactlyOne the list of replica nodes on racks with only one replica
+   * @param cur the current replica to remove
+   */
+  public void adjustSetsWithChosenReplica(final Map<String, 
+      List<DatanodeDescriptor>> rackMap,
+      final List<DatanodeDescriptor> moreThanOne,
+      final List<DatanodeDescriptor> exactlyOne, final DatanodeInfo cur) {
+    
+    String rack = getRack(cur);
+    final List<DatanodeDescriptor> datanodes = rackMap.get(rack);
+    datanodes.remove(cur);
+    if (datanodes.isEmpty()) {
+      rackMap.remove(rack);
+    }
+    if (moreThanOne.remove(cur)) {
+      if (datanodes.size() == 1) {
+        moreThanOne.remove(datanodes.get(0));
+        exactlyOne.add(datanodes.get(0));
+      }
+    } else {
+      exactlyOne.remove(cur);
+    }
+  }
+
+  /**
+   * Get rack string from a data node
+   * @param datanode
+   * @return rack of data node
+   */
+  protected String getRack(final DatanodeInfo datanode) {
+    return datanode.getNetworkLocation();
+  }
+  
+  /**
+   * Split data nodes into two sets: one set contains nodes on racks with
+   * more than one replica, the other set contains the remaining nodes.
+   * 
+   * @param dataNodes the data nodes to be split
+   * @param rackMap a map from rack to datanodes
+   * @param moreThanOne contains nodes on racks with more than one replica
+   * @param exactlyOne contains the remaining nodes
+   */
+  public void splitNodesWithRack(
+      Collection<DatanodeDescriptor> dataNodes,
+      final Map<String, List<DatanodeDescriptor>> rackMap,
+      final List<DatanodeDescriptor> moreThanOne,
+      final List<DatanodeDescriptor> exactlyOne) {
+    for(DatanodeDescriptor node : dataNodes) {
+      final String rackName = getRack(node);
+      List<DatanodeDescriptor> datanodeList = rackMap.get(rackName);
+      if (datanodeList == null) {
+        datanodeList = new ArrayList<DatanodeDescriptor>();
+        rackMap.put(rackName, datanodeList);
+      }
+      datanodeList.add(node);
+    }
+    
+    // split nodes into two sets
+    for(List<DatanodeDescriptor> datanodeList : rackMap.values()) {
+      if (datanodeList.size() == 1) {
+        // exactlyOne contains nodes on rack with only one replica
+        exactlyOne.add(datanodeList.get(0));
+      } else {
+        // moreThanOne contains nodes on rack with more than one replica
+        moreThanOne.addAll(datanodeList);
+      }
+    }
+  }
 
 }

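A hedged sketch (not in the commit) of the call sequence the BlockManager
change above now delegates to; "replicator" is the BlockPlacementPolicy
instance, and "nonExcess" and "cur" stand for the variables in that diff:

    final Map<String, List<DatanodeDescriptor>> rackMap =
        new HashMap<String, List<DatanodeDescriptor>>();
    final List<DatanodeDescriptor> moreThanOne = new ArrayList<DatanodeDescriptor>();
    final List<DatanodeDescriptor> exactlyOne = new ArrayList<DatanodeDescriptor>();
    // bucket candidates by rack, then split by rack population
    replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne, exactlyOne);
    // ... choose a replica "cur" to delete ...
    // keep all three collections consistent after removing "cur"
    replicator.adjustSetsWithChosenReplica(rackMap, moreThanOne, exactlyOne, cur);
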
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1354832&r1=1354831&r2=1354832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Thu Jun 28 06:59:38 2012
@@ -56,15 +56,15 @@ public class BlockPlacementPolicyDefault
     "For more information, please enable DEBUG log level on "
     + ((Log4JLogger)LOG).getLogger().getName();
 
-  private boolean considerLoad; 
+  protected boolean considerLoad; 
   private boolean preferLocalNode = true;
-  private NetworkTopology clusterMap;
+  protected NetworkTopology clusterMap;
   private FSClusterStats stats;
-  private long heartbeatInterval;   // interval for DataNode heartbeats
+  protected long heartbeatInterval;   // interval for DataNode heartbeats
   /**
    * A miss of that many heartbeats is tolerated for replica deletion policy.
    */
-  private int tolerateHeartbeatMultiplier;
+  protected int tolerateHeartbeatMultiplier;
 
   BlockPlacementPolicyDefault(Configuration conf,  FSClusterStats stats,
                            NetworkTopology clusterMap) {
@@ -88,7 +88,7 @@ public class BlockPlacementPolicyDefault
         DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT);
   }
 
-  private ThreadLocal<StringBuilder> threadLocalBuilder =
+  protected ThreadLocal<StringBuilder> threadLocalBuilder =
     new ThreadLocal<StringBuilder>() {
     @Override
     protected StringBuilder initialValue() {
@@ -229,7 +229,7 @@ public class BlockPlacementPolicyDefault
    * choose a node on the same rack
    * @return the chosen node
    */
-  private DatanodeDescriptor chooseLocalNode(
+  protected DatanodeDescriptor chooseLocalNode(
                                              DatanodeDescriptor localMachine,
                                              HashMap<Node, Node> excludedNodes,
                                              long blocksize,
@@ -263,7 +263,7 @@ public class BlockPlacementPolicyDefault
    * in the cluster.
    * @return the chosen node
    */
-  private DatanodeDescriptor chooseLocalRack(
+  protected DatanodeDescriptor chooseLocalRack(
                                              DatanodeDescriptor localMachine,
                                              HashMap<Node, Node> excludedNodes,
                                              long blocksize,
@@ -316,7 +316,7 @@ public class BlockPlacementPolicyDefault
    * from the local rack
    */
     
-  private void chooseRemoteRack(int numOfReplicas,
+  protected void chooseRemoteRack(int numOfReplicas,
                                 DatanodeDescriptor localMachine,
                                 HashMap<Node, Node> excludedNodes,
                                 long blocksize,
@@ -338,7 +338,7 @@ public class BlockPlacementPolicyDefault
   /* Randomly choose one target from <i>nodes</i>.
    * @return the chosen node
    */
-  private DatanodeDescriptor chooseRandom(
+  protected DatanodeDescriptor chooseRandom(
                                           String nodes,
                                           HashMap<Node, Node> excludedNodes,
                                           long blocksize,
@@ -382,7 +382,7 @@ public class BlockPlacementPolicyDefault
     
   /* Randomly choose <i>numOfReplicas</i> targets from <i>nodes</i>.
    */
-  private void chooseRandom(int numOfReplicas,
+  protected void chooseRandom(int numOfReplicas,
                             String nodes,
                             HashMap<Node, Node> excludedNodes,
                             long blocksize,
@@ -438,7 +438,7 @@ public class BlockPlacementPolicyDefault
                         this.considerLoad, results);
   }
     
-  private boolean isGoodTarget(DatanodeDescriptor node,
+  protected boolean isGoodTarget(DatanodeDescriptor node,
                                long blockSize, int maxTargetPerLoc,
                                boolean considerLoad,
                                List<DatanodeDescriptor> results) {
@@ -574,8 +574,7 @@ public class BlockPlacementPolicyDefault
 
     // pick replica from the first Set. If first is empty, then pick replicas
     // from second set.
-    Iterator<DatanodeDescriptor> iter =
-          first.isEmpty() ? second.iterator() : first.iterator();
+    Iterator<DatanodeDescriptor> iter = pickupReplicaSet(first, second);
 
     // Pick the node with the oldest heartbeat or with the least free space,
     // if all heartbeats are within the tolerable heartbeat interval
@@ -594,6 +593,20 @@ public class BlockPlacementPolicyDefault
     }
     return oldestHeartbeatNode != null ? oldestHeartbeatNode : minSpaceNode;
   }
+
+  /**
+   * Pick the replica node set from which a replica is deleted when
+   * over-replicated. The first set contains replica nodes on racks with
+   * more than one replica; the second set contains the remaining nodes.
+   * Pick from the first set if it is non-empty, otherwise from the second.
+   */
+  protected Iterator<DatanodeDescriptor> pickupReplicaSet(
+      Collection<DatanodeDescriptor> first,
+      Collection<DatanodeDescriptor> second) {
+    Iterator<DatanodeDescriptor> iter =
+        first.isEmpty() ? second.iterator() : first.iterator();
+    return iter;
+  }
   
   @VisibleForTesting
   void setPreferLocalNode(boolean prefer) {

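Since the members above are now protected rather than private, a placement
policy subclass can specialize pieces of the default policy. A hedged sketch
(hypothetical subclass, assumed to live in the same package; not part of this
commit) overriding pickupReplicaSet:

    public class MyPlacementPolicy extends BlockPlacementPolicyDefault {
      @Override
      protected Iterator<DatanodeDescriptor> pickupReplicaSet(
          Collection<DatanodeDescriptor> first,
          Collection<DatanodeDescriptor> second) {
        // hypothetical: consider all candidate replicas together instead of
        // preferring the multi-replica racks first
        List<DatanodeDescriptor> all = new ArrayList<DatanodeDescriptor>(first);
        all.addAll(second);
        return all.iterator();
      }
    }
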
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1354832&r1=1354831&r2=1354832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Thu Jun 28 06:59:38 2012
@@ -38,6 +38,7 @@ import org.apache.hadoop.HadoopIllegalAr
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -66,6 +67,8 @@ import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.CachedDNSToSwitchMapping;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.ScriptBasedMapping;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.HostsFileReader;
@@ -108,7 +111,7 @@ public class DatanodeManager {
       = new TreeMap<String, DatanodeDescriptor>();
 
   /** Cluster network topology */
-  private final NetworkTopology networktopology = new NetworkTopology();
+  private final NetworkTopology networktopology;
 
   /** Host names to datanode descriptors mapping. */
   private final Host2NodesMap host2DatanodeMap = new Host2NodesMap();
@@ -134,6 +137,12 @@ public class DatanodeManager {
       ) throws IOException {
     this.namesystem = namesystem;
     this.blockManager = blockManager;
+    
+    Class<? extends NetworkTopology> networkTopologyClass =
+        conf.getClass(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
+            NetworkTopology.class, NetworkTopology.class);
+    networktopology = (NetworkTopology) ReflectionUtils.newInstance(
+        networkTopologyClass, conf);
 
     this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);
 
@@ -206,13 +215,22 @@ public class DatanodeManager {
   public void sortLocatedBlocks(final String targethost,
       final List<LocatedBlock> locatedblocks) {
     //sort the blocks
-    final DatanodeDescriptor client = getDatanodeByHost(targethost);
+    // The client may not be a datanode (e.g. when the node manager and the
+    // datanode are separated), so resolve a generic Node, not only a datanode.
+    Node client = getDatanodeByHost(targethost);
+    if (client == null) {
+      List<String> hosts = new ArrayList<String> (1);
+      hosts.add(targethost);
+      String rName = dnsToSwitchMapping.resolve(hosts).get(0);
+      if (rName != null)
+        client = new NodeBase(rName + NodeBase.PATH_SEPARATOR_STR + targethost);
+    }
     for (LocatedBlock b : locatedblocks) {
       networktopology.pseudoSortByDistance(client, b.getLocations());
       
       // Move decommissioned datanodes to the bottom
       Arrays.sort(b.getLocations(), DFSUtil.DECOM_COMPARATOR);
-    }    
+    }
   }
 
   CyclicIteration<String, DatanodeDescriptor> getDatanodeCyclicIteration(
@@ -1035,4 +1053,8 @@ public class DatanodeManager {
     }
   }
 
+  @Override
+  public String toString() {
+    return getClass().getSimpleName() + ": " + host2DatanodeMap;
+  }
 }

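To illustrate the sortLocatedBlocks() change above: when the caller is not a
datanode, the client Node is synthesized from the resolved rack plus the host
name. A small sketch with hypothetical values:

    // if dnsToSwitchMapping resolves "client.example.com" to "/rack1", the
    // synthesized client location is "/rack1/client.example.com"
    Node client = new NodeBase("/rack1" + NodeBase.PATH_SEPARATOR_STR
        + "client.example.com");
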
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java?rev=1354832&r1=1354831&r2=1354832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java Thu Jun 28 06:59:38 2012
@@ -17,7 +17,9 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -156,4 +158,14 @@ class Host2NodesMap {
       hostmapLock.readLock().unlock();
     }
   }
+  
+  @Override
+  public String toString() {
+    final StringBuilder b = new StringBuilder(getClass().getSimpleName())
+        .append("[");
+    for(Map.Entry<String, DatanodeDescriptor[]> e : map.entrySet()) {
+      b.append("\n  " + e.getKey() + " => " + Arrays.asList(e.getValue()));
+    }
+    return b.append("\n]").toString();
+  }
 }

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java?rev=1354832&r1=1354831&r2=1354832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java Thu Jun 28 06:59:38 2012
@@ -34,7 +34,9 @@ public final class Util {
   private final static Log LOG = LogFactory.getLog(Util.class.getName());
 
   /**
-   * Current system time.
+   * Current system time.  Do not use this to calculate a duration or interval
+   * to sleep, because it will be broken by settimeofday.  Instead, use
+   * monotonicNow.
    * @return current time in msec.
    */
   public static long now() {
@@ -42,6 +44,19 @@ public final class Util {
   }
   
   /**
+   * Current time from some arbitrary time base in the past, counting in
+   * milliseconds, and not affected by settimeofday or similar system clock
+   * changes.  This is appropriate to use when computing how much longer to
+   * wait for an interval to expire.
+   * @return a monotonic clock that counts in milliseconds.
+   */
+  public static long monotonicNow() {
+    final long NANOSECONDS_PER_MILLISECOND = 1000000;
+
+    return System.nanoTime() / NANOSECONDS_PER_MILLISECOND;
+  }
+
+  /**
    * Interprets the passed string as a URI. In case of error it 
    * assumes the specified string is a file.
    *

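A short sketch (not in the commit) contrasting the two clocks: now() reports
wall-clock time and can jump backwards or forwards with settimeofday, while
monotonicNow() is safe for measuring durations:

    final long start = Util.monotonicNow();
    doWork();  // hypothetical operation being timed
    final long elapsedMs = Util.monotonicNow() - start;  // never negative
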
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java?rev=1354832&r1=1354831&r2=1354832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java Thu Jun 28 06:59:38 2012
@@ -302,6 +302,22 @@ public class DirectoryScanner implements
     shouldRun = false;
     if (masterThread != null) masterThread.shutdown();
     if (reportCompileThreadPool != null) reportCompileThreadPool.shutdown();
+    if (masterThread != null) {
+      try {
+        masterThread.awaitTermination(1, TimeUnit.MINUTES);
+      } catch (InterruptedException e) {
+        LOG.error("interrupted while waiting for masterThread to " +
+          "terminate", e);
+      }
+    }
+    if (reportCompileThreadPool != null) {
+      try {
+        reportCompileThreadPool.awaitTermination(1, TimeUnit.MINUTES);
+      } catch (InterruptedException e) {
+        LOG.error("interrupted while waiting for reportCompileThreadPool to " +
+          "terminate", e);
+      }
+    }
     if (!retainDiffs) clear();
   }
 

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java?rev=1354832&r1=1354831&r2=1354832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java Thu Jun 28 06:59:38 2012
@@ -98,6 +98,10 @@ public class DatanodeWebHdfsMethods {
       LOG.trace("HTTP " + op.getValue().getType() + ": " + op + ", " + path
           + ", ugi=" + ugi + Param.toSortedString(", ", parameters));
     }
+    if (nnRpcAddr == null) {
+      throw new IllegalArgumentException(NamenodeRpcAddressParam.NAME
+          + " is not specified.");
+    }
 
     //clear content type
     response.setContentType(null);

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java?rev=1354832&r1=1354831&r2=1354832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java Thu Jun 28 06:59:38 2012
@@ -165,7 +165,8 @@ public class EditLogFileInputStream exte
             LOG.warn("skipping " + skipAmt + " bytes at the end " +
               "of edit log  '" + getName() + "': reached txid " + txId +
               " out of " + lastTxId);
-            tracker.skip(skipAmt);
+            tracker.clearLimit();
+            IOUtils.skipFully(tracker, skipAmt);
           }
         }
       }

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java?rev=1354832&r1=1354831&r2=1354832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java Thu Jun 28 06:59:38 2012
@@ -206,10 +206,10 @@ public class EditLogFileOutputStream ext
             + fc.size());
       }
       fill.position(0);
-      int written = fc.write(fill, position);
+      IOUtils.writeFully(fc, fill, position);
       if(FSNamesystem.LOG.isDebugEnabled()) {
         FSNamesystem.LOG.debug("Edit log size is now " + fc.size() +
-            " written " + written + " bytes " + " at offset " + position);
+            " written " + fill.capacity() + " bytes " + " at offset " + position);
       }
     }
   }

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1354832&r1=1354831&r2=1354832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Thu Jun 28 06:59:38 2012
@@ -277,7 +277,7 @@ public class FSDirectory implements Clos
           preferredBlockSize, modificationTime, clientName, 
           clientMachine, null);
     } else {
-      newNode = new INodeFile(permissions, 0, replication,
+      newNode = new INodeFile(permissions, BlockInfo.EMPTY_ARRAY, replication,
                               modificationTime, atime, preferredBlockSize);
     }
 

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1354832&r1=1354831&r2=1354832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Thu Jun 28 06:59:38 2012
@@ -1236,18 +1236,6 @@ public class FSEditLog  {
         throw e;
       }
     }
-    // This code will go away as soon as RedundantEditLogInputStream is
-    // introduced. (HDFS-3049)
-    try {
-      if (!streams.isEmpty()) {
-        streams.get(0).skipUntil(fromTxId);
-      }
-    } catch (IOException e) {
-      // We don't want to throw an exception from here, because that would make
-      // recovery impossible even if the user requested it.  An exception will
-      // be thrown later, when we don't read the starting txid we expect.
-      LOG.error("error skipping until transaction " + fromTxId, e);
-    }
     return streams;
   }
   

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1354832&r1=1354831&r2=1354832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Thu Jun 28 06:59:38 2012
@@ -668,7 +668,9 @@ public class FSEditLogLoader {
         FSImage.LOG.warn("Caught exception after reading " + numValid +
             " ops from " + in + " while determining its valid length." +
             "Position was " + lastPos, t);
-        break;
+        in.resync();
+        FSImage.LOG.warn("After resync, position is " + in.getPosition());
+        continue;
       }
       if (lastTxId == HdfsConstants.INVALID_TXID
           || op.getTransactionId() > lastTxId) {
@@ -752,6 +754,11 @@ public class FSEditLogLoader {
     }
 
     @Override
+    public void clearLimit() {
+      limitPos = Long.MAX_VALUE;
+    }
+
+    @Override
     public void mark(int limit) {
       super.mark(limit);
       markPos = curPos;

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1354832&r1=1354831&r2=1354832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Thu Jun 28 06:59:38 2012
@@ -44,6 +44,7 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
@@ -2289,9 +2290,11 @@ public abstract class FSEditLogOp {
           // 0xff, we want to skip over that region, because there's nothing
           // interesting there.
           long numSkip = e.getNumAfterTerminator();
-          if (in.skip(numSkip) < numSkip) {
+          try {
+            IOUtils.skipFully(in, numSkip);
+          } catch (Throwable t) {
             FSImage.LOG.error("Failed to skip " + numSkip + " bytes of " +
-              "garbage after an OP_INVALID.  Unexpected early EOF.");
+              "garbage after an OP_INVALID.", t);
             return null;
           }
         } catch (IOException e) {

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1354832&r1=1354831&r2=1354832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Thu Jun 28 06:59:38 2012
@@ -240,8 +240,15 @@ public class FSNamesystem implements Nam
   private static final void logAuditEvent(UserGroupInformation ugi,
       InetAddress addr, String cmd, String src, String dst,
       HdfsFileStatus stat) {
+    logAuditEvent(true, ugi, addr, cmd, src, dst, stat);
+  }
+
+  private static final void logAuditEvent(boolean succeeded,
+      UserGroupInformation ugi, InetAddress addr, String cmd, String src,
+      String dst, HdfsFileStatus stat) {
     final StringBuilder sb = auditBuffer.get();
     sb.setLength(0);
+    sb.append("allowed=").append(succeeded).append("\t");
     sb.append("ugi=").append(ugi).append("\t");
     sb.append("ip=").append(addr).append("\t");
     sb.append("cmd=").append(cmd).append("\t");
@@ -572,8 +579,6 @@ public class FSNamesystem implements Nam
         !safeMode.isPopulatingReplQueues();
       setBlockTotal();
       blockManager.activate(conf);
-      this.nnrmthread = new Daemon(new NameNodeResourceMonitor());
-      nnrmthread.start();
     } finally {
       writeUnlock();
     }
@@ -590,7 +595,6 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       if (blockManager != null) blockManager.close();
-      if (nnrmthread != null) nnrmthread.interrupt();
     } finally {
       writeUnlock();
     }
@@ -644,6 +648,10 @@ public class FSNamesystem implements Nam
       }
       leaseManager.startMonitor();
       startSecretManagerIfNecessary();
+
+      //ResourceMonitor required only at ActiveNN. See HDFS-2914
+      this.nnrmthread = new Daemon(new NameNodeResourceMonitor());
+      nnrmthread.start();
     } finally {
       writeUnlock();
     }
@@ -666,6 +674,10 @@ public class FSNamesystem implements Nam
       if (leaseManager != null) {
         leaseManager.stopMonitor();
       }
+      if (nnrmthread != null) {
+        ((NameNodeResourceMonitor) nnrmthread.getRunnable()).stopMonitor();
+        nnrmthread.interrupt();
+      }
       if (dir != null && dir.fsImage != null) {
         if (dir.fsImage.editLog != null) {
           dir.fsImage.editLog.close();
@@ -1013,6 +1025,21 @@ public class FSNamesystem implements Nam
   void setPermission(String src, FsPermission permission)
       throws AccessControlException, FileNotFoundException, SafeModeException,
       UnresolvedLinkException, IOException {
+    try {
+      setPermissionInt(src, permission);
+    } catch (AccessControlException e) {
+      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+        logAuditEvent(false, UserGroupInformation.getCurrentUser(),
+                      Server.getRemoteIp(),
+                      "setPermission", src, null, null);
+      }
+      throw e;
+    }
+  }
+
+  private void setPermissionInt(String src, FsPermission permission)
+      throws AccessControlException, FileNotFoundException, SafeModeException,
+      UnresolvedLinkException, IOException {
     HdfsFileStatus resultingStat = null;
     writeLock();
     try {
@@ -1044,6 +1071,21 @@ public class FSNamesystem implements Nam
   void setOwner(String src, String username, String group)
       throws AccessControlException, FileNotFoundException, SafeModeException,
       UnresolvedLinkException, IOException {
+    try {
+      setOwnerInt(src, username, group);
+    } catch (AccessControlException e) {
+      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+        logAuditEvent(false, UserGroupInformation.getCurrentUser(),
+                      Server.getRemoteIp(),
+                      "setOwner", src, null, null);
+      }
+      throw e;
+    } 
+  }
+
+  private void setOwnerInt(String src, String username, String group)
+      throws AccessControlException, FileNotFoundException, SafeModeException,
+      UnresolvedLinkException, IOException {
     HdfsFileStatus resultingStat = null;
     writeLock();
     try {
@@ -1084,7 +1126,8 @@ public class FSNamesystem implements Nam
   LocatedBlocks getBlockLocations(String clientMachine, String src,
       long offset, long length) throws AccessControlException,
       FileNotFoundException, UnresolvedLinkException, IOException {
-    LocatedBlocks blocks = getBlockLocations(src, offset, length, true, true);
+    LocatedBlocks blocks = getBlockLocations(src, offset, length, true, true,
+        true);
     if (blocks != null) {
       blockManager.getDatanodeManager().sortLocatedBlocks(
           clientMachine, blocks.getLocatedBlocks());
@@ -1098,8 +1141,24 @@ public class FSNamesystem implements Nam
    * @throws FileNotFoundException, UnresolvedLinkException, IOException
    */
   LocatedBlocks getBlockLocations(String src, long offset, long length,
-      boolean doAccessTime, boolean needBlockToken) throws FileNotFoundException,
-      UnresolvedLinkException, IOException {
+      boolean doAccessTime, boolean needBlockToken, boolean checkSafeMode)
+      throws FileNotFoundException, UnresolvedLinkException, IOException {
+    try {
+      return getBlockLocationsInt(src, offset, length, doAccessTime,
+                                  needBlockToken, checkSafeMode);
+    } catch (AccessControlException e) {
+      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+        logAuditEvent(false, UserGroupInformation.getCurrentUser(),
+                      Server.getRemoteIp(),
+                      "open", src, null, null);
+      }
+      throw e;
+    }
+  }
+
+  private LocatedBlocks getBlockLocationsInt(String src, long offset, long length,
+      boolean doAccessTime, boolean needBlockToken, boolean checkSafeMode)
+      throws FileNotFoundException, UnresolvedLinkException, IOException {
     if (isPermissionEnabled) {
       checkPathAccess(src, FsAction.READ);
     }
@@ -1119,6 +1178,15 @@ public class FSNamesystem implements Nam
                     Server.getRemoteIp(),
                     "open", src, null, null);
     }
+    if (checkSafeMode && isInSafeMode()) {
+      for (LocatedBlock b : ret.getLocatedBlocks()) {
+        // if safemode & no block locations yet then throw safemodeException
+        if ((b.getLocations() == null) || (b.getLocations().length == 0)) {
+          throw new SafeModeException("Zero blocklocations for " + src,
+              safeMode);
+        }
+      }
+    }
     return ret;
   }
 
@@ -1187,6 +1255,20 @@ public class FSNamesystem implements Nam
    */
   void concat(String target, String [] srcs) 
       throws IOException, UnresolvedLinkException {
+    try {
+      concatInt(target, srcs);
+    } catch (AccessControlException e) {
+      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+        logAuditEvent(false, UserGroupInformation.getLoginUser(),
+                      Server.getRemoteIp(),
+                      "concat", Arrays.toString(srcs), target, null);
+      }
+      throw e;
+    }
+  }
+
+  private void concatInt(String target, String [] srcs) 
+      throws IOException, UnresolvedLinkException {
     if(FSNamesystem.LOG.isDebugEnabled()) {
       FSNamesystem.LOG.debug("concat " + Arrays.toString(srcs) +
           " to " + target);
@@ -1339,6 +1421,20 @@ public class FSNamesystem implements Nam
    * written to the edits log but is not flushed.
    */
   void setTimes(String src, long mtime, long atime) 
+      throws IOException, UnresolvedLinkException {
+    try {
+      setTimesInt(src, mtime, atime);
+    } catch (AccessControlException e) {
+      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+        logAuditEvent(false, UserGroupInformation.getCurrentUser(),
+                      Server.getRemoteIp(),
+                      "setTimes", src, null, null);
+      }
+      throw e;
+    }
+  }
+
+  private void setTimesInt(String src, long mtime, long atime) 
     throws IOException, UnresolvedLinkException {
     if (!isAccessTimeSupported() && atime != -1) {
       throw new IOException("Access time for hdfs is not configured. " +
@@ -1375,6 +1471,21 @@ public class FSNamesystem implements Nam
   void createSymlink(String target, String link,
       PermissionStatus dirPerms, boolean createParent) 
       throws IOException, UnresolvedLinkException {
+    try {
+      createSymlinkInt(target, link, dirPerms, createParent);
+    } catch (AccessControlException e) {
+      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+        logAuditEvent(false, UserGroupInformation.getCurrentUser(),
+                      Server.getRemoteIp(),
+                      "createSymlink", link, target, null);
+      }
+      throw e;
+    }
+  }
+
+  private void createSymlinkInt(String target, String link,
+      PermissionStatus dirPerms, boolean createParent) 
+      throws IOException, UnresolvedLinkException {
     HdfsFileStatus resultingStat = null;
     writeLock();
     try {
@@ -1442,8 +1553,22 @@ public class FSNamesystem implements Nam
    * @return true if successful; 
    *         false if file does not exist or is a directory
    */
-  boolean setReplication(final String src, final short replication
-      ) throws IOException {
+  boolean setReplication(final String src, final short replication)
+      throws IOException {
+    try {
+      return setReplicationInt(src, replication);
+    } catch (AccessControlException e) {
+      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+        logAuditEvent(false, UserGroupInformation.getCurrentUser(),
+                      Server.getRemoteIp(),
+                      "setReplication", src, null, null);
+      }
+      throw e;
+    }
+  }
+
+  private boolean setReplicationInt(final String src, final short replication)
+      throws IOException {
     blockManager.verifyReplication(src, replication, null);
 
     final boolean isFile;
@@ -1476,7 +1601,7 @@ public class FSNamesystem implements Nam
     }
     return isFile;
   }
-    
+
   long getPreferredBlockSize(String filename) 
       throws IOException, UnresolvedLinkException {
     readLock();
@@ -1522,6 +1647,24 @@ public class FSNamesystem implements Nam
       short replication, long blockSize) throws AccessControlException,
       SafeModeException, FileAlreadyExistsException, UnresolvedLinkException,
       FileNotFoundException, ParentNotDirectoryException, IOException {
+    try {
+      startFileInt(src, permissions, holder, clientMachine, flag, createParent,
+                   replication, blockSize);
+    } catch (AccessControlException e) {
+      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+        logAuditEvent(false, UserGroupInformation.getCurrentUser(),
+                      Server.getRemoteIp(),
+                      "create", src, null, null);
+      }
+      throw e;
+    }
+  }
+
+  private void startFileInt(String src, PermissionStatus permissions, String holder,
+      String clientMachine, EnumSet<CreateFlag> flag, boolean createParent,
+      short replication, long blockSize) throws AccessControlException,
+      SafeModeException, FileAlreadyExistsException, UnresolvedLinkException,
+      FileNotFoundException, ParentNotDirectoryException, IOException {
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -1600,7 +1743,7 @@ public class FSNamesystem implements Nam
     }
 
     try {
-      INode myFile = dir.getFileINode(src);
+      INodeFile myFile = dir.getFileINode(src);
       recoverLeaseInternal(myFile, src, holder, clientMachine, false);
 
       try {
@@ -1676,22 +1819,20 @@ public class FSNamesystem implements Nam
    * @throws UnresolvedLinkException
    * @throws IOException
    */
-  public LocatedBlock prepareFileForWrite(String src, INode file,
+  LocatedBlock prepareFileForWrite(String src, INodeFile file,
       String leaseHolder, String clientMachine, DatanodeDescriptor clientNode,
-      boolean writeToEditLog)
-      throws UnresolvedLinkException, IOException {
-    INodeFile node = (INodeFile) file;
+      boolean writeToEditLog) throws IOException {
     INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
-                                    node.getLocalNameBytes(),
-                                    node.getReplication(),
-                                    node.getModificationTime(),
-                                    node.getPreferredBlockSize(),
-                                    node.getBlocks(),
-                                    node.getPermissionStatus(),
+                                    file.getLocalNameBytes(),
+                                    file.getReplication(),
+                                    file.getModificationTime(),
+                                    file.getPreferredBlockSize(),
+                                    file.getBlocks(),
+                                    file.getPermissionStatus(),
                                     leaseHolder,
                                     clientMachine,
                                     clientNode);
-    dir.replaceNode(src, node, cons);
+    dir.replaceNode(src, file, cons);
     leaseManager.addLease(cons.getClientName(), src);
     
     LocatedBlock ret = blockManager.convertLastBlockToUnderConstruction(cons);
@@ -1827,6 +1968,22 @@ public class FSNamesystem implements Nam
       throws AccessControlException, SafeModeException,
       FileAlreadyExistsException, FileNotFoundException,
       ParentNotDirectoryException, IOException {
+    try {
+      return appendFileInt(src, holder, clientMachine);
+    } catch (AccessControlException e) {
+      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+        logAuditEvent(false, UserGroupInformation.getCurrentUser(),
+                      Server.getRemoteIp(),
+                      "append", src, null, null);
+      }
+      throw e;
+    }
+  }
+
+  private LocatedBlock appendFileInt(String src, String holder, String clientMachine)
+      throws AccessControlException, SafeModeException,
+      FileAlreadyExistsException, FileNotFoundException,
+      ParentNotDirectoryException, IOException {
     if (!supportAppends) {
       throw new UnsupportedOperationException(
           "Append is not enabled on this NameNode. Use the " +
@@ -2313,6 +2470,20 @@ public class FSNamesystem implements Nam
    */
   @Deprecated
   boolean renameTo(String src, String dst) 
+      throws IOException, UnresolvedLinkException {
+    try {
+      return renameToInt(src, dst);
+    } catch (AccessControlException e) {
+      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+        logAuditEvent(false, UserGroupInformation.getCurrentUser(),
+                      Server.getRemoteIp(),
+                      "rename", src, dst, null);
+      }
+      throw e;
+    }
+  }
+
+  private boolean renameToInt(String src, String dst) 
     throws IOException, UnresolvedLinkException {
     boolean status = false;
     HdfsFileStatus resultingStat = null;
@@ -2424,20 +2595,35 @@ public class FSNamesystem implements Nam
    * @see ClientProtocol#delete(String, boolean) for a detailed description and
    * the exceptions thrown
    */
-    boolean delete(String src, boolean recursive)
-        throws AccessControlException, SafeModeException,
-               UnresolvedLinkException, IOException {
-      if (NameNode.stateChangeLog.isDebugEnabled()) {
-        NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src);
-      }
-      boolean status = deleteInternal(src, recursive, true);
-      if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
-        logAuditEvent(UserGroupInformation.getCurrentUser(),
+  boolean delete(String src, boolean recursive)
+      throws AccessControlException, SafeModeException,
+      UnresolvedLinkException, IOException {
+    try {
+      return deleteInt(src, recursive);
+    } catch (AccessControlException e) {
+      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+        logAuditEvent(false, UserGroupInformation.getCurrentUser(),
                       Server.getRemoteIp(),
                       "delete", src, null, null);
       }
-      return status;
+      throw e;
+    }
+  }
+
+  private boolean deleteInt(String src, boolean recursive)
+      throws AccessControlException, SafeModeException,
+      UnresolvedLinkException, IOException {
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src);
+    }
+    boolean status = deleteInternal(src, recursive, true);
+    if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
+      logAuditEvent(UserGroupInformation.getCurrentUser(),
+                    Server.getRemoteIp(),
+                    "delete", src, null, null);
     }
+    return status;
+  }
     
   /**
    * Remove a file/directory from the namespace.
@@ -2593,6 +2779,20 @@ public class FSNamesystem implements Nam
    */
   boolean mkdirs(String src, PermissionStatus permissions,
       boolean createParent) throws IOException, UnresolvedLinkException {
+    try {
+      return mkdirsInt(src, permissions, createParent);
+    } catch (AccessControlException e) {
+      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+        logAuditEvent(false, UserGroupInformation.getCurrentUser(),
+                      Server.getRemoteIp(),
+                      "mkdirs", src, null, null);
+      }
+      throw e;
+    }
+  }
+
+  private boolean mkdirsInt(String src, PermissionStatus permissions,
+      boolean createParent) throws IOException, UnresolvedLinkException {
     boolean status = false;
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
@@ -3044,6 +3244,21 @@ public class FSNamesystem implements Nam
    */
   DirectoryListing getListing(String src, byte[] startAfter,
       boolean needLocation) 
+      throws AccessControlException, UnresolvedLinkException, IOException {
+    try {
+      return getListingInt(src, startAfter, needLocation);
+    } catch (AccessControlException e) {
+      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
+        logAuditEvent(false, UserGroupInformation.getCurrentUser(),
+                      Server.getRemoteIp(),
+                      "listStatus", src, null, null);
+      }
+      throw e;
+    }
+  }
+
+  private DirectoryListing getListingInt(String src, byte[] startAfter,
+      boolean needLocation) 
     throws AccessControlException, UnresolvedLinkException, IOException {
     DirectoryListing dl;
     readLock();
@@ -3193,10 +3408,11 @@ public class FSNamesystem implements Nam
    * acceptable levels, this daemon will cause the NN to exit safe mode.
    */
   class NameNodeResourceMonitor implements Runnable  {
+    volatile boolean shouldNNRmRun = true;
     @Override
     public void run () {
       try {
-        while (fsRunning) {
+        while (fsRunning && shouldNNRmRun) {
           checkAvailableResources();
           if(!nameNodeHasResourcesAvailable()) {
             String lowResourcesMsg = "NameNode low on available disk space. ";
@@ -3217,7 +3433,11 @@ public class FSNamesystem implements Nam
         FSNamesystem.LOG.error("Exception in NameNodeResourceMonitor: ", e);
       }
     }
-  }
+
+    public void stopMonitor() {
+      shouldNNRmRun = false;
+    }
+  }
   
   public FSImage getFSImage() {
     return dir.fsImage;

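The FSNamesystem hunks above all apply the same refactoring: each public
operation (startFile, appendFile, renameTo, delete, mkdirs, getListing)
becomes a thin wrapper around a private *Int worker, and the wrapper logs a
failed audit event before rethrowing an AccessControlException. A minimal
self-contained sketch of that pattern, using java.util.logging in place of
FSNamesystem's audit log (AuditSketch and doDeleteInternal are illustrative
names, not part of the patch):

    import java.util.logging.Logger;

    /** Sketch of the wrap-then-audit pattern from the FSNamesystem hunks. */
    public class AuditSketch {
      private static final Logger AUDIT = Logger.getLogger("audit");

      /** Public entry point: delegate, audit the failure, rethrow. */
      public boolean delete(String src, boolean recursive) {
        try {
          return deleteInt(src, recursive);
        } catch (SecurityException e) {  // stands in for AccessControlException
          AUDIT.info("allowed=false cmd=delete src=" + src);
          throw e;                       // caller still sees the original failure
        }
      }

      /** Private worker: audits only the success path. */
      private boolean deleteInt(String src, boolean recursive) {
        boolean status = doDeleteInternal(src, recursive);
        if (status) {
          AUDIT.info("allowed=true cmd=delete src=" + src);
        }
        return status;
      }

      private boolean doDeleteInternal(String src, boolean recursive) {
        return true;  // placeholder so the sketch compiles
      }
    }

Keeping the audit call in the wrapper means every denied request is logged
exactly once, regardless of which internal code path threw.
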
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=1354832&r1=1354831&r2=1354832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Thu Jun 28 06:59:38 2012
@@ -25,9 +25,9 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
 
 /** I-node for closed file. */
 @InterfaceAudience.Private
@@ -45,13 +45,6 @@ public class INodeFile extends INode imp
 
   BlockInfo blocks[] = null;
 
-  INodeFile(PermissionStatus permissions,
-            int nrBlocks, short replication, long modificationTime,
-            long atime, long preferredBlockSize) {
-    this(permissions, new BlockInfo[nrBlocks], replication,
-        modificationTime, atime, preferredBlockSize);
-  }
-
   INodeFile(PermissionStatus permissions, BlockInfo[] blklist,
                       short replication, long modificationTime,
                       long atime, long preferredBlockSize) {

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java?rev=1354832&r1=1354831&r2=1354832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java Thu Jun 28 06:59:38 2012
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.na
 
 import java.io.IOException;
 
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
@@ -32,8 +33,8 @@ import com.google.common.base.Joiner;
 /**
  * I-node for file being written.
  */
-public class INodeFileUnderConstruction extends INodeFile 
-                                        implements MutableBlockCollection {
+@InterfaceAudience.Private
+class INodeFileUnderConstruction extends INodeFile implements MutableBlockCollection {
   private  String clientName;         // lease holder
   private final String clientMachine;
   private final DatanodeDescriptor clientNode; // if client is a cluster node too.
@@ -45,7 +46,7 @@ public class INodeFileUnderConstruction 
                              String clientName,
                              String clientMachine,
                              DatanodeDescriptor clientNode) {
-    super(permissions.applyUMask(UMASK), 0, replication,
+    super(permissions.applyUMask(UMASK), BlockInfo.EMPTY_ARRAY, replication,
         modTime, modTime, preferredBlockSize);
     this.clientName = clientName;
     this.clientMachine = clientMachine;

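The INodeFileUnderConstruction constructor now passes BlockInfo.EMPTY_ARRAY
directly, which is what allowed the int-arity INodeFile constructor above to
be deleted. Reusing a single empty-array constant is a standard
allocation-saving idiom; a small illustration with a stand-in Block type
rather than the real HDFS classes:

    /** Stand-in for BlockInfo, showing the shared empty-array idiom. */
    class Block {
      // One immutable, shareable instance; an empty array has no state to corrupt.
      static final Block[] EMPTY_ARRAY = {};
    }

    class FileSketch {
      private final Block[] blocks;

      FileSketch(Block[] blocks) {
        // Callers with no blocks pass Block.EMPTY_ARRAY instead of new Block[0],
        // saving one allocation per under-construction file.
        this.blocks = blocks;
      }

      int numBlocks() {
        return blocks.length;
      }
    }
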
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java?rev=1354832&r1=1354831&r2=1354832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java Thu Jun 28 06:59:38 2012
@@ -25,7 +25,9 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.PriorityQueue;
 import java.util.SortedSet;
+import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -41,7 +43,6 @@ import com.google.common.collect.Immutab
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimaps;
 import com.google.common.collect.Sets;
-import com.google.common.collect.TreeMultiset;
 
 /**
  * Manages a collection of Journals. None of the methods are synchronized, it is
@@ -223,8 +224,9 @@ public class JournalSet implements Journ
   @Override
   public void selectInputStreams(Collection<EditLogInputStream> streams,
       long fromTxId, boolean inProgressOk) {
-    final TreeMultiset<EditLogInputStream> allStreams =
-        TreeMultiset.create(EDIT_LOG_INPUT_STREAM_COMPARATOR);
+    final PriorityQueue<EditLogInputStream> allStreams = 
+        new PriorityQueue<EditLogInputStream>(64,
+            EDIT_LOG_INPUT_STREAM_COMPARATOR);
     for (JournalAndStream jas : journals) {
       if (jas.isDisabled()) {
         LOG.info("Skipping jas " + jas + " since it's disabled");
@@ -240,7 +242,8 @@ public class JournalSet implements Journ
     // transaction ID.
     LinkedList<EditLogInputStream> acc =
         new LinkedList<EditLogInputStream>();
-    for (EditLogInputStream elis : allStreams) {
+    EditLogInputStream elis;
+    while ((elis = allStreams.poll()) != null) {
       if (acc.isEmpty()) {
         acc.add(elis);
       } else {
@@ -248,7 +251,7 @@ public class JournalSet implements Journ
         if (accFirstTxId == elis.getFirstTxId()) {
           acc.add(elis);
         } else if (accFirstTxId < elis.getFirstTxId()) {
-          streams.add(acc.get(0));
+          streams.add(new RedundantEditLogInputStream(acc, fromTxId));
           acc.clear();
           acc.add(elis);
         } else if (accFirstTxId > elis.getFirstTxId()) {
@@ -259,7 +262,7 @@ public class JournalSet implements Journ
       }
     }
     if (!acc.isEmpty()) {
-      streams.add(acc.get(0));
+      streams.add(new RedundantEditLogInputStream(acc, fromTxId));
       acc.clear();
     }
   }

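JournalSet.selectInputStreams switches from Guava's TreeMultiset to a JDK
PriorityQueue. Iterating a PriorityQueue does not visit elements in sorted
order, so the loop is rewritten to drain it with poll(), which always removes
the head. The same drain idiom in a self-contained sketch (natural integer
ordering stands in for EDIT_LOG_INPUT_STREAM_COMPARATOR):

    import java.util.PriorityQueue;

    public class DrainSketch {
      public static void main(String[] args) {
        PriorityQueue<Integer> allStreams = new PriorityQueue<Integer>(64);
        allStreams.add(30);
        allStreams.add(10);
        allStreams.add(20);

        // A for-each over a PriorityQueue visits binary-heap order, which is
        // NOT sorted; only poll() is guaranteed to return the least element.
        // Hence the while/poll loop in the hunk above.
        Integer head;
        while ((head = allStreams.poll()) != null) {
          System.out.println(head);  // prints 10, 20, 30
        }
      }
    }
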
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java?rev=1354832&r1=1354831&r2=1354832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java Thu Jun 28 06:59:38 2012
@@ -44,7 +44,6 @@ import org.apache.hadoop.http.HttpServer
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
 import org.apache.hadoop.security.authorize.AccessControlList;
 
 /**
@@ -91,22 +90,9 @@ public class NameNodeHttpServer {
       {
         // Add SPNEGO support to NameNode
         if (UserGroupInformation.isSecurityEnabled()) {
-          Map<String, String> params = new HashMap<String, String>();
-          String principalInConf = conf.get(
-            DFSConfigKeys.DFS_NAMENODE_INTERNAL_SPENGO_USER_NAME_KEY);
-          if (principalInConf != null && !principalInConf.isEmpty()) {
-            params.put("kerberos.principal",
-                       SecurityUtil.getServerPrincipal(principalInConf, infoHost));
-            String httpKeytab = conf.get(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY);
-            if (httpKeytab != null && !httpKeytab.isEmpty()) {
-              params.put("kerberos.keytab", httpKeytab);
-            }
-
-            params.put(AuthenticationFilter.AUTH_TYPE, "kerberos");
-
-            defineFilter(webAppContext, SPNEGO_FILTER,
-                         AuthenticationFilter.class.getName(), params, null);
-          }
+          initSpnego(conf,
+              DFSConfigKeys.DFS_NAMENODE_INTERNAL_SPNEGO_USER_NAME_KEY,
+              DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY);
         }
         if (WebHdfsFileSystem.isEnabled(conf, LOG)) {
           //add SPNEGO authentication filter for webhdfs

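This hunk, and the matching SecondaryNameNode hunk further below, replace a
hand-rolled SPNEGO filter setup with a single initSpnego(conf, principalKey,
keytabKey) call on the HTTP server. Reassembled purely from the deleted
lines, the shared helper plausibly has a body like the sketch below; it is
inferred rather than quoted, and infoHost, webAppContext, SPNEGO_FILTER and
defineFilter are assumed to be members of the enclosing server class:

    // Inferred shape of the shared helper; parameter names are illustrative.
    private void initSpnego(Configuration conf, String usernameConfKey,
        String keytabConfKey) throws IOException {
      Map<String, String> params = new HashMap<String, String>();
      String principalInConf = conf.get(usernameConfKey);
      if (principalInConf != null && !principalInConf.isEmpty()) {
        params.put("kerberos.principal",
            SecurityUtil.getServerPrincipal(principalInConf, infoHost));
      }
      String httpKeytab = conf.get(keytabConfKey);
      if (httpKeytab != null && !httpKeytab.isEmpty()) {
        params.put("kerberos.keytab", httpKeytab);
      }
      params.put(AuthenticationFilter.AUTH_TYPE, "kerberos");
      defineFilter(webAppContext, SPNEGO_FILTER,
          AuthenticationFilter.class.getName(), params, null);
    }

Factoring the setup out means the principal and keytab config keys are the
only per-daemon differences, as the two call sites show.
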
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1354832&r1=1354831&r2=1354832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Thu Jun 28 06:59:38 2012
@@ -277,7 +277,7 @@ public class NamenodeFsck {
     // Get block locations without updating the file access time 
     // and without block access tokens
     LocatedBlocks blocks = namenode.getNamesystem().getBlockLocations(path, 0,
-        fileLen, false, false);
+        fileLen, false, false, false);
     if (blocks == null) { // the file is deleted
       return;
     }

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java?rev=1354832&r1=1354831&r2=1354832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java Thu Jun 28 06:59:38 2012
@@ -25,10 +25,8 @@ import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
 import java.util.Collection;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -68,7 +66,6 @@ import org.apache.hadoop.metrics2.source
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
 import org.apache.hadoop.security.authorize.AccessControlList;
 
 import org.apache.hadoop.util.Daemon;
@@ -239,20 +236,8 @@ public class SecondaryNameNode implement
                                 new AccessControlList(conf.get(DFS_ADMIN, " "))) {
       {
         if (UserGroupInformation.isSecurityEnabled()) {
-          Map<String, String> params = new HashMap<String, String>();
-          String principalInConf = conf.get(DFSConfigKeys.DFS_SECONDARY_NAMENODE_INTERNAL_SPENGO_USER_NAME_KEY);
-          if (principalInConf != null && !principalInConf.isEmpty()) {
-            params.put("kerberos.principal",
-                       SecurityUtil.getServerPrincipal(principalInConf, infoSocAddr.getHostName()));
-          }
-          String httpKeytab = conf.get(DFSConfigKeys.DFS_SECONDARY_NAMENODE_KEYTAB_FILE_KEY);
-          if (httpKeytab != null && !httpKeytab.isEmpty()) {
-            params.put("kerberos.keytab", httpKeytab);
-          }
-          params.put(AuthenticationFilter.AUTH_TYPE, "kerberos");
-
-          defineFilter(webAppContext, SPNEGO_FILTER, AuthenticationFilter.class.getName(),
-                       params, null);
+          initSpnego(conf, DFSConfigKeys.DFS_SECONDARY_NAMENODE_INTERNAL_SPNEGO_USER_NAME_KEY,
+              DFSConfigKeys.DFS_SECONDARY_NAMENODE_KEYTAB_FILE_KEY);
         }
       }
     };

Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StreamLimiter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StreamLimiter.java?rev=1354832&r1=1354831&r2=1354832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StreamLimiter.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StreamLimiter.java Thu Jun 28 06:59:38 2012
@@ -27,4 +27,9 @@ interface StreamLimiter {
    * Set a limit.  Calling this function clears any existing limit.
    */
   public void setLimit(long limit);
+  
+  /**
+   * Disable the limit.
+   */
+  public void clearLimit();
 }
\ No newline at end of file

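StreamLimiter gains a clearLimit() companion to setLimit(long). A minimal
in-memory implementation of the extended contract might look like the
following sketch (CountingLimiter is illustrative, not an HDFS class):

    /** The extended contract, as declared in StreamLimiter.java above. */
    interface StreamLimiter {
      void setLimit(long limit);
      void clearLimit();
    }

    /** Illustrative in-memory implementation; not an HDFS class. */
    class CountingLimiter implements StreamLimiter {
      private long limit = Long.MAX_VALUE;  // effectively unlimited until set
      private long consumed = 0;

      @Override
      public void setLimit(long limit) {
        this.limit = limit;
        this.consumed = 0;  // per the javadoc, setting a limit clears the old one
      }

      @Override
      public void clearLimit() {
        this.limit = Long.MAX_VALUE;  // disable enforcement entirely
      }

      /** Record n bytes read; fails once the configured limit is exceeded. */
      void consume(long n) {
        consumed += n;
        if (consumed > limit) {
          throw new IllegalStateException(
              "stream limit of " + limit + " bytes exceeded");
        }
      }
    }
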
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java?rev=1354832&r1=1354831&r2=1354832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java Thu Jun 28 06:59:38 2012
@@ -32,11 +32,11 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.MD5Hash;
-import org.apache.hadoop.security.UserGroupInformation;
 
 import com.google.common.collect.Lists;
 
@@ -207,6 +207,7 @@ public class TransferFsImage {
     //
     // open connection to remote server
     //
+    long startTime = Util.monotonicNow();
     URL url = new URL(str);
 
     HttpURLConnection connection = (HttpURLConnection)
@@ -312,6 +313,11 @@ public class TransferFsImage {
                               advertisedSize);
       }
     }
+    double xferSec = Math.max(
+        ((float)(Util.monotonicNow() - startTime)) / 1000.0, 0.001);
+    long xferKb = received / 1024;
+    LOG.info(String.format("Transfer took %.2fs at %.2f KB/s",
+        xferSec, xferKb / xferSec));
 
     if (digester != null) {
       MD5Hash computedDigest = new MD5Hash(digester.digest());

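The new timing code in TransferFsImage derives a KB/s figure from a monotonic
clock and clamps the elapsed time to at least one millisecond, so a transfer
that completes within the same clock tick cannot divide by zero. The
arithmetic in isolation, as a sketch where System.nanoTime stands in for
Util.monotonicNow and the byte count is made up:

    public class RateSketch {
      public static void main(String[] args) throws InterruptedException {
        long startTime = System.nanoTime() / 1000000L;  // ms, monotonic
        Thread.sleep(50);                               // pretend the download took 50 ms
        long received = 5L * 1024 * 1024;               // pretend 5 MB arrived
        long endTime = System.nanoTime() / 1000000L;

        // Clamp to 1 ms so a sub-millisecond transfer cannot divide by zero.
        double xferSec = Math.max((endTime - startTime) / 1000.0, 0.001);
        long xferKb = received / 1024;
        System.out.println(String.format("Transfer took %.2fs at %.2f KB/s",
            xferSec, xferKb / xferSec));
      }
    }
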
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java?rev=1354832&r1=1354831&r2=1354832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java Thu Jun 28 06:59:38 2012
@@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -115,6 +116,11 @@ public class NamenodeWebHdfsMethods {
     return REMOTE_ADDRESS.get();
   }
 
+  /** Set the remote client address. */
+  static void setRemoteAddress(String remoteAddress) {
+    REMOTE_ADDRESS.set(remoteAddress);
+  }
+
   private @Context ServletContext context;
   private @Context HttpServletRequest request;
   private @Context HttpServletResponse response;
@@ -123,7 +129,7 @@ public class NamenodeWebHdfsMethods {
       final DelegationParam delegation,
       final UserParam username, final DoAsParam doAsUser,
       final UriFsPathParam path, final HttpOpParam<?> op,
-      final Param<?, ?>... parameters) throws IOException {
+      final Param<?, ?>... parameters) {
     if (LOG.isTraceEnabled()) {
       LOG.trace("HTTP " + op.getValue().getType() + ": " + op + ", " + path
           + ", ugi=" + ugi + ", " + username + ", " + doAsUser
@@ -134,12 +140,26 @@ public class NamenodeWebHdfsMethods {
     response.setContentType(null);
   }
 
-  private static DatanodeInfo chooseDatanode(final NameNode namenode,
+  static DatanodeInfo chooseDatanode(final NameNode namenode,
       final String path, final HttpOpParam.Op op, final long openOffset,
-      Configuration conf) throws IOException {
-    if (op == GetOpParam.Op.OPEN
+      final long blocksize, Configuration conf) throws IOException {
+    final BlockManager bm = namenode.getNamesystem().getBlockManager();
+
+    if (op == PutOpParam.Op.CREATE) {
+      //choose a datanode near the client
+      final DatanodeDescriptor clientNode = bm.getDatanodeManager(
+          ).getDatanodeByHost(getRemoteAddress());
+      if (clientNode != null) {
+        final DatanodeDescriptor[] datanodes = bm.getBlockPlacementPolicy(
+            ).chooseTarget(path, 1, clientNode, null, blocksize);
+        if (datanodes.length > 0) {
+          return datanodes[0];
+        }
+      }
+    } else if (op == GetOpParam.Op.OPEN
         || op == GetOpParam.Op.GETFILECHECKSUM
         || op == PostOpParam.Op.APPEND) {
+      //choose a datanode containing a replica 
       final NamenodeProtocols np = namenode.getRpcServer();
       final HdfsFileStatus status = np.getFileInfo(path);
       if (status == null) {
@@ -158,14 +178,13 @@ public class NamenodeWebHdfsMethods {
         final LocatedBlocks locations = np.getBlockLocations(path, offset, 1);
         final int count = locations.locatedBlockCount();
         if (count > 0) {
-          return JspHelper.bestNode(locations.get(0), conf);
+          return JspHelper.bestNode(locations.get(0).getLocations(), false, conf);
         }
       }
     } 
 
-    return (DatanodeDescriptor)namenode.getNamesystem().getBlockManager(
-        ).getDatanodeManager().getNetworkTopology().chooseRandom(
-        NodeBase.ROOT);
+    return (DatanodeDescriptor)bm.getDatanodeManager().getNetworkTopology(
+        ).chooseRandom(NodeBase.ROOT);
   }
 
   private Token<? extends TokenIdentifier> generateDelegationToken(
@@ -183,9 +202,11 @@ public class NamenodeWebHdfsMethods {
       final UserGroupInformation ugi, final DelegationParam delegation,
       final UserParam username, final DoAsParam doAsUser,
       final String path, final HttpOpParam.Op op, final long openOffset,
+      final long blocksize,
       final Param<?, ?>... parameters) throws URISyntaxException, IOException {
     final Configuration conf = (Configuration)context.getAttribute(JspHelper.CURRENT_CONF);
-    final DatanodeInfo dn = chooseDatanode(namenode, path, op, openOffset, conf);
+    final DatanodeInfo dn = chooseDatanode(namenode, path, op, openOffset,
+        blocksize, conf);
 
     final String delegationQuery;
     if (!UserGroupInformation.isSecurityEnabled()) {
@@ -356,7 +377,7 @@ public class NamenodeWebHdfsMethods {
     case CREATE:
     {
       final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
-          fullpath, op.getValue(), -1L,
+          fullpath, op.getValue(), -1L, blockSize.getValue(conf),
           permission, overwrite, bufferSize, replication, blockSize);
       return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
     } 
@@ -502,7 +523,7 @@ public class NamenodeWebHdfsMethods {
     case APPEND:
     {
       final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
-          fullpath, op.getValue(), -1L, bufferSize);
+          fullpath, op.getValue(), -1L, -1L, bufferSize);
       return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
     default:
@@ -532,7 +553,7 @@ public class NamenodeWebHdfsMethods {
           final RenewerParam renewer,
       @QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
           final BufferSizeParam bufferSize
-      ) throws IOException, URISyntaxException, InterruptedException {
+      ) throws IOException, InterruptedException {
     return get(ugi, delegation, username, doAsUser, ROOT, op,
         offset, length, renewer, bufferSize);
   }
@@ -598,7 +619,7 @@ public class NamenodeWebHdfsMethods {
     case OPEN:
     {
       final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
-          fullpath, op.getValue(), offset.getValue(), offset, length, bufferSize);
+          fullpath, op.getValue(), offset.getValue(), -1L, offset, length, bufferSize);
       return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
     case GET_BLOCK_LOCATIONS:
@@ -634,7 +655,7 @@ public class NamenodeWebHdfsMethods {
     case GETFILECHECKSUM:
     {
       final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
-          fullpath, op.getValue(), -1L);
+          fullpath, op.getValue(), -1L, -1L);
       return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
     }
     case GETDELEGATIONTOKEN:

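The reworked chooseDatanode above distinguishes three cases: CREATE picks a
datanode on the client's own host when one exists, OPEN, GETFILECHECKSUM and
APPEND pick a node that already holds a replica, and everything else falls
back to a random node in the topology. The decision structure, reduced to a
standalone sketch with plain strings standing in for the HDFS types:

    /** Simplified decision flow mirroring the chooseDatanode() change. */
    class NodeChooser {
      enum Op { CREATE, OPEN, APPEND, GETFILECHECKSUM, OTHER }

      String choose(Op op, String clientHost, String[] replicaHosts,
          String[] allHosts) {
        if (op == Op.CREATE) {
          // Writes: prefer a datanode co-located with the client, if any.
          for (String host : allHosts) {
            if (host.equals(clientHost)) {
              return host;
            }
          }
        } else if (op == Op.OPEN || op == Op.APPEND
            || op == Op.GETFILECHECKSUM) {
          // Reads and appends: prefer a node that already holds a replica.
          if (replicaHosts.length > 0) {
            return replicaHosts[0];
          }
        }
        // Fallback in every case: a random node from the whole cluster.
        return allHosts[(int) (Math.random() * allHosts.length)];
      }
    }
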
Modified: hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java?rev=1354832&r1=1354831&r2=1354832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java Thu Jun 28 06:59:38 2012
@@ -239,7 +239,12 @@ public class DFSAdmin extends FsShell {
       CommandFormat c = new CommandFormat(2, Integer.MAX_VALUE);
       List<String> parameters = c.parse(args, pos);
       String str = parameters.remove(0).trim();
-      quota = StringUtils.TraditionalBinaryPrefix.string2long(str);
+      try {
+        quota = StringUtils.TraditionalBinaryPrefix.string2long(str);
+      } catch (NumberFormatException nfe) {
+        throw new IllegalArgumentException("\"" + str + "\" is not a valid value for a quota.");
+      }
+      
       this.args = parameters.toArray(new String[parameters.size()]);
     }
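
DFSAdmin now converts the NumberFormatException from string2long into an
IllegalArgumentException carrying a friendlier message.
TraditionalBinaryPrefix accepts suffixed sizes such as "1g"; a small sketch
of the same guard (parseQuota is an illustrative wrapper, not a DFSAdmin
method):

    import org.apache.hadoop.util.StringUtils;

    public class QuotaParseSketch {
      /** Illustrative wrapper applying the same guard as the DFSAdmin change. */
      static long parseQuota(String str) {
        try {
          // Accepts plain numbers and binary-prefixed ones: "1024", "100m", "1g".
          return StringUtils.TraditionalBinaryPrefix.string2long(str);
        } catch (NumberFormatException nfe) {
          // e.g. "1x2": the trailing digit routes it to Long.parseLong, which throws.
          throw new IllegalArgumentException(
              "\"" + str + "\" is not a valid value for a quota.");
        }
      }

      public static void main(String[] args) {
        System.out.println(parseQuota("1g"));  // 1073741824
      }
    }
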