You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by at...@apache.org on 2011/12/08 03:57:51 UTC

svn commit: r1211749 [3/3] - in /hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocol/ src/main/java/org/apache/hadoop/hdfs/protocol/prot...

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java?rev=1211749&r1=1211748&r2=1211749&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java Thu Dec  8 02:57:47 2011
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.protocolP
 
 import java.io.IOException;
 
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentRequestProto;
@@ -37,6 +38,7 @@ import com.google.protobuf.ServiceExcept
  * received on {@link JournalProtocolPB} to the 
  * {@link JournalProtocol} server implementation.
  */
+@InterfaceAudience.Private
 public class JournalProtocolServerSideTranslatorPB implements JournalProtocolPB {
   /** Server side implementation to delegate the requests to */
   private final JournalProtocol impl;
@@ -118,4 +120,4 @@ public class JournalProtocolServerSideTr
     return ProtocolSignatureWritable.convert(
         this.getProtocolSignature(protocol, clientVersion, clientMethodsHash));
   }
-}
\ No newline at end of file
+}

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java?rev=1211749&r1=1211748&r2=1211749&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java Thu Dec  8 02:57:47 2011
@@ -53,7 +53,7 @@ public class JournalProtocolTranslatorPB
       Configuration conf) throws IOException {
     RPC.setProtocolEngine(conf, JournalProtocolPB.class, ProtobufRpcEngine.class);
     rpcProxy = RPC.getProxy(JournalProtocolPB.class,
-        JournalProtocol.versionID, nameNodeAddr, conf);
+        RPC.getProtocolVersion(JournalProtocolPB.class), nameNodeAddr, conf);
   }
 
   @Override
@@ -64,7 +64,7 @@ public class JournalProtocolTranslatorPB
   @Override
   public long getProtocolVersion(String protocolName, long clientVersion)
       throws IOException {
-    return 0;
+    return rpcProxy.getProtocolVersion(protocolName, clientVersion);
   }
 
   @Override

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1211749&r1=1211748&r2=1211749&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Thu Dec  8 02:57:47 2011
@@ -17,28 +17,72 @@
  */
 package org.apache.hadoop.hdfs.protocolPB;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenIdentifierProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockWithLocationsProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlocksWithLocationsProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointSignatureProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto.AdminState;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExportedBlockKeysProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto.Builder;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamespaceInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RecoveringBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteEditLogManifestProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteEditLogProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto.NamenodeRoleProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReplicaStateProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto;
+import org.apache.hadoop.hdfs.security.token.block.BlockKey;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
 
 import com.google.protobuf.ByteString;
 
 /**
- * Utilities for converting protobuf classes to and from 
- * implementation classes.
+ * Utilities for converting protobuf classes to and from implementation classes.
  */
 class PBHelper {
   private PBHelper() {
     /** Hidden constructor */
   }
-  
+
   public static ByteString getByteString(byte[] bytes) {
     return ByteString.copyFrom(bytes);
   }
-  
+
   public static NamenodeRole convert(NamenodeRoleProto role) {
     switch (role) {
     case NAMENODE:
@@ -50,7 +94,7 @@ class PBHelper {
     }
     return null;
   }
-  
+
   public static NamenodeRoleProto convert(NamenodeRole role) {
     switch (role) {
     case NAMENODE:
@@ -62,31 +106,315 @@ class PBHelper {
     }
     return null;
   }
-  
+
   public static StorageInfoProto convert(StorageInfo info) {
     return StorageInfoProto.newBuilder().setClusterID(info.getClusterID())
-        .setCTime(info.getCTime())
-        .setLayoutVersion(info.getLayoutVersion())
-        .setNamespceID(info.getNamespaceID())
-        .build();
+        .setCTime(info.getCTime()).setLayoutVersion(info.getLayoutVersion())
+        .setNamespceID(info.getNamespaceID()).build();
   }
-  
+
   public static StorageInfo convert(StorageInfoProto info) {
     return new StorageInfo(info.getLayoutVersion(), info.getNamespceID(),
         info.getClusterID(), info.getCTime());
   }
-  
-  
+
   public static NamenodeRegistrationProto convert(NamenodeRegistration reg) {
     return NamenodeRegistrationProto.newBuilder()
-        .setHttpAddress(reg.getHttpAddress())
-        .setRole(convert(reg.getRole()))
+        .setHttpAddress(reg.getHttpAddress()).setRole(convert(reg.getRole()))
         .setRpcAddress(reg.getAddress())
         .setStorageInfo(convert((StorageInfo) reg)).build();
   }
-  
+
   public static NamenodeRegistration convert(NamenodeRegistrationProto reg) {
     return new NamenodeRegistration(reg.getRpcAddress(), reg.getHttpAddress(),
         convert(reg.getStorageInfo()), convert(reg.getRole()));
   }
+
+  public static DatanodeID convert(DatanodeIDProto dn) {
+    return new DatanodeID(dn.getName(), dn.getStorageID(), dn.getInfoPort(),
+        dn.getIpcPort());
+  }
+
+  public static DatanodeIDProto convert(DatanodeID dn) {
+    return DatanodeIDProto.newBuilder().setName(dn.getName())
+        .setInfoPort(dn.getInfoPort()).setIpcPort(dn.getIpcPort())
+        .setStorageID(dn.getStorageID()).build();
+  }
+
+  public static BlockProto convert(Block b) {
+    return BlockProto.newBuilder().setBlockId(b.getBlockId())
+        .setGenStamp(b.getGenerationStamp()).setNumBytes(b.getNumBytes())
+        .build();
+  }
+
+  public static Block convert(BlockProto b) {
+    return new Block(b.getBlockId(), b.getGenStamp(), b.getNumBytes());
+  }
+
+  public static BlockWithLocationsProto convert(BlockWithLocations blk) {
+    return BlockWithLocationsProto.newBuilder()
+        .setBlock(convert(blk.getBlock()))
+        .addAllDatanodeIDs(Arrays.asList(blk.getDatanodes())).build();
+  }
+
+  public static BlockWithLocations convert(BlockWithLocationsProto b) {
+    return new BlockWithLocations(convert(b.getBlock()), b.getDatanodeIDsList()
+        .toArray(new String[0]));
+  }
+
+  public static BlocksWithLocationsProto convert(BlocksWithLocations blks) {
+    BlocksWithLocationsProto.Builder builder = BlocksWithLocationsProto
+        .newBuilder();
+    for (BlockWithLocations b : blks.getBlocks()) {
+      builder.addBlocks(convert(b));
+    }
+    return builder.build();
+  }
+
+  public static BlocksWithLocations convert(BlocksWithLocationsProto blocks) {
+    List<BlockWithLocationsProto> b = blocks.getBlocksList();
+    BlockWithLocations[] ret = new BlockWithLocations[b.size()];
+    int i = 0;
+    for (BlockWithLocationsProto entry : b) {
+      ret[i++] = convert(entry);
+    }
+    return new BlocksWithLocations(ret);
+  }
+
+  public static BlockKeyProto convert(BlockKey key) {
+    byte[] encodedKey = key.getEncodedKey();
+    ByteString keyBytes = ByteString.copyFrom(encodedKey == null ? new byte[0]
+        : encodedKey);
+    return BlockKeyProto.newBuilder().setKeyId(key.getKeyId())
+        .setKeyBytes(keyBytes).setExpiryDate(key.getExpiryDate()).build();
+  }
+
+  public static BlockKey convert(BlockKeyProto k) {
+    return new BlockKey(k.getKeyId(), k.getExpiryDate(), k.getKeyBytes()
+        .toByteArray());
+  }
+
+  public static ExportedBlockKeysProto convert(ExportedBlockKeys keys) {
+    ExportedBlockKeysProto.Builder builder = ExportedBlockKeysProto
+        .newBuilder();
+    builder.setIsBlockTokenEnabled(keys.isBlockTokenEnabled())
+        .setKeyUpdateInterval(keys.getKeyUpdateInterval())
+        .setTokenLifeTime(keys.getTokenLifetime())
+        .setCurrentKey(convert(keys.getCurrentKey()));
+    for (BlockKey k : keys.getAllKeys()) {
+      builder.addAllKeys(convert(k));
+    }
+    return builder.build();
+  }
+
+  public static ExportedBlockKeys convert(ExportedBlockKeysProto keys) {
+    return new ExportedBlockKeys(keys.getIsBlockTokenEnabled(),
+        keys.getKeyUpdateInterval(), keys.getTokenLifeTime(),
+        convert(keys.getCurrentKey()), convertBlockKeys(keys.getAllKeysList()));
+  }
+
+  public static CheckpointSignatureProto convert(CheckpointSignature s) {
+    return CheckpointSignatureProto.newBuilder()
+        .setBlockPoolId(s.getBlockpoolID())
+        .setCurSegmentTxId(s.getCurSegmentTxId())
+        .setMostRecentCheckpointTxId(s.getMostRecentCheckpointTxId())
+        .setStorageInfo(PBHelper.convert((StorageInfo) s)).build();
+  }
+
+  public static CheckpointSignature convert(CheckpointSignatureProto s) {
+    return new CheckpointSignature(PBHelper.convert(s.getStorageInfo()),
+        s.getBlockPoolId(), s.getMostRecentCheckpointTxId(),
+        s.getCurSegmentTxId());
+  }
+
+  public static RemoteEditLogProto convert(RemoteEditLog log) {
+    return RemoteEditLogProto.newBuilder().setEndTxId(log.getEndTxId())
+        .setStartTxId(log.getStartTxId()).build();
+  }
+
+  public static RemoteEditLog convert(RemoteEditLogProto l) {
+    return new RemoteEditLog(l.getStartTxId(), l.getEndTxId());
+  }
+
+  public static RemoteEditLogManifestProto convert(
+      RemoteEditLogManifest manifest) {
+    RemoteEditLogManifestProto.Builder builder = RemoteEditLogManifestProto
+        .newBuilder();
+    for (RemoteEditLog log : manifest.getLogs()) {
+      builder.addLogs(convert(log));
+    }
+    return builder.build();
+  }
+
+  public static RemoteEditLogManifest convert(
+      RemoteEditLogManifestProto manifest) {
+    List<RemoteEditLog> logs = new ArrayList<RemoteEditLog>(manifest
+        .getLogsList().size());
+    for (RemoteEditLogProto l : manifest.getLogsList()) {
+      logs.add(convert(l));
+    }
+    return new RemoteEditLogManifest(logs);
+  }
+
+  public static CheckpointCommandProto convert(CheckpointCommand cmd) {
+    return CheckpointCommandProto.newBuilder()
+        .setSignature(convert(cmd.getSignature())).build();
+  }
+
+  public static NamenodeCommandProto convert(NamenodeCommand cmd) {
+    if (cmd instanceof CheckpointCommand) {
+      return NamenodeCommandProto.newBuilder().setAction(cmd.getAction())
+          .setType(NamenodeCommandProto.Type.NamenodeCommand)
+          .setCheckpointCmd(convert((CheckpointCommand) cmd)).build();
+    }
+    return NamenodeCommandProto.newBuilder().setAction(cmd.getAction()).build();
+  }
+
+  public static BlockKey[] convertBlockKeys(List<BlockKeyProto> list) {
+    BlockKey[] ret = new BlockKey[list.size()];
+    int i = 0;
+    for (BlockKeyProto k : list) {
+      ret[i++] = convert(k);
+    }
+    return ret;
+  }
+
+  public static NamespaceInfo convert(NamespaceInfoProto info) {
+    StorageInfoProto storage = info.getStorageInfo();
+    return new NamespaceInfo(storage.getNamespceID(), storage.getClusterID(),
+        info.getBlockPoolID(), storage.getCTime(), info.getDistUpgradeVersion());
+  }
+
+  public static NamenodeCommand convert(NamenodeCommandProto cmd) {
+    switch (cmd.getType()) {
+    case CheckPointCommand:
+      CheckpointCommandProto chkPt = cmd.getCheckpointCmd();
+      return new CheckpointCommand(PBHelper.convert(chkPt.getSignature()),
+          chkPt.getNeedToReturnImage());
+    default:
+      return new NamenodeCommand(cmd.getAction());
+    }
+  }
+
+  public static ExtendedBlockProto convert(ExtendedBlock b) {
+    return ExtendedBlockProto.newBuilder().setBlockId(b.getBlockId())
+        .setGenerationStamp(b.getGenerationStamp())
+        .setNumBytes(b.getNumBytes()).setPoolId(b.getBlockPoolId()).build();
+  }
+
+  public static ExtendedBlock convert(ExtendedBlockProto b) {
+    return new ExtendedBlock(b.getPoolId(), b.getBlockId(), b.getNumBytes(),
+        b.getGenerationStamp());
+  }
+
+  public static RecoveringBlockProto convert(RecoveringBlock b) {
+    LocatedBlockProto lb = PBHelper.convert((LocatedBlock)b);
+    return RecoveringBlockProto.newBuilder().setBlock(lb)
+        .setNewGenStamp(b.getNewGenerationStamp()).build();
+  }
+
+  public static RecoveringBlock convert(RecoveringBlockProto b) {
+    ExtendedBlock block = convert(b.getBlock().getB());
+    DatanodeInfo[] locs = convert(b.getBlock().getLocsList());
+    return new RecoveringBlock(block, locs, b.getNewGenStamp());
+  }
+
+  public static DatanodeInfo[] convert(List<DatanodeInfoProto> list) {
+    DatanodeInfo[] info = new DatanodeInfo[list.size()];
+    for (int i = 0; i < info.length; i++) {
+      info[i] = convert(list.get(i));
+    }
+    return info;
+  }
+
+  public static DatanodeInfo convert(DatanodeInfoProto info) {
+    DatanodeIDProto dnId = info.getId();
+    return new DatanodeInfo(dnId.getName(), dnId.getStorageID(),
+        dnId.getInfoPort(), dnId.getIpcPort(), info.getCapacity(),
+        info.getDfsUsed(), info.getRemaining(), info.getBlockPoolUsed(),
+        info.getLastUpdate(), info.getXceiverCount(), info.getLocation(),
+        info.getHostName(), convert(info.getAdminState()));
+  }
+  
+  public static DatanodeInfoProto convert(DatanodeInfo info) {
+    return DatanodeInfoProto.newBuilder()
+        .setAdminState(PBHelper.convert(info.getAdminState()))
+        .setBlockPoolUsed(info.getBlockPoolUsed())
+        .setCapacity(info.getCapacity())
+        .setDfsUsed(info.getDfsUsed())
+        .setHostName(info.getHostName())
+        .setId(PBHelper.convert((DatanodeID)info))
+        .setLastUpdate(info.getLastUpdate())
+        .setLocation(info.getNetworkLocation())
+        .setRemaining(info.getRemaining())
+        .setXceiverCount(info.getXceiverCount())
+        .build();
+  }
+
+  public static AdminStates convert(AdminState adminState) {
+    switch(adminState) {
+    case DECOMMISSION_INPROGRESS:
+      return AdminStates.DECOMMISSION_INPROGRESS;
+    case DECOMMISSIONED:
+      return AdminStates.DECOMMISSIONED;
+    case NORMAL:
+    default:
+      return AdminStates.NORMAL;
+    }
+  }
+  
+  public static AdminState convert(AdminStates adminState) {
+    switch(adminState) {
+    case DECOMMISSION_INPROGRESS:
+      return AdminState.DECOMMISSION_INPROGRESS;
+    case DECOMMISSIONED:
+      return AdminState.DECOMMISSIONED;
+    case NORMAL:
+    default:
+      return AdminState.NORMAL;
+    }
+  }
+
+  public static LocatedBlockProto convert(LocatedBlock b) {
+    Builder builder = LocatedBlockProto.newBuilder();
+    DatanodeInfo[] locs = b.getLocations();
+    for(DatanodeInfo loc : locs) {
+      builder.addLocs(PBHelper.convert(loc));
+    }
+    return builder.setB(PBHelper.convert(b.getBlock()))
+        .setBlockToken(PBHelper.convert(b.getBlockToken()))
+        .setCorrupt(b.isCorrupt()).setOffset(b.getStartOffset()).build();
+  }
+
+  public static BlockTokenIdentifierProto convert(
+      Token<BlockTokenIdentifier> token) {
+    ByteString tokenId = ByteString.copyFrom(token.getIdentifier());
+    ByteString password = ByteString.copyFrom(token.getPassword());
+    return BlockTokenIdentifierProto.newBuilder().setIdentifier(tokenId)
+        .setKind(token.getKind().toString()).setPassword(password)
+        .setService(token.getService().toString()).build();
+  }
+  
+  public static Token<BlockTokenIdentifier> convert(
+      BlockTokenIdentifierProto blockToken) {
+    return new Token<BlockTokenIdentifier>(blockToken.getIdentifier()
+        .toByteArray(), blockToken.getPassword().toByteArray(), new Text(
+        blockToken.getKind()), new Text(blockToken.getService()));
+  }
+
+  public static ReplicaState convert(ReplicaStateProto state) {
+    switch (state) {
+    case RBW:
+      return ReplicaState.RBW;
+    case RUR:
+      return ReplicaState.RUR;
+    case RWR:
+      return ReplicaState.RWR;
+    case TEMPORARY:
+      return ReplicaState.TEMPORARY;
+    case FINALIZED:
+    default:
+      return ReplicaState.FINALIZED;
+    }
+  }
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockKey.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockKey.java?rev=1211749&r1=1211748&r2=1211749&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockKey.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockKey.java Thu Dec  8 02:57:47 2011
@@ -36,4 +36,8 @@ public class BlockKey extends Delegation
   public BlockKey(int keyId, long expiryDate, SecretKey key) {
     super(keyId, expiryDate, key);
   }
+  
+  public BlockKey(int keyId, long expiryDate, byte[] encodedKey) {
+    super(keyId, expiryDate, encodedKey);
+  }
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java?rev=1211749&r1=1211748&r2=1211749&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java Thu Dec  8 02:57:47 2011
@@ -68,14 +68,28 @@ public final class Util {
   }
 
   /**
-   * Converts the passed File to a URI.
-   *
+   * Converts the passed File to a URI. This method trims the trailing slash if
+   * one is appended because the underlying file is in fact a directory that
+   * exists.
+   * 
    * @param f the file to convert
-   * @return the resulting URI 
-   * @throws IOException 
+   * @return the resulting URI
+   * @throws IOException
    */
   public static URI fileAsURI(File f) throws IOException {
-    return f.getCanonicalFile().toURI();
+    URI u = f.getCanonicalFile().toURI();
+    
+    // trim the trailing slash, if it's present
+    if (u.getPath().endsWith("/")) {
+      String uriAsString = u.toString();
+      try {
+        u = new URI(uriAsString.substring(0, uriAsString.length() - 1));
+      } catch (URISyntaxException e) {
+        throw new IOException(e);
+      }
+    }
+    
+    return u;
   }
 
   /**

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1211749&r1=1211748&r2=1211749&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Thu Dec  8 02:57:47 2011
@@ -22,10 +22,10 @@ import java.net.URI;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.lang.reflect.Constructor;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -125,6 +125,13 @@ public class FSEditLog  {
 
   private NNStorage storage;
   private Configuration conf;
+  
+  private Collection<URI> editsDirs;
+  
+  /**
+   * The edit directories that are shared between primary and secondary.
+   */
+  private Collection<URI> sharedEditsDirs;
 
   private static class TransactionId {
     public long txid;
@@ -141,24 +148,22 @@ public class FSEditLog  {
     }
   };
 
-  final private Collection<URI> editsDirs;
-  
-  /**
-   * The edit directories that are shared between primary and secondary.
-   */
-  final private Collection<URI> sharedEditsDirs;
-
   /**
    * Construct FSEditLog with default configuration, taking editDirs from NNStorage
+   * 
    * @param storage Storage object used by namenode
    */
   @VisibleForTesting
-  FSEditLog(NNStorage storage) {
-    this(new Configuration(), storage, Collections.<URI>emptyList());
+  FSEditLog(NNStorage storage) throws IOException {
+    Configuration conf = new Configuration();
+    // Make sure the edits dirs are set in the provided configuration object.
+    conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
+        StringUtils.join(storage.getEditsDirectories(), ","));
+    init(conf, storage, FSNamesystem.getNamespaceEditsDirs(conf));
   }
 
   /**
-   * Constructor for FSEditLog. Add underlying journals are constructed, but 
+   * Constructor for FSEditLog. Underlying journals are constructed, but 
    * no streams are opened until open() is called.
    * 
    * @param conf The namenode configuration
@@ -166,27 +171,20 @@ public class FSEditLog  {
    * @param editsDirs List of journals to use
    */
   FSEditLog(Configuration conf, NNStorage storage, Collection<URI> editsDirs) {
-    this.conf = conf;
+    init(conf, storage, editsDirs);
+  }
+  
+  private void init(Configuration conf, NNStorage storage, Collection<URI> editsDirs) {
     isSyncRunning = false;
+    this.conf = conf;
     this.storage = storage;
     metrics = NameNode.getNameNodeMetrics();
     lastPrintTime = now();
-    
-    if (editsDirs.isEmpty()) { 
-      // if this is the case, no edit dirs have been explictly configured
-      // image dirs are to be used for edits too
-      try {
-        editsDirs = Lists.newArrayList(storage.getEditsDirectories());
-      } catch (IOException ioe) {
-        // cannot get list from storage, so the empty editsDirs 
-        // will be assigned. an error will be thrown on first use
-        // of the editlog, as no journals will exist
-      }
-      this.editsDirs = editsDirs;
-    } else {
-      this.editsDirs = Lists.newArrayList(editsDirs);
-    }
-    
+     
+    // If this list is empty, an error will be thrown on first use
+    // of the editlog, as no journals will exist
+    this.editsDirs = Lists.newArrayList(editsDirs);
+
     this.sharedEditsDirs = FSNamesystem.getSharedEditsDirs(conf);
   }
   
@@ -212,15 +210,21 @@ public class FSEditLog  {
   }
   
   private void initJournals(Collection<URI> dirs) {
-    this.journalSet = new JournalSet();
+    int minimumRedundantJournals = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_KEY,
+        DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_DEFAULT);
+
+    journalSet = new JournalSet(minimumRedundantJournals);
     for (URI u : dirs) {
+      boolean required = FSNamesystem.getRequiredNamespaceEditsDirs(conf)
+          .contains(u);
       if (u.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
         StorageDirectory sd = storage.getStorageDirectory(u);
         if (sd != null) {
-          journalSet.add(new FileJournalManager(sd));
+          journalSet.add(new FileJournalManager(sd), required);
         }
       } else {
-        journalSet.add(createJournal(u));
+        journalSet.add(createJournal(u), required);
       }
     }
  
@@ -491,7 +495,7 @@ public class FSEditLog  {
             }
             editLogStream.setReadyToFlush();
           } catch (IOException e) {
-            LOG.fatal("Could not sync any journal to persistent storage. "
+            LOG.fatal("Could not sync enough journals to persistent storage. "
                 + "Unsynced transactions: " + (txid - synctxid),
                 new Exception());
             runtime.exit(1);
@@ -513,7 +517,7 @@ public class FSEditLog  {
         }
       } catch (IOException ex) {
         synchronized (this) {
-          LOG.fatal("Could not sync any journal to persistent storage. "
+          LOG.fatal("Could not sync enough journals to persistent storage. "
               + "Unsynced transactions: " + (txid - synctxid), new Exception());
           runtime.exit(1);
         }
@@ -966,7 +970,7 @@ public class FSEditLog  {
     
     LOG.info("Registering new backup node: " + bnReg);
     BackupJournalManager bjm = new BackupJournalManager(bnReg, nnReg);
-    journalSet.add(bjm);
+    journalSet.add(bjm, true);
   }
   
   synchronized void releaseBackupStream(NamenodeRegistration registration)

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1211749&r1=1211748&r2=1211749&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Thu Dec  8 02:57:47 2011
@@ -759,7 +759,7 @@ public class FSImage implements Closeabl
    * FSImageSaver assumes that it was launched from a thread that holds
    * FSNamesystem lock and waits for the execution of FSImageSaver thread
    * to finish.
-   * This way we are guraranteed that the namespace is not being updated
+   * This way we are guaranteed that the namespace is not being updated
    * while multiple instances of FSImageSaver are traversing it
    * and writing it out.
    */

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1211749&r1=1211748&r2=1211749&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Thu Dec  8 02:57:47 2011
@@ -33,6 +33,7 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_REQUIRED_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
@@ -547,6 +548,10 @@ public class FSNamesystem implements Nam
   public static Collection<URI> getNamespaceDirs(Configuration conf) {
     return getStorageDirs(conf, DFS_NAMENODE_NAME_DIR_KEY);
   }
+  
+  public static Collection<URI> getRequiredNamespaceEditsDirs(Configuration conf) {
+    return getStorageDirs(conf, DFS_NAMENODE_EDITS_DIR_REQUIRED_KEY);
+  }
 
   private static Collection<URI> getStorageDirs(Configuration conf,
                                                 String propertyName) {
@@ -581,7 +586,13 @@ public class FSNamesystem implements Nam
   public static Collection<URI> getNamespaceEditsDirs(Configuration conf) {
     Collection<URI> editsDirs = getStorageDirs(conf, DFS_NAMENODE_EDITS_DIR_KEY);
     editsDirs.addAll(getSharedEditsDirs(conf));
-    return editsDirs;
+    if (editsDirs.isEmpty()) {
+      // If this is the case, no edit dirs have been explicitly configured.
+      // Image dirs are to be used for edits too.
+      return getNamespaceDirs(conf);
+    } else {
+      return editsDirs;
+    }
   }
   
   /**

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java?rev=1211749&r1=1211748&r2=1211749&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java Thu Dec  8 02:57:47 2011
@@ -50,17 +50,16 @@ public class JournalSet implements Journ
    * 
    * If a Journal gets disabled due to an error writing to its
    * stream, then the stream will be aborted and set to null.
-   * 
-   * This should be used outside JournalSet only for testing.
    */
-  @VisibleForTesting
-  static class JournalAndStream {
+  static class JournalAndStream implements CheckableNameNodeResource {
     private final JournalManager journal;
     private boolean disabled = false;
     private EditLogOutputStream stream;
+    private boolean required = false;
     
-    public JournalAndStream(JournalManager manager) {
+    public JournalAndStream(JournalManager manager, boolean required) {
       this.journal = manager;
+      this.required = required;
     }
 
     public void startLogSegment(long txId) throws IOException {
@@ -132,9 +131,24 @@ public class JournalSet implements Journ
     private void setDisabled(boolean disabled) {
       this.disabled = disabled;
     }
+    
+    @Override
+    public boolean isResourceAvailable() {
+      return !isDisabled();
+    }
+    
+    @Override
+    public boolean isRequired() {
+      return required;
+    }
   }
   
   private List<JournalAndStream> journals = Lists.newArrayList();
+  final int minimumRedundantJournals;
+  
+  JournalSet(int minimumRedundantResources) {
+    this.minimumRedundantJournals = minimumRedundantResources;
+  }
   
   @Override
   public EditLogOutputStream startLogSegment(final long txId) throws IOException {
@@ -232,16 +246,15 @@ public class JournalSet implements Journ
   }
 
   /**
-   * Returns true if there are no journals or all are disabled.
-   * @return True if no journals or all are disabled.
+   * Returns true if there are no journals, all redundant journals are disabled,
+   * or any required journals are disabled.
+   * 
+   * @return True if there no journals, all redundant journals are disabled,
+   * or any required journals are disabled.
    */
   public boolean isEmpty() {
-    for (JournalAndStream jas : journals) {
-      if (!jas.isDisabled()) {
-        return false;
-      }
-    }
-    return true;
+    return !NameNodeResourcePolicy.areResourcesAvailable(journals,
+        minimumRedundantJournals);
   }
   
   /**
@@ -292,9 +305,11 @@ public class JournalSet implements Journ
       }
     }
     disableAndReportErrorOnJournals(badJAS);
-    if (badJAS.size() >= journals.size()) {
-      LOG.error("Error: "+status+" failed for all journals");
-      throw new IOException(status+" failed on all the journals");
+    if (!NameNodeResourcePolicy.areResourcesAvailable(journals,
+        minimumRedundantJournals)) {
+      String message = status + " failed for too many journals";
+      LOG.error("Error: " + message);
+      throw new IOException(message);
     }
   }
   
@@ -450,8 +465,9 @@ public class JournalSet implements Journ
     return jList;
   }
 
-  void add(JournalManager j) {
-    journals.add(new JournalAndStream(j));
+  void add(JournalManager j, boolean required) {
+    JournalAndStream jas = new JournalAndStream(j, required);
+    journals.add(jas);
   }
   
   void remove(JournalManager j) {

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourceChecker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourceChecker.java?rev=1211749&r1=1211748&r2=1211749&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourceChecker.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourceChecker.java Thu Dec  8 02:57:47 2011
@@ -27,6 +27,7 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -40,37 +41,80 @@ import com.google.common.base.Predicate;
  * 
  * NameNodeResourceChecker provides a method -
  * <code>hasAvailableDiskSpace</code> - which will return true if and only if
- * the NameNode has disk space available on all volumes which are configured to
- * be checked. Volumes containing file system name/edits dirs are added by
- * default, and arbitrary extra volumes may be configured as well.
+ * the NameNode has disk space available on all required volumes, and any volume
+ * which is configured to be redundant. Volumes containing file system edits dirs
+ * are added by default, and arbitrary extra volumes may be configured as well.
  */
-public class NameNodeResourceChecker {
+@InterfaceAudience.Private
+class NameNodeResourceChecker {
   private static final Log LOG = LogFactory.getLog(NameNodeResourceChecker.class.getName());
 
   // Space (in bytes) reserved per volume.
   private long duReserved;
 
   private final Configuration conf;
-  private Map<String, DF> volumes;
+  private Map<String, CheckedVolume> volumes;
+  private int minimumRedundantVolumes;
+  
+  @VisibleForTesting
+  class CheckedVolume implements CheckableNameNodeResource {
+    private DF df;
+    private boolean required;
+    private String volume;
+    
+    public CheckedVolume(File dirToCheck, boolean required)
+        throws IOException {
+      df = new DF(dirToCheck, conf);
+      this.required = required;
+      volume = df.getFilesystem();
+    }
+    
+    public String getVolume() {
+      return volume;
+    }
+    
+    @Override
+    public boolean isRequired() {
+      return required;
+    }
+
+    @Override
+    public boolean isResourceAvailable() {
+      long availableSpace = df.getAvailable();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Space available on volume '" + volume + "' is "
+            + availableSpace);
+      }
+      if (availableSpace < duReserved) {
+        LOG.warn("Space available on volume '" + volume + "' is "
+            + availableSpace +
+            ", which is below the configured reserved amount " + duReserved);
+        return false;
+      } else {
+        return true;
+      }
+    }
+    
+    @Override
+    public String toString() {
+      return "volume: " + volume + " required: " + required +
+          " resource available: " + isResourceAvailable();
+    }
+  }
 
   /**
-   * Create a NameNodeResourceChecker, which will check the name dirs and edits
-   * dirs set in <code>conf</code>.
-   * 
-   * @param conf
-   * @throws IOException
+   * Create a NameNodeResourceChecker, which will check the edits dirs and any
+   * additional dirs to check set in <code>conf</code>.
    */
   public NameNodeResourceChecker(Configuration conf) throws IOException {
     this.conf = conf;
-    volumes = new HashMap<String, DF>();
+    volumes = new HashMap<String, CheckedVolume>();
 
     duReserved = conf.getLong(DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_KEY,
         DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_DEFAULT);
-  
+    
     Collection<URI> extraCheckedVolumes = Util.stringCollectionAsURIs(conf
         .getTrimmedStringCollection(DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_KEY));
-
-    addDirsToCheck(FSNamesystem.getNamespaceDirs(conf));
     
     Collection<URI> localEditDirs = Collections2.filter(
         FSNamesystem.getNamespaceEditsDirs(conf),
@@ -82,70 +126,86 @@ public class NameNodeResourceChecker {
             return false;
           }
         });
-    addDirsToCheck(localEditDirs);
-    addDirsToCheck(extraCheckedVolumes);
+
+    // Add all the local edits dirs, marking some as required if they are
+    // configured as such.
+    for (URI editsDirToCheck : localEditDirs) {
+      addDirToCheck(editsDirToCheck,
+          FSNamesystem.getRequiredNamespaceEditsDirs(conf).contains(
+              editsDirToCheck));
+    }
+
+    // All extra checked volumes are marked "required"
+    for (URI extraDirToCheck : extraCheckedVolumes) {
+      addDirToCheck(extraDirToCheck, true);
+    }
+    
+    minimumRedundantVolumes = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_KEY,
+        DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_DEFAULT);
   }
 
   /**
-   * Add the passed-in directories to the list of volumes to check.
+   * Add the volume of the passed-in directory to the list of volumes to check.
+   * If <code>required</code> is true, and this volume is already present, but
+   * is marked redundant, it will be marked required. If the volume is already
+   * present but marked required then this method is a no-op.
    * 
-   * @param directoriesToCheck
-   *          The directories whose volumes will be checked for available space.
-   * @throws IOException
+   * @param directoryToCheck
+   *          The directory whose volume will be checked for available space.
    */
-  private void addDirsToCheck(Collection<URI> directoriesToCheck)
+  private void addDirToCheck(URI directoryToCheck, boolean required)
       throws IOException {
-    for (URI directoryUri : directoriesToCheck) {
-      File dir = new File(directoryUri.getPath());
-      if (!dir.exists()) {
-        throw new IOException("Missing directory "+dir.getAbsolutePath());
-      }
-      DF df = new DF(dir, conf);
-      volumes.put(df.getFilesystem(), df);
+    File dir = new File(directoryToCheck.getPath());
+    if (!dir.exists()) {
+      throw new IOException("Missing directory "+dir.getAbsolutePath());
+    }
+    
+    CheckedVolume newVolume = new CheckedVolume(dir, required);
+    CheckedVolume volume = volumes.get(newVolume.getVolume());
+    if (volume == null || (volume != null && !volume.isRequired())) {
+      volumes.put(newVolume.getVolume(), newVolume);
     }
   }
 
   /**
    * Return true if disk space is available on at least one of the configured
-   * volumes.
+   * redundant volumes, and all of the configured required volumes.
    * 
    * @return True if the configured amount of disk space is available on at
-   *         least one volume, false otherwise.
-   * @throws IOException
+   *         least one redundant volume and all of the required volumes, false
+   *         otherwise.
    */
   boolean hasAvailableDiskSpace()
       throws IOException {
-    return getVolumesLowOnSpace().size() < volumes.size();
+    return NameNodeResourcePolicy.areResourcesAvailable(volumes.values(),
+        minimumRedundantVolumes);
   }
 
   /**
    * Return the set of directories which are low on space.
+   * 
    * @return the set of directories whose free space is below the threshold.
-   * @throws IOException 
    */
+  @VisibleForTesting
   Collection<String> getVolumesLowOnSpace() throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Going to check the following volumes disk space: " + volumes);
     }
     Collection<String> lowVolumes = new ArrayList<String>();
-    for (DF volume : volumes.values()) {
-      long availableSpace = volume.getAvailable();
-      String fileSystem = volume.getFilesystem();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Space available on volume '" + fileSystem + "' is " + availableSpace);
-      }
-      if (availableSpace < duReserved) {
-        LOG.warn("Space available on volume '" + fileSystem + "' is "
-            + availableSpace +
-            ", which is below the configured reserved amount " + duReserved);
-        lowVolumes.add(volume.getFilesystem());
-      }
+    for (CheckedVolume volume : volumes.values()) {
+      lowVolumes.add(volume.getVolume());
     }
     return lowVolumes;
   }
   
   @VisibleForTesting
-  void setVolumes(Map<String, DF> volumes) {
+  void setVolumes(Map<String, CheckedVolume> volumes) {
     this.volumes = volumes;
   }
+  
+  @VisibleForTesting
+  void setMinimumReduntdantVolumes(int minimumRedundantVolumes) {
+    this.minimumRedundantVolumes = minimumRedundantVolumes;
+  }
 }

Propchange: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/native/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Dec  8 02:57:47 2011
@@ -1,4 +1,4 @@
-/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native:1152502-1210663
+/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native:1152502-1211747
 /hadoop/core/branches/branch-0.19/hdfs/src/main/native:713112
 /hadoop/core/branches/branch-0.19/mapred/src/c++/libhdfs:713112
 /hadoop/core/trunk/src/c++/libhdfs:776175-784663

Propchange: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Dec  8 02:57:47 2011
@@ -1,4 +1,4 @@
-/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode:1159757-1210663
+/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode:1159757-1211747
 /hadoop/core/branches/branch-0.19/hdfs/src/main/webapps/datanode:713112
 /hadoop/core/branches/branch-0.19/hdfs/src/webapps/datanode:713112
 /hadoop/core/trunk/src/webapps/datanode:776175-784663

Propchange: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Dec  8 02:57:47 2011
@@ -1,4 +1,4 @@
-/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs:1152502-1210663
+/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs:1152502-1211747
 /hadoop/core/branches/branch-0.19/hdfs/src/main/webapps/hdfs:713112
 /hadoop/core/branches/branch-0.19/hdfs/src/webapps/hdfs:713112
 /hadoop/core/trunk/src/webapps/hdfs:776175-784663

Propchange: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Dec  8 02:57:47 2011
@@ -1,4 +1,4 @@
-/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary:1152502-1210663
+/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary:1152502-1211747
 /hadoop/core/branches/branch-0.19/hdfs/src/main/webapps/secondary:713112
 /hadoop/core/branches/branch-0.19/hdfs/src/webapps/secondary:713112
 /hadoop/core/trunk/src/webapps/secondary:776175-784663

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/proto/InterDatanodeProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/proto/InterDatanodeProtocol.proto?rev=1211749&r1=1211748&r2=1211749&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/proto/InterDatanodeProtocol.proto (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/proto/InterDatanodeProtocol.proto Thu Dec  8 02:57:47 2011
@@ -38,7 +38,7 @@ message InitReplicaRecoveryRequestProto 
  * Repica recovery information
  */
 message InitReplicaRecoveryResponseProto {
-  required ReplicaState state = 1; // State fo the replica
+  required ReplicaStateProto state = 1; // State of the replica
   required BlockProto block = 2;   // block information
 }
 

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/proto/NamenodeProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/proto/NamenodeProtocol.proto?rev=1211749&r1=1211748&r2=1211749&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/proto/NamenodeProtocol.proto (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/proto/NamenodeProtocol.proto Thu Dec  8 02:57:47 2011
@@ -42,7 +42,7 @@ message GetBlocksRequestProto {
  * blocks - List of returned blocks
  */
 message GetBlocksResponseProto {
-  required BlockWithLocationsProto blocks = 1; // List of blocks
+  required BlocksWithLocationsProto blocks = 1; // List of blocks
 }
 
 /**
@@ -85,12 +85,25 @@ message RollEditLogResponseProto {
 }
 
 /**
- * registartion - Namenode reporting the error
+ * void request
+ */
+message VersionRequestProto {
+}
+
+/**
+ * void request
+ */
+message VersionResponseProto {
+  required NamespaceInfoProto info = 1;
+}
+
+/**
+ * registration - Namenode reporting the error
  * errorCode - error code indicating the error
  * msg - Free text description of the error
  */
 message ErrorReportRequestProto {
-  required NamenodeRegistrationProto registartion = 1; // Registartion info
+  required NamenodeRegistrationProto registration = 1; // Registration info
   required uint32 errorCode = 2;  // Error code
   required string msg = 3;        // Error message
 }
@@ -194,6 +207,11 @@ service NamenodeProtocolService {
   rpc rollEditLog(RollEditLogRequestProto) returns(RollEditLogResponseProto);
 
   /**
+   * Close the current editlog and open a new one for checkpointing purposes
+   */
+  rpc versionRequest(VersionRequestProto) returns(VersionResponseProto);
+
+  /**
    * Report from a sub-ordinate namenode of an error to the active namenode.
    * Active namenode may decide to unregister the reporting namenode 
    * depending on the error.

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/proto/hdfs.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/proto/hdfs.proto?rev=1211749&r1=1211748&r2=1211749&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/proto/hdfs.proto (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/proto/hdfs.proto Thu Dec  8 02:57:47 2011
@@ -270,8 +270,8 @@ message BlockProto {
  * Block and datanodes where is it located
  */
 message BlockWithLocationsProto {
-  required BlockProto block = 1;            // Block
-  repeated DatanodeIDProto datanodeIDs = 2; // Datanodes with replicas of the block
+  required BlockProto block = 1;   // Block
+  repeated string datanodeIDs = 2; // Datanodes with replicas of the block
 }
 
 /**
@@ -329,7 +329,7 @@ message ExportedBlockKeysProto {
 /**
  * State of a block replica at a datanode
  */
-enum ReplicaState {
+enum ReplicaStateProto {
   FINALIZED = 0;  // State of a replica when it is not modified
   RBW = 1;        // State of replica that is being written to
   RWR = 2;        // State of replica that is waiting to be recovered

Propchange: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Dec  8 02:57:47 2011
@@ -1,4 +1,4 @@
-/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs:1159757-1210663
+/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs:1159757-1211747
 /hadoop/core/branches/branch-0.19/hdfs/src/test/hdfs:713112
 /hadoop/core/trunk/src/test/hdfs:776175-785643
 /hadoop/hdfs/branches/HDFS-1052/src/test/hdfs:987665-1095512

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java?rev=1211749&r1=1211748&r2=1211749&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java Thu Dec  8 02:57:47 2011
@@ -19,12 +19,46 @@ package org.apache.hadoop.hdfs.protocolP
 
 import static junit.framework.Assert.*;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenIdentifierProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockWithLocationsProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlocksWithLocationsProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointSignatureProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExportedBlockKeysProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto.NamenodeRoleProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RecoveringBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteEditLogManifestProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteEditLogProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto;
+import org.apache.hadoop.hdfs.security.token.block.BlockKey;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
 import org.junit.Test;
 
 /**
@@ -46,10 +80,14 @@ public class TestPBHelper {
     assertEquals(NamenodeRole.NAMENODE,
         PBHelper.convert(NamenodeRoleProto.NAMENODE));
   }
-  
+
+  private static StorageInfo getStorageInfo() {
+    return new StorageInfo(1, 2, "cid", 3);
+  }
+
   @Test
   public void testConvertStoragInfo() {
-    StorageInfo info = new StorageInfo(1, 2, "cid", 3);
+    StorageInfo info = getStorageInfo();
     StorageInfoProto infoProto = PBHelper.convert(info);
     StorageInfo info2 = PBHelper.convert(infoProto);
     assertEquals(info.getClusterID(), info2.getClusterID());
@@ -57,10 +95,10 @@ public class TestPBHelper {
     assertEquals(info.getLayoutVersion(), info2.getLayoutVersion());
     assertEquals(info.getNamespaceID(), info2.getNamespaceID());
   }
-  
+
   @Test
   public void testConvertNamenodeRegistration() {
-    StorageInfo info = new StorageInfo(1, 2, "cid", 3);
+    StorageInfo info = getStorageInfo();
     NamenodeRegistration reg = new NamenodeRegistration("address:999",
         "http:1000", info, NamenodeRole.NAMENODE);
     NamenodeRegistrationProto regProto = PBHelper.convert(reg);
@@ -74,6 +112,217 @@ public class TestPBHelper {
     assertEquals(reg.getRegistrationID(), reg2.getRegistrationID());
     assertEquals(reg.getRole(), reg2.getRole());
     assertEquals(reg.getVersion(), reg2.getVersion());
+
+  }
+
+  @Test
+  public void testConvertDatanodeID() {
+    DatanodeID dn = new DatanodeID("node", "sid", 1, 2);
+    DatanodeIDProto dnProto = PBHelper.convert(dn);
+    DatanodeID dn2 = PBHelper.convert(dnProto);
+    assertEquals(dn.getHost(), dn2.getHost());
+    assertEquals(dn.getInfoPort(), dn2.getInfoPort());
+    assertEquals(dn.getIpcPort(), dn2.getIpcPort());
+    assertEquals(dn.getName(), dn2.getName());
+    assertEquals(dn.getPort(), dn2.getPort());
+    assertEquals(dn.getStorageID(), dn2.getStorageID());
+  }
+
+  @Test
+  public void testConvertBlock() {
+    Block b = new Block(1, 100, 3);
+    BlockProto bProto = PBHelper.convert(b);
+    Block b2 = PBHelper.convert(bProto);
+    assertEquals(b, b2);
+  }
+
+  private static BlockWithLocations getBlockWithLocations(int bid) {
+    return new BlockWithLocations(new Block(bid, 0, 1), new String[] { "dn1",
+        "dn2", "dn3" });
+  }
+
+  private void compare(BlockWithLocations locs1, BlockWithLocations locs2) {
+    assertEquals(locs1.getBlock(), locs2.getBlock());
+    assertTrue(Arrays.equals(locs1.getDatanodes(), locs2.getDatanodes()));
+  }
+
+  @Test
+  public void testConvertBlockWithLocations() {
+    BlockWithLocations locs = getBlockWithLocations(1);
+    BlockWithLocationsProto locsProto = PBHelper.convert(locs);
+    BlockWithLocations locs2 = PBHelper.convert(locsProto);
+    compare(locs, locs2);
+  }
+
+  @Test
+  public void testConvertBlocksWithLocations() {
+    BlockWithLocations[] list = new BlockWithLocations[] {
+        getBlockWithLocations(1), getBlockWithLocations(2) };
+    BlocksWithLocations locs = new BlocksWithLocations(list);
+    BlocksWithLocationsProto locsProto = PBHelper.convert(locs);
+    BlocksWithLocations locs2 = PBHelper.convert(locsProto);
+    BlockWithLocations[] blocks = locs.getBlocks();
+    BlockWithLocations[] blocks2 = locs2.getBlocks();
+    assertEquals(blocks.length, blocks2.length);
+    for (int i = 0; i < blocks.length; i++) {
+      compare(blocks[i], blocks2[i]);
+    }
+  }
+
+  private static BlockKey getBlockKey(int keyId) {
+    return new BlockKey(keyId, 10, "encodedKey".getBytes());
+  }
+
+  private void compare(BlockKey k1, BlockKey k2) {
+    assertEquals(k1.getExpiryDate(), k2.getExpiryDate());
+    assertEquals(k1.getKeyId(), k2.getKeyId());
+    assertTrue(Arrays.equals(k1.getEncodedKey(), k2.getEncodedKey()));
+
+  }
+
+  @Test
+  public void testConvertBlockKey() {
+    BlockKey key = getBlockKey(1);
+    BlockKeyProto keyProto = PBHelper.convert(key);
+    BlockKey key1 = PBHelper.convert(keyProto);
+    compare(key, key1);
+  }
+
+  @Test
+  public void testConvertExportedBlockKeys() {
+    BlockKey[] keys = new BlockKey[] { getBlockKey(2), getBlockKey(3) };
+    ExportedBlockKeys expKeys = new ExportedBlockKeys(true, 9, 10,
+        getBlockKey(1), keys);
+    ExportedBlockKeysProto expKeysProto = PBHelper.convert(expKeys);
+    ExportedBlockKeys expKeys1 = PBHelper.convert(expKeysProto);
+
+    BlockKey[] allKeys = expKeys.getAllKeys();
+    BlockKey[] allKeys1 = expKeys1.getAllKeys();
+    assertEquals(allKeys.length, allKeys1.length);
+    for (int i = 0; i < allKeys.length; i++) {
+      compare(allKeys[i], allKeys1[i]);
+    }
+    compare(expKeys.getCurrentKey(), expKeys1.getCurrentKey());
+    assertEquals(expKeys.getKeyUpdateInterval(),
+        expKeys1.getKeyUpdateInterval());
+    assertEquals(expKeys.getTokenLifetime(), expKeys1.getTokenLifetime());
+  }
+
+  @Test
+  public void testConvertCheckpointSignature() {
+    CheckpointSignature s = new CheckpointSignature(getStorageInfo(), "bpid",
+        100, 1);
+    CheckpointSignatureProto sProto = PBHelper.convert(s);
+    CheckpointSignature s1 = PBHelper.convert(sProto);
+    assertEquals(s.getBlockpoolID(), s1.getBlockpoolID());
+    assertEquals(s.getClusterID(), s1.getClusterID());
+    assertEquals(s.getCTime(), s1.getCTime());
+    assertEquals(s.getCurSegmentTxId(), s1.getCurSegmentTxId());
+    assertEquals(s.getLayoutVersion(), s1.getLayoutVersion());
+    assertEquals(s.getMostRecentCheckpointTxId(),
+        s1.getMostRecentCheckpointTxId());
+    assertEquals(s.getNamespaceID(), s1.getNamespaceID());
+  }
+  
+  private static void compare(RemoteEditLog l1, RemoteEditLog l2) {
+    assertEquals(l1.getEndTxId(), l2.getEndTxId());
+    assertEquals(l1.getStartTxId(), l2.getStartTxId());
+  }
+  
+  @Test
+  public void testConvertRemoteEditLog() {
+    RemoteEditLog l = new RemoteEditLog(1, 100);
+    RemoteEditLogProto lProto = PBHelper.convert(l);
+    RemoteEditLog l1 = PBHelper.convert(lProto);
+    compare(l, l1);
+  }
+  
+  @Test
+  public void testConvertRemoteEditLogManifest() {
+    List<RemoteEditLog> logs = new ArrayList<RemoteEditLog>();
+    logs.add(new RemoteEditLog(1, 10));
+    logs.add(new RemoteEditLog(11, 20));
+    RemoteEditLogManifest m = new RemoteEditLogManifest(logs);
+    RemoteEditLogManifestProto mProto = PBHelper.convert(m);
+    RemoteEditLogManifest m1 = PBHelper.convert(mProto);
     
+    List<RemoteEditLog> logs1 = m1.getLogs();
+    assertEquals(logs.size(), logs1.size());
+    for (int i = 0; i < logs.size(); i++) {
+      compare(logs.get(i), logs1.get(i));
+    }
+  }
+  
+  public ExtendedBlock getExtendedBlock() {
+    return new ExtendedBlock("bpid", 1, 100, 2);
+  }
+  
+  public DatanodeInfo getDNInfo() {
+    return new DatanodeInfo(new DatanodeID("node", "sid", 1, 2));
+  }
+  
+  private void compare(DatanodeInfo dn1, DatanodeInfo dn2) {
+      assertEquals(dn1.getAdminState(), dn2.getAdminState());
+      assertEquals(dn1.getBlockPoolUsed(), dn2.getBlockPoolUsed());
+      assertEquals(dn1.getBlockPoolUsedPercent(), dn2.getBlockPoolUsedPercent());
+      assertEquals(dn1.getCapacity(), dn2.getCapacity());
+      assertEquals(dn1.getDatanodeReport(), dn2.getDatanodeReport());
+      assertEquals(dn1.getDfsUsed(), dn1.getDfsUsed());
+      assertEquals(dn1.getDfsUsedPercent(), dn1.getDfsUsedPercent());
+      assertEquals(dn1.getHost(), dn2.getHost());
+      assertEquals(dn1.getHostName(), dn2.getHostName());
+      assertEquals(dn1.getInfoPort(), dn2.getInfoPort());
+      assertEquals(dn1.getIpcPort(), dn2.getIpcPort());
+      assertEquals(dn1.getLastUpdate(), dn2.getLastUpdate());
+      assertEquals(dn1.getLevel(), dn2.getLevel());
+      assertEquals(dn1.getNetworkLocation(), dn2.getNetworkLocation());
+  }
+  
+  @Test
+  public void testConvertExtendedBlock() {
+    ExtendedBlock b = getExtendedBlock();
+    ExtendedBlockProto bProto = PBHelper.convert(b);
+    ExtendedBlock b1 = PBHelper.convert(bProto);
+    assertEquals(b, b1);
+    
+    b.setBlockId(-1);
+    bProto = PBHelper.convert(b);
+    b1 = PBHelper.convert(bProto);
+    assertEquals(b, b1);
+  }
+  
+  @Test
+  public void testConvertRecoveringBlock() {
+    DatanodeInfo[] dnInfo = new DatanodeInfo[] { getDNInfo(), getDNInfo() };
+    RecoveringBlock b = new RecoveringBlock(getExtendedBlock(), dnInfo, 3);
+    RecoveringBlockProto bProto = PBHelper.convert(b);
+    RecoveringBlock b1 = PBHelper.convert(bProto);
+    assertEquals(b.getBlock(), b1.getBlock());
+    DatanodeInfo[] dnInfo1 = b1.getLocations();
+    assertEquals(dnInfo.length, dnInfo1.length);
+    for (int i=0; i < dnInfo.length; i++) {
+      compare(dnInfo[0], dnInfo1[0]);
+    }
+  }
+  
+  @Test
+  public void testConvertText() {
+    Text t = new Text("abc".getBytes());
+    String s = t.toString();
+    Text t1 = new Text(s);
+    assertEquals(t, t1);
+  }
+  
+  @Test
+  public void testBlockTokenIdentifier() {
+    Token<BlockTokenIdentifier> token = new Token<BlockTokenIdentifier>(
+        "identifier".getBytes(), "password".getBytes(), new Text("kind"),
+        new Text("service"));
+    BlockTokenIdentifierProto tokenProto = PBHelper.convert(token);
+    Token<BlockTokenIdentifier> token2 = PBHelper.convert(tokenProto);
+    assertTrue(Arrays.equals(token.getIdentifier(), token2.getIdentifier()));
+    assertTrue(Arrays.equals(token.getPassword(), token2.getPassword()));
+    assertEquals(token.getKind(), token2.getKind());
+    assertEquals(token.getService(), token2.getService());
   }
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestClusterId.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestClusterId.java?rev=1211749&r1=1211748&r2=1211749&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestClusterId.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestClusterId.java Thu Dec  8 02:57:47 2011
@@ -47,7 +47,7 @@ public class TestClusterId {
   private String getClusterId(Configuration config) throws IOException {
     // see if cluster id not empty.
     Collection<URI> dirsToFormat = FSNamesystem.getNamespaceDirs(config);
-    Collection<URI> editsToFormat = new ArrayList<URI>(0);
+    Collection<URI> editsToFormat = FSNamesystem.getNamespaceEditsDirs(config);
     FSImage fsImage = new FSImage(config, dirsToFormat, editsToFormat);
     
     Iterator<StorageDirectory> sdit = 

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java?rev=1211749&r1=1211748&r2=1211749&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java Thu Dec  8 02:57:47 2011
@@ -21,28 +21,32 @@ import static org.junit.Assert.assertFal
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import java.io.File;
 import java.io.IOException;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.verification.VerificationMode;
 
 public class TestEditLogJournalFailures {
 
   private int editsPerformed = 0;
-  private Configuration conf;
   private MiniDFSCluster cluster;
   private FileSystem fs;
   private Runtime runtime;
@@ -53,8 +57,13 @@ public class TestEditLogJournalFailures 
    */
   @Before
   public void setUpMiniCluster() throws IOException {
-    conf = new HdfsConfiguration();
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
+    setUpMiniCluster(new HdfsConfiguration(), true);
+  }
+  
+  public void setUpMiniCluster(Configuration conf, boolean manageNameDfsDirs)
+      throws IOException {
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
+        .manageNameDfsDirs(manageNameDfsDirs).build();
     cluster.waitActive();
     fs = cluster.getFileSystem();
     
@@ -64,11 +73,13 @@ public class TestEditLogJournalFailures 
     
     cluster.getNameNode().getFSImage().getEditLog().setRuntimeForTesting(runtime);
   }
-   
+  
   @After
   public void shutDownMiniCluster() throws IOException {
-    fs.close();
-    cluster.shutdown();
+    if (fs != null)
+      fs.close();
+    if (cluster != null)
+      cluster.shutdown();
   }
    
   @Test
@@ -109,7 +120,7 @@ public class TestEditLogJournalFailures 
     assertTrue(doAnEdit());
     // The previous edit could not be synced to any persistent storage, should
     // have halted the NN.
-    assertExitInvocations(1);
+    assertExitInvocations(atLeast(1));
   }
   
   @Test
@@ -124,6 +135,80 @@ public class TestEditLogJournalFailures 
     assertExitInvocations(0);
     assertFalse(cluster.getNameNode().isInSafeMode());
   }
+  
+  @Test
+  public void testSingleRequiredFailedEditsDirOnSetReadyToFlush()
+      throws IOException {
+    // Set one of the edits dirs to be required.
+    String[] editsDirs = cluster.getConfiguration(0).getTrimmedStrings(
+        DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
+    shutDownMiniCluster();
+    Configuration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_REQUIRED_KEY, editsDirs[1]);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_KEY, 0);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_KEY, 0);
+    setUpMiniCluster(conf, true);
+    
+    assertTrue(doAnEdit());
+    // Invalidated the one required edits journal.
+    invalidateEditsDirAtIndex(1, false, false);
+    // Make sure runtime.exit(...) hasn't been called at all yet.
+    assertExitInvocations(0);
+    
+    // This will actually return true in the tests, since the NN will not in
+    // fact call Runtime.exit();
+    doAnEdit();
+    
+    // A single failure of a required journal should result in a call to
+    // runtime.exit(...).
+    assertExitInvocations(atLeast(1));
+  }
+  
+  @Test
+  public void testMultipleRedundantFailedEditsDirOnSetReadyToFlush()
+      throws IOException {
+    // Set up 4 name/edits dirs.
+    shutDownMiniCluster();
+    Configuration conf = new HdfsConfiguration();
+    String[] nameDirs = new String[4];
+    for (int i = 0; i < nameDirs.length; i++) {
+      File nameDir = new File(System.getProperty("test.build.data"),
+          "name-dir" + i);
+      nameDir.mkdirs();
+      nameDirs[i] = nameDir.getAbsolutePath();
+    }
+    conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+        StringUtils.join(nameDirs, ","));
+    
+    // Keep running unless there are less than 2 edits dirs remaining.
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_KEY, 2);
+    setUpMiniCluster(conf, false);
+    
+    // All journals active.
+    assertTrue(doAnEdit());
+    assertExitInvocations(0);
+    
+    // Invalidate 1/4 of the redundant journals.
+    invalidateEditsDirAtIndex(0, false, false);
+    assertTrue(doAnEdit());
+    assertExitInvocations(0);
+
+    // Invalidate 2/4 of the redundant journals.
+    invalidateEditsDirAtIndex(1, false, false);
+    assertTrue(doAnEdit());
+    assertExitInvocations(0);
+    
+    // Invalidate 3/4 of the redundant journals.
+    invalidateEditsDirAtIndex(2, false, false);
+    
+    // This will actually return true in the tests, since the NN will not in
+    // fact call Runtime.exit();
+    doAnEdit();
+    
+    // A failure of more than the minimum number of redundant journals should
+    // result in a call to runtime.exit(...).
+    assertExitInvocations(atLeast(1));
+  }
 
   /**
    * Replace the journal at index <code>index</code> with one that throws an
@@ -181,6 +266,17 @@ public class TestEditLogJournalFailures 
   private boolean doAnEdit() throws IOException {
     return fs.mkdirs(new Path("/tmp", Integer.toString(editsPerformed++)));
   }
+  
+  /**
+   * Make sure that Runtime.exit(...) has been called exactly
+   * <code>expectedExits<code> number of times.
+   * 
+   * @param expectedExits the exact number of times Runtime.exit(...) should
+   *                      have been called.
+   */
+  private void assertExitInvocations(int expectedExits) {
+    assertExitInvocations(times(expectedExits));
+  }
 
   /**
    * Make sure that Runtime.exit(...) has been called
@@ -188,7 +284,7 @@ public class TestEditLogJournalFailures 
    * 
    * @param expectedExits the number of times Runtime.exit(...) should have been called.
    */
-  private void assertExitInvocations(int expectedExits) {
-    verify(runtime, times(expectedExits)).exit(anyInt());
+  private void assertExitInvocations(VerificationMode expectedExits) {
+    verify(runtime, expectedExits).exit(anyInt());
   }
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeResourceChecker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeResourceChecker.java?rev=1211749&r1=1211748&r2=1211749&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeResourceChecker.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeResourceChecker.java Thu Dec  8 02:57:47 2011
@@ -19,21 +19,20 @@ package org.apache.hadoop.hdfs.server.na
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URISyntaxException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NameNodeResourceMonitor;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeResourceChecker.CheckedVolume;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
-import com.google.common.collect.Lists;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -49,7 +48,7 @@ public class TestNameNodeResourceChecker
     baseDir = new File(System.getProperty("test.build.data"));
     nameDir = new File(baseDir, "resource-check-name-dir");
     nameDir.mkdirs();
-    conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, nameDir.getAbsolutePath());
+    conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, nameDir.getAbsolutePath());
   }
 
   /**
@@ -90,7 +89,7 @@ public class TestNameNodeResourceChecker
       throws IOException, InterruptedException {
     MiniDFSCluster cluster = null;
     try {
-      conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, nameDir.getAbsolutePath());
+      conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, nameDir.getAbsolutePath());
       conf.setLong(DFSConfigKeys.DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY, 1);
       
       cluster = new MiniDFSCluster.Builder(conf)
@@ -145,7 +144,7 @@ public class TestNameNodeResourceChecker
     File nameDir2 = new File(System.getProperty("test.build.data"), "name-dir2");
     nameDir1.mkdirs();
     nameDir2.mkdirs();
-    conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+    conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
         nameDir1.getAbsolutePath() + "," + nameDir2.getAbsolutePath());
     conf.setLong(DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_KEY, Long.MAX_VALUE);
 
@@ -164,7 +163,7 @@ public class TestNameNodeResourceChecker
     Configuration conf = new Configuration();
     File nameDir = new File(System.getProperty("test.build.data"), "name-dir");
     nameDir.mkdirs();
-    conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, nameDir.getAbsolutePath());
+    conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, nameDir.getAbsolutePath());
     conf.set(DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_KEY, nameDir.getAbsolutePath());
     conf.setLong(DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_KEY, Long.MAX_VALUE);
 
@@ -176,38 +175,70 @@ public class TestNameNodeResourceChecker
 
   /**
    * Test that the NN is considered to be out of resources only once all
-   * configured volumes are low on resources.
+   * redundant configured volumes are low on resources, or when any required
+   * volume is low on resources. 
    */
   @Test
-  public void testLowResourceVolumePolicy() throws IOException {
+  public void testLowResourceVolumePolicy() throws IOException, URISyntaxException {
     Configuration conf = new Configuration();
     File nameDir1 = new File(System.getProperty("test.build.data"), "name-dir1");
     File nameDir2 = new File(System.getProperty("test.build.data"), "name-dir2");
     nameDir1.mkdirs();
     nameDir2.mkdirs();
     
-    conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+    conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
         nameDir1.getAbsolutePath() + "," + nameDir2.getAbsolutePath());
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_KEY, 2);
     
     NameNodeResourceChecker nnrc = new NameNodeResourceChecker(conf);
     
     // For the purpose of this test, we need to force the name dirs to appear to
     // be on different volumes.
-    Map<String, DF> volumes = new HashMap<String, DF>();
-    volumes.put("volume1", new DF(nameDir1, conf));
-    volumes.put("volume2", new DF(nameDir2, conf));
+    Map<String, CheckedVolume> volumes = new HashMap<String, CheckedVolume>();
+    CheckedVolume volume1 = Mockito.mock(CheckedVolume.class);
+    CheckedVolume volume2 = Mockito.mock(CheckedVolume.class);
+    CheckedVolume volume3 = Mockito.mock(CheckedVolume.class);
+    CheckedVolume volume4 = Mockito.mock(CheckedVolume.class);
+    CheckedVolume volume5 = Mockito.mock(CheckedVolume.class);
+    Mockito.when(volume1.isResourceAvailable()).thenReturn(true);
+    Mockito.when(volume2.isResourceAvailable()).thenReturn(true);
+    Mockito.when(volume3.isResourceAvailable()).thenReturn(true);
+    Mockito.when(volume4.isResourceAvailable()).thenReturn(true);
+    Mockito.when(volume5.isResourceAvailable()).thenReturn(true);
+    
+    // Make volumes 4 and 5 required.
+    Mockito.when(volume4.isRequired()).thenReturn(true);
+    Mockito.when(volume5.isRequired()).thenReturn(true);
+    
+    volumes.put("volume1", volume1);
+    volumes.put("volume2", volume2);
+    volumes.put("volume3", volume3);
+    volumes.put("volume4", volume4);
+    volumes.put("volume5", volume5);
     nnrc.setVolumes(volumes);
     
-    NameNodeResourceChecker spyNnrc = Mockito.spy(nnrc);
-    
-    Mockito.when(spyNnrc.getVolumesLowOnSpace()).thenReturn(
-        Lists.newArrayList("volume1"));
-    
-    assertTrue(spyNnrc.hasAvailableDiskSpace());
-    
-    Mockito.when(spyNnrc.getVolumesLowOnSpace()).thenReturn(
-        Lists.newArrayList("volume1", "volume2"));
+    // Initially all dirs have space.
+    assertTrue(nnrc.hasAvailableDiskSpace());
     
-    assertFalse(spyNnrc.hasAvailableDiskSpace());
+    // 1/3 redundant dir is low on space.
+    Mockito.when(volume1.isResourceAvailable()).thenReturn(false);
+    assertTrue(nnrc.hasAvailableDiskSpace());
+    
+    // 2/3 redundant dirs are low on space.
+    Mockito.when(volume2.isResourceAvailable()).thenReturn(false);
+    assertFalse(nnrc.hasAvailableDiskSpace());
+    
+    // Lower the minimum number of redundant volumes that must be available.
+    nnrc.setMinimumReduntdantVolumes(1);
+    assertTrue(nnrc.hasAvailableDiskSpace());
+    
+    // Just one required dir is low on space.
+    Mockito.when(volume3.isResourceAvailable()).thenReturn(false);
+    assertFalse(nnrc.hasAvailableDiskSpace());
+    
+    // Just the other required dir is low on space.
+    Mockito.when(volume3.isResourceAvailable()).thenReturn(true);
+    Mockito.when(volume4.isResourceAvailable()).thenReturn(false);
+    assertFalse(nnrc.hasAvailableDiskSpace());
   }
 }