Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/04/20 04:28:21 UTC

svn commit: r1095253 [2/4] - in /hadoop/hdfs/branches/HDFS-1073: ./ bin/ src/c++/libhdfs/ src/contrib/hdfsproxy/ src/java/ src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/had...

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Wed Apr 20 02:28:19 2011
@@ -18,13 +18,17 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 
 import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.lang.management.ManagementFactory;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
@@ -49,6 +53,9 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -66,23 +73,24 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Util;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.FSDataset.VolumeInfo;
 import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
@@ -92,6 +100,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.StreamFile;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -101,7 +110,6 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
-import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.ProtocolSignature;
@@ -117,20 +125,15 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ServicePlugin;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
-import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.mortbay.util.ajax.JSON;
 
-import java.lang.management.ManagementFactory;  
-
-import javax.management.MBeanServer; 
-import javax.management.ObjectName;
-
 /**********************************************************
  * DataNode is a class (and program) that stores a set of
  * blocks for a DFS deployment.  A single deployment can
@@ -1218,7 +1221,7 @@ public class DataNode extends Configured
       }
 
       new Daemon(new DataTransfer(xferTargets, block,
-          BlockConstructionStage.PIPELINE_SETUP_CREATE)).start();
+          BlockConstructionStage.PIPELINE_SETUP_CREATE, "")).start();
     }
   }
 
@@ -1347,16 +1350,25 @@ public class DataNode extends Configured
     final DatanodeInfo[] targets;
     final Block b;
     final BlockConstructionStage stage;
+    final String clientname;
 
     /**
      * Connect to the first item in the target list.  Pass along the 
      * entire target list, the block, and the data.
      */
-    DataTransfer(DatanodeInfo targets[], Block b, BlockConstructionStage stage
-        ) throws IOException {
+    DataTransfer(DatanodeInfo targets[], Block b, BlockConstructionStage stage,
+        final String clientname) throws IOException {
+      if (DataTransferProtocol.LOG.isDebugEnabled()) {
+        DataTransferProtocol.LOG.debug(getClass().getSimpleName() + ": "
+            + b + " (numBytes=" + b.getNumBytes() + ")"
+            + ", stage=" + stage
+            + ", clientname=" + clientname
+            + ", targests=" + Arrays.asList(targets));
+      }
       this.targets = targets;
       this.b = b;
       this.stage = stage;
+      this.clientname = clientname;
     }
 
     /**
@@ -1366,7 +1378,9 @@ public class DataNode extends Configured
       xmitsInProgress.getAndIncrement();
       Socket sock = null;
       DataOutputStream out = null;
+      DataInputStream in = null;
       BlockSender blockSender = null;
+      final boolean isClient = clientname.length() > 0;
       
       try {
         InetSocketAddress curTarget = 
@@ -1380,7 +1394,6 @@ public class DataNode extends Configured
         OutputStream baseStream = NetUtils.getOutputStream(sock, writeTimeout);
         out = new DataOutputStream(new BufferedOutputStream(baseStream, 
                                                             SMALL_BUFFER_SIZE));
-
         blockSender = new BlockSender(b, 0, b.getNumBytes(), 
             false, false, false, DataNode.this);
         DatanodeInfo srcNode = new DatanodeInfo(dnRegistration);
@@ -1395,14 +1408,33 @@ public class DataNode extends Configured
         }
 
         DataTransferProtocol.Sender.opWriteBlock(out,
-            b, 0, stage, 0, 0, 0, "", srcNode, targets, accessToken);
+            b, 0, stage, 0, 0, 0, clientname, srcNode, targets, accessToken);
 
         // send data & checksum
         blockSender.sendBlock(out, baseStream, null);
 
         // no response necessary
-        LOG.info(dnRegistration + ":Transmitted block " + b + " to " + curTarget);
+        LOG.info(getClass().getSimpleName() + ": Transmitted " + b
+            + " (numBytes=" + b.getNumBytes() + ") to " + curTarget);
 
+        // read ack
+        if (isClient) {
+          in = new DataInputStream(NetUtils.getInputStream(sock));
+          final DataTransferProtocol.Status s = DataTransferProtocol.Status.read(in);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(getClass().getSimpleName() + ": close-ack=" + s);
+          }
+          if (s != SUCCESS) {
+            if (s == ERROR_ACCESS_TOKEN) {
+              throw new InvalidBlockTokenException(
+                  "Got access token error for connect ack, targets="
+                   + Arrays.asList(targets));
+            } else {
+              throw new IOException("Bad connect ack, targets="
+                  + Arrays.asList(targets));
+            }
+          }
+        }
       } catch (IOException ie) {
         LOG.warn(dnRegistration + ":Failed to transfer " + b + " to " + targets[0].getName()
             + " got " + StringUtils.stringifyException(ie));
@@ -1413,6 +1445,7 @@ public class DataNode extends Configured
         xmitsInProgress.getAndDecrement();
         IOUtils.closeStream(blockSender);
         IOUtils.closeStream(out);
+        IOUtils.closeStream(in);
         IOUtils.closeSocket(sock);
       }
     }
@@ -1977,48 +2010,46 @@ public class DataNode extends Configured
   }
 
   /**
-   * Transfer a block to the datanode targets.
-   * @return rbw's visible length
-   */
-  long transferBlockForPipelineRecovery(final Block b,
-      final DatanodeInfo[] targets) throws IOException {
-    checkWriteAccess(b);
-    final Block stored;
-    final boolean isRbw;
+   * Transfer a replica to the datanode targets.
+   * @param b the block to transfer.
+   *          The corresponding replica must be an RBW or a finalized replica.
+   *          Its GS and numBytes will be set to
+   *          the stored GS and the visible length. 
+   * @param targets
+   * @param client
+   */
+  void transferReplicaForPipelineRecovery(final Block b,
+      final DatanodeInfo[] targets, final String client) throws IOException {
+    final long storedGS;
     final long visible;
+    final BlockConstructionStage stage;
 
     //get replica information
     synchronized(data) {
-      stored = data.getStoredBlock(b.getBlockId());
-      if (stored.getGenerationStamp() < b.getGenerationStamp()) {
+      if (data.isValidRbw(b)) {
+        stage = BlockConstructionStage.TRANSFER_RBW;
+      } else if (data.isValidBlock(b)) {
+        stage = BlockConstructionStage.TRANSFER_FINALIZED;
+      } else {
+        final String r = data.getReplicaString(b.getBlockId());
+        throw new IOException(b + " is neither a RBW nor a Finalized, r=" + r);
+      }
+
+      storedGS = data.getStoredBlock(b.getBlockId()).getGenerationStamp();
+      if (storedGS < b.getGenerationStamp()) {
         throw new IOException(
-            "stored.getGenerationStamp() < b.getGenerationStamp(), stored="
-            + stored + ", b=" + b);        
+            storedGS + " = storedGS < b.getGenerationStamp(), b=" + b);        
       }
-      isRbw = data.isValidRbw(b);
       visible = data.getReplicaVisibleLength(b);
     }
 
+    //set storedGS and visible length
+    b.setGenerationStamp(storedGS);
+    b.setNumBytes(visible);
+
     if (targets.length > 0) {
-      if (isRbw) {
-        //transfer rbw
-        new DataTransfer(targets, b, BlockConstructionStage.TRANSFER_RBW).run();
-      } else {
-        //transfer finalized replica
-        transferBlock(stored, targets);
-      }
+      new DataTransfer(targets, b, stage, client).run();
     }
-    //TODO: should return: visible + storedGS + isRbw
-    return visible;
-  }
-
-  /**
-   * Covert an existing temporary replica to a rbw. 
-   * @param temporary specifies id, gs and visible bytes.
-   * @throws IOException
-   */
-  void convertTemporaryToRbw(final Block temporary) throws IOException {
-    data.convertTemporaryToRbw(temporary);
   }
 
   // Determine a Datanode's streaming address
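
The notable behavioral change in DataTransfer above: a transfer started on
behalf of a client (non-empty clientname) now reads a close-ack from the
target before finishing. A minimal standalone sketch of that
read-and-translate pattern follows; the Status enum and its one-byte encoding
are simplified stand-ins for the real DataTransferProtocol wire format, not
part of this patch.

    import java.io.DataInputStream;
    import java.io.IOException;

    class AckReader {
      enum Status { SUCCESS, ERROR, ERROR_ACCESS_TOKEN }

      /** Read a one-byte status ack and translate failures into exceptions. */
      static void readCloseAck(DataInputStream in) throws IOException {
        int code = in.readByte();
        if (code < 0 || code >= Status.values().length) {
          throw new IOException("unrecognized ack code " + code);
        }
        Status s = Status.values()[code];
        if (s == Status.ERROR_ACCESS_TOKEN) {
          throw new IOException("got access token error in close-ack");
        } else if (s != Status.SUCCESS) {
          throw new IOException("bad close-ack: " + s);
        }
      }
    }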

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Wed Apr 20 02:28:19 2011
@@ -33,6 +33,7 @@ import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketException;
+import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -153,24 +154,9 @@ class DataXceiver extends DataTransferPr
         datanode.socketWriteTimeout);
     DataOutputStream out = new DataOutputStream(
                  new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
-
-    if (datanode.isBlockTokenEnabled) {
-      try {
-        datanode.blockTokenSecretManager.checkAccess(blockToken, null, block,
-            BlockTokenSecretManager.AccessMode.READ);
-      } catch (InvalidToken e) {
-        try {
-          ERROR_ACCESS_TOKEN.write(out);
-          out.flush();
-          LOG.warn("Block token verification failed, for client "
-              + remoteAddress + " for OP_READ_BLOCK for block " + block + " : "
-              + e.getLocalizedMessage());
-          throw e;
-        } finally {
-          IOUtils.closeStream(out);
-        }
-      }
-    }
+    checkAccess(out, true, block, blockToken,
+        DataTransferProtocol.Op.READ_BLOCK,
+        BlockTokenSecretManager.AccessMode.READ);
   
     // send the block
     BlockSender blockSender = null;
@@ -245,8 +231,25 @@ class DataXceiver extends DataTransferPr
     updateCurrentThreadName("Receiving block " + block + " client=" + clientname);
     final boolean isDatanode = clientname.length() == 0;
     final boolean isClient = !isDatanode;
+    final boolean isTransfer = stage == BlockConstructionStage.TRANSFER_RBW
+        || stage == BlockConstructionStage.TRANSFER_FINALIZED;
+
+    // transfer-RBW/Finalized goes to a single target; no downstream targets allowed
+    if (isTransfer && targets.length > 0) {
+      throw new IOException(stage + " does not support multiple targets "
+          + Arrays.asList(targets));
+    }
 
     if (LOG.isDebugEnabled()) {
+      LOG.debug("opWriteBlock: stage=" + stage + ", clientname=" + clientname 
+      		+ "\n  block  =" + block + ", newGs=" + newGs
+      		+ ", bytesRcvd=[" + minBytesRcvd + ", " + maxBytesRcvd + "]"
+          + "\n  targets=" + Arrays.asList(targets)
+          + "; pipelineSize=" + pipelineSize + ", srcDataNode=" + srcDataNode
+          );
+      LOG.debug("isDatanode=" + isDatanode
+          + ", isClient=" + isClient
+          + ", isTransfer=" + isTransfer);
       LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() +
                 " tcp no delay " + s.getTcpNoDelay());
     }
@@ -261,30 +264,14 @@ class DataXceiver extends DataTransferPr
              " src: " + remoteAddress +
              " dest: " + localAddress);
 
-    DataOutputStream replyOut = null;   // stream to prev target
-    replyOut = new DataOutputStream(new BufferedOutputStream(
-                   NetUtils.getOutputStream(s, datanode.socketWriteTimeout),
-                   SMALL_BUFFER_SIZE));
-    if (datanode.isBlockTokenEnabled) {
-      try {
-        datanode.blockTokenSecretManager.checkAccess(blockToken, null, block,
-            BlockTokenSecretManager.AccessMode.WRITE);
-      } catch (InvalidToken e) {
-        try {
-          if (isClient) {
-            ERROR_ACCESS_TOKEN.write(replyOut);
-            Text.writeString(replyOut, datanode.dnRegistration.getName());
-            replyOut.flush();
-          }
-          LOG.warn("Block token verification failed, for client "
-              + remoteAddress + " for OP_WRITE_BLOCK for block " + block
-              + " : " + e.getLocalizedMessage());
-          throw e;
-        } finally {
-          IOUtils.closeStream(replyOut);
-        }
-      }
-    }
+    // reply to upstream datanode or client 
+    final DataOutputStream replyOut = new DataOutputStream(
+        new BufferedOutputStream(
+            NetUtils.getOutputStream(s, datanode.socketWriteTimeout),
+            SMALL_BUFFER_SIZE));
+    checkAccess(replyOut, isClient, block, blockToken,
+        DataTransferProtocol.Op.WRITE_BLOCK,
+        BlockTokenSecretManager.AccessMode.WRITE);
 
     DataOutputStream mirrorOut = null;  // stream to next target
     DataInputStream mirrorIn = null;    // reply from next target
@@ -307,8 +294,7 @@ class DataXceiver extends DataTransferPr
       }
 
       //
-      // Open network conn to backup machine, if 
-      // appropriate
+      // Connect to downstream machine, if appropriate
       //
       if (targets.length > 0) {
         InetSocketAddress mirrorTarget = null;
@@ -330,7 +316,6 @@ class DataXceiver extends DataTransferPr
                          SMALL_BUFFER_SIZE));
           mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));
 
-          // Write header: Copied from DFSClient.java!
           DataTransferProtocol.Sender.opWriteBlock(mirrorOut, originalBlock,
               pipelineSize, stage, newGs, minBytesRcvd, maxBytesRcvd, clientname,
               srcDataNode, targets, blockToken);
@@ -375,8 +360,8 @@ class DataXceiver extends DataTransferPr
         }
       }
 
-      // send connect ack back to source (only for clients)
-      if (isClient) {
+      // send connect-ack to the source (clients only, and not for transfer-RBW/Finalized)
+      if (isClient && !isTransfer) {
         if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
           LOG.info("Datanode " + targets.length +
                    " forwarding connect ack to upstream firstbadlink is " +
@@ -391,7 +376,15 @@ class DataXceiver extends DataTransferPr
       if (blockReceiver != null) {
         String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
         blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
-            mirrorAddr, null, targets.length);
+            mirrorAddr, null, targets);
+
+        // send close-ack for transfer-RBW/Finalized 
+        if (isTransfer) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("TRANSFER: send close-ack");
+          }
+          SUCCESS.write(replyOut);
+        }
       }
 
       // update its generation stamp
@@ -404,8 +397,7 @@ class DataXceiver extends DataTransferPr
       // if this write is for a replication request or recovering
       // a failed close for client, then confirm block. For other client-writes,
       // the block is finalized in the PacketResponder.
-      if ((isDatanode  && stage != BlockConstructionStage.TRANSFER_RBW)
-          ||
+      if (isDatanode ||
           stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
         datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
         LOG.info("Received block " + block + 
@@ -433,32 +425,38 @@ class DataXceiver extends DataTransferPr
                   datanode.myMetrics.writesFromRemoteClient);
   }
 
+  @Override
+  protected void opTransferBlock(final DataInputStream in,
+      final Block blk, final String client,
+      final DatanodeInfo[] targets,
+      final Token<BlockTokenIdentifier> blockToken) throws IOException {
+    checkAccess(null, true, blk, blockToken,
+        DataTransferProtocol.Op.TRANSFER_BLOCK,
+        BlockTokenSecretManager.AccessMode.COPY);
+
+    updateCurrentThreadName(DataTransferProtocol.Op.TRANSFER_BLOCK + " " + blk);
+
+    final DataOutputStream out = new DataOutputStream(
+        NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
+    try {
+      datanode.transferReplicaForPipelineRecovery(blk, targets, client);
+      SUCCESS.write(out);
+    } finally {
+      IOUtils.closeStream(out);
+    }
+  }
+
   /**
    * Get block checksum (MD5 of CRC32).
    */
   @Override
   protected void opBlockChecksum(DataInputStream in, Block block,
       Token<BlockTokenIdentifier> blockToken) throws IOException {
-    DataOutputStream out = new DataOutputStream(NetUtils.getOutputStream(s,
-        datanode.socketWriteTimeout));
-    if (datanode.isBlockTokenEnabled) {
-      try {
-        datanode.blockTokenSecretManager.checkAccess(blockToken, null, block,
-            BlockTokenSecretManager.AccessMode.READ);
-      } catch (InvalidToken e) {
-        try {
-          ERROR_ACCESS_TOKEN.write(out);
-          out.flush();
-          LOG.warn("Block token verification failed, for client "
-              + remoteAddress + " for OP_BLOCK_CHECKSUM for block " + block
-              + " : " + e.getLocalizedMessage());
-          throw e;
-        } finally {
-          IOUtils.closeStream(out);
-        }
-
-      }
-    }
+    final DataOutputStream out = new DataOutputStream(
+        NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
+    checkAccess(out, true, block, blockToken,
+        DataTransferProtocol.Op.BLOCK_CHECKSUM,
+        BlockTokenSecretManager.AccessMode.READ);
 
     updateCurrentThreadName("Reading metadata for block " + block);
     final MetaDataInputStream metadataIn = 
@@ -649,7 +647,7 @@ class DataXceiver extends DataTransferPr
 
       // receive a block
       blockReceiver.receiveBlock(null, null, null, null, 
-          dataXceiverServer.balanceThrottler, -1);
+          dataXceiverServer.balanceThrottler, null);
                     
       // notify name node
       datanode.notifyNamenodeReceivedBlock(block, sourceID);
@@ -713,4 +711,36 @@ class DataXceiver extends DataTransferPr
       IOUtils.closeStream(reply);
     }
   }
+
+  private void checkAccess(DataOutputStream out, final boolean reply, 
+      final Block blk,
+      final Token<BlockTokenIdentifier> t,
+      final DataTransferProtocol.Op op,
+      final BlockTokenSecretManager.AccessMode mode) throws IOException {
+    if (datanode.isBlockTokenEnabled) {
+      try {
+        datanode.blockTokenSecretManager.checkAccess(t, null, blk, mode);
+      } catch(InvalidToken e) {
+        try {
+          if (reply) {
+            if (out == null) {
+              out = new DataOutputStream(
+                  NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
+            }
+            ERROR_ACCESS_TOKEN.write(out);
+            if (mode == BlockTokenSecretManager.AccessMode.WRITE) {
+              Text.writeString(out, datanode.dnRegistration.getName());
+            }
+            out.flush();
+          }
+          LOG.warn("Block token verification failed: op=" + op
+              + ", remoteAddress=" + remoteAddress
+              + ", message=" + e.getLocalizedMessage());
+          throw e;
+        } finally {
+          IOUtils.closeStream(out);
+        }
+      }
+    }
+  }
 }
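
The DataXceiver changes above collapse three near-identical block-token
verification blocks (read, write, checksum) into the single checkAccess
helper at the end of the file, which optionally replies with
ERROR_ACCESS_TOKEN and always closes the reply stream on failure. A condensed
sketch of that shape, with a stand-in Verifier instead of the real
BlockTokenSecretManager:

    import java.io.DataOutputStream;
    import java.io.IOException;

    class TokenCheckSketch {
      /** Stand-in for blockTokenSecretManager.checkAccess(...). */
      interface Verifier { void check() throws IOException; }

      static void checkAccess(Verifier verifier, boolean reply,
          DataOutputStream out) throws IOException {
        try {
          verifier.check();
        } catch (IOException e) {
          try {
            if (reply && out != null) {
              out.writeByte(-1);  // stand-in for ERROR_ACCESS_TOKEN.write(out)
              out.flush();
            }
            throw e;              // always propagate the verification failure
          } finally {
            if (out != null) out.close();  // reply stream is unusable now
          }
        }
      }
    }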

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Wed Apr 20 02:28:19 2011
@@ -530,7 +530,6 @@ public class FSDataset implements FSCons
     
   static class FSVolumeSet {
     FSVolume[] volumes = null;
-    int curVolume = 0;
     BlockVolumeChoosingPolicy blockChooser;
       
     FSVolumeSet(FSVolume[] volumes, BlockVolumeChoosingPolicy blockChooser) {
@@ -1351,39 +1350,49 @@ public class FSDataset implements FSCons
     final long blockId = b.getBlockId();
     final long expectedGs = b.getGenerationStamp();
     final long visible = b.getNumBytes();
-    DataNode.LOG.info("Covert the temporary replica " + b
-        + " to RBW, visible length is " + visible);
+    DataNode.LOG.info("Convert replica " + b
+        + " from Temporary to RBW, visible length=" + visible);
 
-    // get replica
-    final ReplicaInfo r = volumeMap.get(blockId);
-    if (r == null) {
-      throw new ReplicaNotFoundException(
-          ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
-    }
-    // check the replica's state
-    if (r.getState() != ReplicaState.TEMPORARY) {
-      throw new ReplicaNotFoundException(
-          "r.getState() != ReplicaState.TEMPORARY, r=" + r);
+    final ReplicaInPipeline temp;
+    {
+      // get replica
+      final ReplicaInfo r = volumeMap.get(blockId);
+      if (r == null) {
+        throw new ReplicaNotFoundException(
+            ReplicaNotFoundException.NON_EXISTENT_REPLICA + b);
+      }
+      // check the replica's state
+      if (r.getState() != ReplicaState.TEMPORARY) {
+        throw new ReplicaAlreadyExistsException(
+            "r.getState() != ReplicaState.TEMPORARY, r=" + r);
+      }
+      temp = (ReplicaInPipeline)r;
     }
     // check generation stamp
-    if (r.getGenerationStamp() != expectedGs) {
-      throw new ReplicaNotFoundException(
-          "r.getGenerationStamp() != expectedGs = " + expectedGs + ", r=" + r);
+    if (temp.getGenerationStamp() != expectedGs) {
+      throw new ReplicaAlreadyExistsException(
+          "temp.getGenerationStamp() != expectedGs = " + expectedGs
+          + ", temp=" + temp);
     }
+
+    // TODO: check writer?
+    // set writer to the current thread
+    // temp.setWriter(Thread.currentThread());
+
     // check length
-    final long numBytes = r.getNumBytes();
+    final long numBytes = temp.getNumBytes();
     if (numBytes < visible) {
-      throw new ReplicaNotFoundException(numBytes + " = numBytes < visible = "
-          + visible + ", r=" + r);
+      throw new IOException(numBytes + " = numBytes < visible = "
+          + visible + ", temp=" + temp);
     }
     // check volume
-    final FSVolume v = r.getVolume();
+    final FSVolume v = temp.getVolume();
     if (v == null) {
-      throw new IOException("r.getVolume() = null, temp="  + r);
+      throw new IOException("r.getVolume() = null, temp="  + temp);
     }
     
     // move block files to the rbw directory
-    final File dest = moveBlockFiles(b, r.getBlockFile(), v.rbwDir);
+    final File dest = moveBlockFiles(b, temp.getBlockFile(), v.rbwDir);
     // create RBW
     final ReplicaBeingWritten rbw = new ReplicaBeingWritten(
         blockId, numBytes, expectedGs,
@@ -2024,6 +2033,12 @@ public class FSDataset implements FSCons
     return volumeMap.get(blockId);
   }
 
+  @Override 
+  public synchronized String getReplicaString(long blockId) {
+    final Replica r = volumeMap.get(blockId);
+    return r == null? "null": r.toString();
+  }
+
   @Override // FSDatasetInterface
   public synchronized ReplicaRecoveryInfo initReplicaRecovery(
       RecoveringBlock rBlock) throws IOException {
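
convertTemporaryToRbw above validates in a fixed order (existence, TEMPORARY
state, generation stamp, visible length, volume) and only then moves the
block files; the state and generation-stamp failures now surface as
ReplicaAlreadyExistsException. A compressed sketch of that
validate-then-promote ordering, with simplified stand-in types:

    import java.io.IOException;

    class PromoteSketch {
      enum State { TEMPORARY, RBW, FINALIZED }

      static class Replica {
        State state; long gs; long numBytes; Object volume;
      }

      /** Validate everything first; move files and re-register only at the end. */
      static void convertTemporaryToRbw(Replica r, long expectedGs,
          long visibleLen) throws IOException {
        if (r == null)
          throw new IOException("replica not found");
        if (r.state != State.TEMPORARY)
          throw new IOException("state is " + r.state + ", not TEMPORARY");
        if (r.gs != expectedGs)
          throw new IOException("gs=" + r.gs + " != expected " + expectedGs);
        if (r.numBytes < visibleLen)
          throw new IOException(r.numBytes + " bytes < visible " + visibleLen);
        if (r.volume == null)
          throw new IOException("no volume");
        // all checks passed: move the block files and record the replica as RBW
        r.state = State.RBW;
      }
    }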

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Wed Apr 20 02:28:19 2011
@@ -105,6 +105,11 @@ public interface FSDatasetInterface exte
   public Replica getReplica(long blockId);
 
   /**
+   * @return replica meta information
+   */
+  public String getReplicaString(long blockId);
+
+  /**
    * @return the generation stamp stored with the block.
    */
   public Block getStoredBlock(long blkid) throws IOException;

Propchange: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Apr 20 02:28:19 2011
@@ -4,3 +4,4 @@
 /hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:776175-785643,785929-786278
 /hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:796829-820463
 /hadoop/hdfs/branches/branch-0.21/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:820487
+/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java:1086482-1095244

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java Wed Apr 20 02:28:19 2011
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
@@ -24,6 +25,8 @@ import java.net.URI;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.Checksum;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
@@ -232,9 +235,16 @@ public class BackupImage extends FSImage
           // update NameSpace in memory
           backupInputStream.setBytes(data);
           FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
-          logLoader.loadEditRecords(storage.getLayoutVersion(),
-              backupInputStream.getDataInputStream(), true,
-              lastAppliedTxId + 1);
+          int logVersion = storage.getLayoutVersion();
+          BufferedInputStream bin = new BufferedInputStream(backupInputStream);
+          DataInputStream in = new DataInputStream(bin);
+          Checksum checksum = null;
+          if (logVersion <= -28) { // support fsedits checksum
+            checksum = FSEditLog.getChecksum();
+            in = new DataInputStream(new CheckedInputStream(bin, checksum));
+          }
+          logLoader.loadEditRecords(logVersion, in, checksum, true,
+                                    lastAppliedTxId + 1);
           getFSNamesystem().dir.updateCountForINodeWithQuota(); // inefficient!
           break;
         case INPROGRESS:
@@ -353,17 +363,25 @@ public class BackupImage extends FSImage
     if(jSpoolFile.exists()) {
       // load edits.new
       EditLogFileInputStream edits = new EditLogFileInputStream(jSpoolFile);
-      DataInputStream in = edits.getDataInputStream();
+      BufferedInputStream bin = new BufferedInputStream(edits);
+      DataInputStream in = new DataInputStream(bin);
       FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
-      int loaded = logLoader.loadFSEdits(in, false, lastAppliedTxId + 1);
+      int logVersion = logLoader.readLogVersion(in);
+      Checksum checksum = null;
+      if (logVersion <= -28) { // support fsedits checksum
+        checksum = FSEditLog.getChecksum();
+        in = new DataInputStream(new CheckedInputStream(bin, checksum));
+      }
+      int loaded = logLoader.loadEditRecords(logVersion, in, checksum, false,
+          lastAppliedTxId + 1);
 
       lastAppliedTxId += loaded;
       numEdits += loaded;
 
       // first time reached the end of spool
       jsState = JSpoolState.WAIT;
-      loaded = logLoader.loadEditRecords(storage.getLayoutVersion(),
-                                         in, true, lastAppliedTxId + 1);
+      loaded = logLoader.loadEditRecords(logVersion, in, checksum,
+                                         true, lastAppliedTxId + 1);
       numEdits += loaded;
       lastAppliedTxId += loaded;
 

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java Wed Apr 20 02:28:19 2011
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.util.zip.Checksum;
 
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -91,11 +92,19 @@ class EditLogFileOutputStream extends Ed
    * */
   @Override
   void write(byte op, long txid, Writable... writables) throws IOException {
+    int start = bufCurrent.getLength();
     write(op);
     bufCurrent.writeLong(txid);
     for (Writable w : writables) {
       w.write(bufCurrent);
     }
+    // write transaction checksum
+    int end = bufCurrent.getLength();
+    Checksum checksum = FSEditLog.getChecksum();
+    checksum.reset();
+    checksum.update(bufCurrent.getData(), start, end-start);
+    int sum = (int)checksum.getValue();
+    bufCurrent.writeInt(sum);
   }
 
   /**

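The write path above checksums exactly the bytes of one serialized
transaction and appends the 32-bit value after it. A self-contained sketch of
that pattern, with java.util.zip.CRC32 and ByteArrayOutputStream standing in
for PureJavaCrc32 and Hadoop's DataOutputBuffer:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.zip.CRC32;
    import java.util.zip.Checksum;

    class TxnWriter {
      private final ByteArrayOutputStream buf = new ByteArrayOutputStream();
      private final DataOutputStream out = new DataOutputStream(buf);
      private final Checksum checksum = new CRC32();

      /** Serialize one transaction, then append a CRC of exactly its bytes. */
      void writeTxn(byte op, long txid, byte[] payload) throws IOException {
        int start = buf.size();        // offset of this transaction's first byte
        out.writeByte(op);
        out.writeLong(txid);
        out.write(payload);
        out.flush();
        int end = buf.size();
        checksum.reset();
        checksum.update(buf.toByteArray(), start, end - start);
        out.writeInt((int) checksum.getValue());  // trailing per-txn checksum
      }
    }
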
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Wed Apr 20 02:28:19 2011
@@ -1285,9 +1285,10 @@ class FSDirectory implements Closeable {
    * Check whether the path specifies a directory
    */
   boolean isDir(String src) throws UnresolvedLinkException {
+    src = normalizePath(src);
     readLock();
     try {
-      INode node = rootDir.getNode(normalizePath(src), false);
+      INode node = rootDir.getNode(src, false);
       return node != null && node.isDirectory();
     } finally {
       readUnlock();
@@ -1385,6 +1386,12 @@ class FSDirectory implements Closeable {
   /** Return the name of the path represented by inodes at [0, pos] */
   private static String getFullPathName(INode[] inodes, int pos) {
     StringBuilder fullPathName = new StringBuilder();
+    if (inodes[0].isRoot()) {
+      if (pos == 0) return Path.SEPARATOR;
+    } else {
+      fullPathName.append(inodes[0].getLocalName());
+    }
+    
     for (int i=1; i<=pos; i++) {
       fullPathName.append(Path.SEPARATOR_CHAR).append(inodes[i].getLocalName());
     }
@@ -2018,7 +2025,7 @@ class FSDirectory implements Closeable {
         return null;
       }
     }
-    final String userName = UserGroupInformation.getCurrentUser().getUserName();
+    final String userName = dirPerms.getUserName();
     INodeSymlink newNode = unprotectedSymlink(path, target, modTime, modTime,
       new PermissionStatus(userName, null, FsPermission.getDefault()));         
     if (newNode == null) {

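The getFullPathName fix above makes the root inode contribute no name
component: "/" is special-cased, and a non-root first component is emitted
before the separator-joined remainder. The same logic on plain strings
(illustrative only):

    class PathBuilder {
      /** e.g. fullPath(new String[]{"", "a", "b"}, 2, true) returns "/a/b". */
      static String fullPath(String[] localNames, int pos, boolean firstIsRoot) {
        StringBuilder sb = new StringBuilder();
        if (firstIsRoot) {
          if (pos == 0) return "/";
        } else {
          sb.append(localNames[0]);
        }
        for (int i = 1; i <= pos; i++) {
          sb.append('/').append(localNames[i]);
        }
        return sb.toString();
      }
    }
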
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Wed Apr 20 02:28:19 2011
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.zip.Checksum;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -48,6 +49,7 @@ import org.apache.hadoop.io.BytesWritabl
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.util.PureJavaCrc32;
 
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.*;
 
@@ -91,6 +93,18 @@ public class FSEditLog implements NNStor
 
   private NNStorage storage;
 
+  private static ThreadLocal<Checksum> localChecksum =
+    new ThreadLocal<Checksum>() {
+    protected Checksum initialValue() {
+      return new PureJavaCrc32();
+    }
+  };
+
+  /** Get a thread local checksum */
+  public static Checksum getChecksum() {
+    return localChecksum.get();
+  }
+
   private static class TransactionId {
     public long txid;
 

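FSEditLog.getChecksum() above hands each thread its own Checksum instance, so
concurrent writer and loader threads never share CRC state. An equivalent
minimal sketch, with java.util.zip.CRC32 standing in for PureJavaCrc32:

    import java.util.zip.CRC32;
    import java.util.zip.Checksum;

    class ThreadLocalChecksum {
      private static final ThreadLocal<Checksum> LOCAL =
          new ThreadLocal<Checksum>() {
            @Override
            protected Checksum initialValue() {
              return new CRC32();
            }
          };

      /** Each calling thread gets, and then reuses, its own instance. */
      static Checksum get() {
        return LOCAL.get();
      }
    }
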
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Wed Apr 20 02:28:19 2011
@@ -17,12 +17,16 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.io.BufferedInputStream;
 import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.EOFException;
 import java.io.IOException;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.Checksum;
 
+import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
@@ -55,45 +59,65 @@ public class FSEditLogLoader {
    */
   int loadFSEdits(EditLogInputStream edits, long expectedStartingTxId)
   throws IOException {
-    DataInputStream in = edits.getDataInputStream();
     long startTime = now();
-    int numEdits = loadFSEdits(in, true, expectedStartingTxId);
+    int numEdits = loadFSEdits(edits, true, expectedStartingTxId);
     FSImage.LOG.info("Edits file " + edits.getName() 
         + " of size " + edits.length() + " edits # " + numEdits 
         + " loaded in " + (now()-startTime)/1000 + " seconds.");
     return numEdits;
   }
 
-  int loadFSEdits(DataInputStream in, boolean closeOnExit,
+  /**
+   * Read the header of fsedit log
+   * @param in fsedit stream
+   * @return the edit log version number
+   * @throws IOException if error occurs
+   */
+  int readLogVersion(DataInputStream in) throws IOException {
+    int logVersion = 0;
+    // Read log file version. Could be missing. 
+    in.mark(4);
+    // If edits log is greater than 2G, available method will return negative
+    // numbers, so we avoid having to call available
+    boolean available = true;
+    try {
+      logVersion = in.readByte();
+    } catch (EOFException e) {
+      available = false;
+    }
+    if (available) {
+      in.reset();
+      logVersion = in.readInt();
+      if (logVersion < FSConstants.LAYOUT_VERSION) // future version
+        throw new IOException(
+            "Unexpected version of the file system log file: "
+            + logVersion + ". Current version = " 
+            + FSConstants.LAYOUT_VERSION + ".");
+    }
+    assert logVersion <= Storage.LAST_UPGRADABLE_LAYOUT_VERSION :
+      "Unsupported version " + logVersion;
+    return logVersion;
+  }
+  
+  int loadFSEdits(EditLogInputStream edits, boolean closeOnExit,
       long expectedStartingTxId)
   throws IOException {
+    BufferedInputStream bin = new BufferedInputStream(edits);
+    DataInputStream in = new DataInputStream(bin);
+    
     int numEdits = 0;
     int logVersion = 0;
 
     try {
-      // Read log file version. Could be missing. 
-      in.mark(4);
-      // If edits log is greater than 2G, available method will return negative
-      // numbers, so we avoid having to call available
-      boolean available = true;
-      try {
-        logVersion = in.readByte();
-      } catch (EOFException e) {
-        available = false;
-      }
-      if (available) {
-        in.reset();
-        logVersion = in.readInt();
-        if (logVersion < FSConstants.LAYOUT_VERSION) // future version
-          throw new IOException(
-                          "Unexpected version of the file system log file: "
-                          + logVersion + ". Current version = " 
-                          + FSConstants.LAYOUT_VERSION + ".");
+      logVersion = readLogVersion(in);
+      Checksum checksum = null;
+      if (logVersion <= -28) { // support fsedits checksum
+        checksum = FSEditLog.getChecksum();
+        in = new DataInputStream(new CheckedInputStream(bin, checksum));
       }
-      assert logVersion <= Storage.LAST_UPGRADABLE_LAYOUT_VERSION :
-                            "Unsupported version " + logVersion;
-      
-      numEdits = loadEditRecords(logVersion, in, false, expectedStartingTxId);
+
+      numEdits = loadEditRecords(logVersion, in, checksum, false,
+          expectedStartingTxId);
     } finally {
       if(closeOnExit)
         in.close();
@@ -104,7 +128,9 @@ public class FSEditLogLoader {
 
   @SuppressWarnings("deprecation")
   int loadEditRecords(int logVersion, DataInputStream in,
-      boolean closeOnExit, long expectedStartingTxId) throws IOException {
+                      Checksum checksum, boolean closeOnExit,
+                      long expectedStartingTxId)
+      throws IOException {
     FSDirectory fsDir = fsNamesys.dir;
     int numEdits = 0;
     String clientName = null;
@@ -128,6 +154,9 @@ public class FSEditLogLoader {
         long blockSize = 0;
         FSEditLogOpCodes opCode;
         try {
+          if (checksum != null) {
+            checksum.reset();
+          }
           in.mark(1);
           byte opCodeByte = in.readByte();
           opCode = FSEditLogOpCodes.fromByte(opCodeByte);
@@ -139,7 +168,7 @@ public class FSEditLogLoader {
           break; // no more transactions
         }
 
-        if (logVersion <= -28) {
+        if (logVersion <= -31) {
           // Read the txid
           long thisTxId = in.readLong();
           if (thisTxId != txId + 1) {
@@ -496,6 +525,7 @@ public class FSEditLogLoader {
           throw new IOException("Never seen opCode " + opCode);
         }
         }
+        validateChecksum(in, checksum, numEdits);
       }
     } finally {
       if(closeOnExit)
@@ -521,6 +551,22 @@ public class FSEditLogLoader {
     return numEdits;
   }
 
+  /**
+   * Validate a transaction's checksum
+   */
+  private static void validateChecksum(
+      DataInputStream in, Checksum checksum, int tid)
+  throws IOException {
+    if (checksum != null) {
+      int calculatedChecksum = (int)checksum.getValue();
+      int readChecksum = in.readInt(); // read in checksum
+      if (readChecksum != calculatedChecksum) {
+        throw new ChecksumException(
+            "Transaction " + tid + " is corrupt. Calculated checksum is " +
+            calculatedChecksum + " but read checksum " + readChecksum, tid);
+      }
+    }
+  }
 
   /**
    * A class to read in blocks stored in the old format. The only two

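On the read side, the loader above wraps the stream in a CheckedInputStream
when the layout version indicates checksummed edits, resets the checksum
before each record, and compares the running value against the trailing int
in validateChecksum. A runnable sketch over a simplified record format (op
byte, txid, length-prefixed payload; the real edit-log format differs):

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import java.io.IOException;
    import java.util.zip.CRC32;
    import java.util.zip.CheckedInputStream;
    import java.util.zip.Checksum;

    class TxnReader {
      /** Decode records and verify each one's trailing checksum. */
      static void readAll(byte[] log) throws IOException {
        Checksum checksum = new CRC32();       // the patch uses PureJavaCrc32
        DataInputStream in = new DataInputStream(
            new CheckedInputStream(new ByteArrayInputStream(log), checksum));
        while (in.available() > 0) {
          checksum.reset();                    // CRC covers exactly one record
          byte op = in.readByte();
          long txid = in.readLong();
          byte[] payload = new byte[in.readInt()];
          in.readFully(payload);               // reads update the running CRC
          int calculated = (int) checksum.getValue();  // capture before readInt
          int stored = in.readInt();
          if (stored != calculated) {
            throw new IOException("transaction " + txid + " (op=" + op
                + ") is corrupt");
          }
        }
      }
    }
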
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Wed Apr 20 02:28:19 2011
@@ -71,7 +71,7 @@ public class FSImage implements NNStorag
   private static final SimpleDateFormat DATE_FORM =
       new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 
-  private static final int FIRST_TXNID_BASED_LAYOUT_VERSION=-29;
+  private static final int FIRST_TXNID_BASED_LAYOUT_VERSION=-32;
   
   // checkpoint states
   enum CheckpointStates{START, ROLLED_EDITS, UPLOAD_START, UPLOAD_DONE; }

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Wed Apr 20 02:28:19 2011
@@ -30,6 +30,7 @@ import java.security.DigestInputStream;
 import java.security.DigestOutputStream;
 import java.security.MessageDigest;
 import java.util.Arrays;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -158,7 +159,7 @@ class FSImageFormat {
         
         // read the transaction ID of the last edit represented by
         // this image
-        if (imgVersion <= -28) {
+        if (imgVersion <= -31) {
           imgTxId = in.readLong();
         } else {
           imgTxId = 0;
@@ -178,7 +179,11 @@ class FSImageFormat {
 
         // load all inodes
         LOG.info("Number of files = " + numFiles);
-        loadFullNameINodes(numFiles, in);
+        if (imgVersion <= -30) {
+          loadLocalNameINodes(numFiles, in);
+        } else {
+          loadFullNameINodes(numFiles, in);
+        }
 
         // load datanode info
         this.loadDatanodes(in);
@@ -214,6 +219,63 @@ class FSImageFormat {
     fsDir.rootDir.setPermissionStatus(root.getPermissionStatus());    
   }
 
+  /** 
+   * load fsimage files assuming only local names are stored
+   *   
+   * @param numFiles number of files expected to be read
+   * @param in image input stream
+   * @throws IOException
+   */  
+   private void loadLocalNameINodes(long numFiles, DataInputStream in) 
+   throws IOException {
+     assert numFiles > 0;
+
+     // load root
+     if( in.readShort() != 0) {
+       throw new IOException("First node is not root");
+     }   
+     INode root = loadINode(in);
+     // update the root's attributes
+     updateRootAttr(root);
+     numFiles--;
+
+     // load rest of the nodes directory by directory
+     while (numFiles > 0) {
+       numFiles -= loadDirectory(in);
+     }
+     if (numFiles != 0) {
+       throw new IOException("Read unexpect number of files: " + -numFiles);
+     }
+   }
+   
+   /**
+    * Load all children of a directory
+    * 
+    * @param in
+    * @return number of child inodes read
+    * @throws IOException
+    */
+   private int loadDirectory(DataInputStream in) throws IOException {
+     String parentPath = FSImageSerialization.readString(in);
+     FSDirectory fsDir = namesystem.dir;
+     INode parent = fsDir.rootDir.getNode(parentPath, true);
+     if (parent == null || !parent.isDirectory()) {
+       throw new IOException("Path " + parentPath + "is not a directory.");
+     }
+
+     int numChildren = in.readInt();
+     for(int i=0; i<numChildren; i++) {
+       // load single inode
+       byte[] localName = new byte[in.readShort()];
+       in.readFully(localName); // read local name
+       INode newNode = loadINode(in); // read rest of inode
+
+       // add to parent
+       namesystem.dir.addToParent(localName, (INodeDirectory)parent, newNode, false);
+     }
+     return numChildren;
+   }
+
   /**
    * load fsimage files assuming full path names are stored
    * 
@@ -501,9 +563,10 @@ class FSImageFormat {
         byte[] byteStore = new byte[4*FSConstants.MAX_PATH_LENGTH];
         ByteBuffer strbuf = ByteBuffer.wrap(byteStore);
         // save the root
-        FSImageSerialization.saveINode2Image(strbuf, fsDir.rootDir, out);
+        FSImageSerialization.saveINode2Image(fsDir.rootDir, out);
         // save the rest of the nodes
-        saveImage(strbuf, 0, fsDir.rootDir, out);
+        saveImage(strbuf, fsDir.rootDir, out);
+        // save files under construction
         sourceNamesystem.saveFilesUnderConstruction(out);
         sourceNamesystem.saveSecretManagerState(out);
         strbuf = null;
@@ -527,28 +590,33 @@ class FSImageFormat {
      * This is a recursive procedure, which first saves all children of
      * a current directory and then moves inside the sub-directories.
      */
-    private static void saveImage(ByteBuffer parentPrefix,
-                                  int prefixLength,
+    private static void saveImage(ByteBuffer currentDirName,
                                   INodeDirectory current,
                                   DataOutputStream out) throws IOException {
-      int newPrefixLength = prefixLength;
-      if (current.getChildrenRaw() == null)
+      List<INode> children = current.getChildrenRaw();
+      if (children == null || children.isEmpty())
         return;
-      for(INode child : current.getChildren()) {
+      // print prefix (parent directory name)
+      int prefixLen = currentDirName.position();
+      if (prefixLen == 0) {  // root
+        out.writeShort(PATH_SEPARATOR.length);
+        out.write(PATH_SEPARATOR);
+      } else {  // non-root directories
+        out.writeShort(prefixLen);
+        out.write(currentDirName.array(), 0, prefixLen);
+      }
+      out.writeInt(children.size());
+      for(INode child : children) {
         // print all children first
-        parentPrefix.position(prefixLength);
-        parentPrefix.put(PATH_SEPARATOR).put(child.getLocalNameBytes());
-        FSImageSerialization.saveINode2Image(parentPrefix, child, out);
+        FSImageSerialization.saveINode2Image(child, out);
       }
-      for(INode child : current.getChildren()) {
+      for(INode child : children) {
         if(!child.isDirectory())
           continue;
-        parentPrefix.position(prefixLength);
-        parentPrefix.put(PATH_SEPARATOR).put(child.getLocalNameBytes());
-        newPrefixLength = parentPrefix.position();
-        saveImage(parentPrefix, newPrefixLength, (INodeDirectory)child, out);
+        currentDirName.put(PATH_SEPARATOR).put(child.getLocalNameBytes());
+        saveImage(currentDirName, (INodeDirectory)child, out);
+        currentDirName.position(prefixLen);
       }
-      parentPrefix.position(prefixLength);
     }
   }
 }

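The new image layout above writes each directory once, as (parent path, child
count, then every child's local name), recursing into subdirectories, instead
of repeating a full path for every inode; loadLocalNameINodes/loadDirectory
read the same layout back. A compact sketch of the save side with simplified
types (not the exact on-disk inode format; empty directories are not
modeled):

    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;

    class DirImageSketch {
      static class Node {
        final String name;                       // local name only
        final List<Node> children = new ArrayList<Node>();
        Node(String name) { this.name = name; }
        boolean isDirectory() { return !children.isEmpty(); }
      }

      static void saveDir(String parentPath, Node dir, DataOutputStream out)
          throws IOException {
        byte[] path = parentPath.getBytes(StandardCharsets.UTF_8);
        out.writeShort(path.length);             // parent path, written once
        out.write(path);
        out.writeInt(dir.children.size());
        for (Node child : dir.children) {        // children's local names only
          byte[] name = child.name.getBytes(StandardCharsets.UTF_8);
          out.writeShort(name.length);
          out.write(name);
        }
        for (Node child : dir.children) {        // then recurse into subdirs
          if (child.isDirectory()) {
            saveDir("/".equals(parentPath) ? "/" + child.name
                                           : parentPath + "/" + child.name,
                    child, out);
          }
        }
      }
    }
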
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java Wed Apr 20 02:28:19 2011
@@ -145,12 +145,11 @@ public class FSImageSerialization {
   /*
    * Save one inode's attributes to the image.
    */
-  static void saveINode2Image(ByteBuffer name,
-                              INode node,
+  static void saveINode2Image(INode node,
                               DataOutputStream out) throws IOException {
-    int nameLen = name.position();
-    out.writeShort(nameLen);
-    out.write(name.array(), name.arrayOffset(), nameLen);
+    byte[] name = node.getLocalNameBytes();
+    out.writeShort(name.length);
+    out.write(name);
     FsPermission filePerm = TL_DATA.get().FILE_PERM;
     if (node.isDirectory()) {
       out.writeShort(0);  // replication

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Apr 20 02:28:19 2011
@@ -252,6 +252,8 @@ public class FSNamesystem implements FSC
   private FsServerDefaults serverDefaults;
   // allow appending to hdfs files
   private boolean supportAppends = true;
+  private DataTransferProtocol.ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure = 
+      DataTransferProtocol.ReplaceDatanodeOnFailure.DEFAULT;
 
   private volatile SafeModeInfo safeMode;  // safe mode information
   private Host2NodesMap host2DataNodeMap = new Host2NodesMap();
@@ -522,6 +524,8 @@ public class FSNamesystem implements FSC
         + " blockKeyUpdateInterval=" + blockKeyUpdateInterval / (60 * 1000)
         + " min(s), blockTokenLifetime=" + blockTokenLifetime / (60 * 1000)
         + " min(s)");
+
+    this.dtpReplaceDatanodeOnFailure = DataTransferProtocol.ReplaceDatanodeOnFailure.get(conf);
   }
 
   /**
@@ -1312,22 +1316,16 @@ public class FSNamesystem implements FSC
       long blockSize) throws SafeModeException, FileAlreadyExistsException,
       AccessControlException, UnresolvedLinkException, FileNotFoundException,
       ParentNotDirectoryException, IOException {
-    writeLock();
-    try {
-    boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
-    boolean append = flag.contains(CreateFlag.APPEND);
-    boolean create = flag.contains(CreateFlag.CREATE);
-
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: src=" + src
           + ", holder=" + holder
           + ", clientMachine=" + clientMachine
           + ", createParent=" + createParent
           + ", replication=" + replication
-          + ", overwrite=" + overwrite
-          + ", append=" + append);
+          + ", createFlag=" + flag.toString());
     }
-
+    writeLock();
+    try {
     if (isInSafeMode())
       throw new SafeModeException("Cannot create file" + src, safeMode);
     if (!DFSUtil.isValidName(src)) {
@@ -1337,14 +1335,16 @@ public class FSNamesystem implements FSC
     // Verify that the destination does not exist as a directory already.
     boolean pathExists = dir.exists(src);
     if (pathExists && dir.isDir(src)) {
-      throw new FileAlreadyExistsException("Cannot create file "+ src + "; already exists as a directory.");
+      throw new FileAlreadyExistsException("Cannot create file " + src
+          + "; already exists as a directory.");
     }
 
+    boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
+    boolean append = flag.contains(CreateFlag.APPEND);
     if (isPermissionEnabled) {
       if (append || (overwrite && pathExists)) {
         checkPathAccess(src, FsAction.WRITE);
-      }
-      else {
+      } else {
         checkAncestorAccess(src, FsAction.WRITE);
       }
     }
@@ -1395,7 +1395,8 @@ public class FSNamesystem implements FSC
                 ". Lease recovery is in progress. Try again later.");
 
         } else {
-          if(pendingFile.getLastBlock().getBlockUCState() ==
+          BlockInfoUnderConstruction lastBlock = pendingFile.getLastBlock();
+          if(lastBlock != null && lastBlock.getBlockUCState() ==
             BlockUCState.UNDER_RECOVERY) {
             throw new RecoveryInProgressException(
               "Recovery in progress, file [" + src + "], " +
@@ -1416,34 +1417,27 @@ public class FSNamesystem implements FSC
       } catch(IOException e) {
         throw new IOException("failed to create "+e.getMessage());
       }
-      if (append) {
-        if (myFile == null) {
-          if(!create)
-            throw new FileNotFoundException("failed to append to non-existent file "
-              + src + " on client " + clientMachine);
-          else {
-            //append & create a nonexist file equals to overwrite
-            return startFileInternal(src, permissions, holder, clientMachine,
-                EnumSet.of(CreateFlag.OVERWRITE), createParent, replication, blockSize);
-          }
-        } else if (myFile.isDirectory()) {
-          throw new IOException("failed to append to directory " + src 
-                                +" on client " + clientMachine);
+      boolean create = flag.contains(CreateFlag.CREATE);
+      if (myFile == null) {
+        if (!create) {
+          throw new FileNotFoundException("failed to overwrite or append to non-existent file "
+            + src + " on client " + clientMachine);
         }
-      } else if (!dir.isValidToCreate(src)) {
+      } else {
+        // File exists - must be one of append or overwrite
         if (overwrite) {
           delete(src, true);
-        } else {
-          throw new IOException("failed to create file " + src 
-                                +" on client " + clientMachine
-                                +" either because the filename is invalid or the file exists");
+        } else if (!append) {
+          throw new FileAlreadyExistsException("failed to create file " + src
+              + " on client " + clientMachine
+              + " because the file exists");
         }
       }
 
       DatanodeDescriptor clientNode = 
         host2DataNodeMap.getDatanodeByHost(clientMachine);
 
-      if (append) {
+      if (append && myFile != null) {
         //
        // Replace current node with an INodeUnderConstruction.
         // Recreate in-memory lease record.
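[Note: the rewritten block above collapses the old append special case into a single pass over the CreateFlag set: a missing file requires CREATE, an existing file must be opened with OVERWRITE (delete, then recreate) or APPEND, and any other combination fails. A hedged client-side illustration using this era's FileSystem create overload; the path, permission, and size arguments are illustrative only:

    // Sketch only: exercising the semantics enforced by startFileInternal.
    FileSystem fs = FileSystem.get(conf);
    Path p = new Path("/demo/file");
    // existing file + CREATE only      -> FileAlreadyExistsException
    // existing file + CREATE|OVERWRITE -> old file deleted, recreated
    // missing file  + APPEND only      -> FileNotFoundException
    FSDataOutputStream out = fs.create(p, FsPermission.getDefault(),
        EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
        4096, (short)3, fs.getDefaultBlockSize(), null);
]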
@@ -1635,6 +1629,53 @@ public class FSNamesystem implements FSC
     return b;
   }
 
+  /** @see NameNode#getAdditionalDatanode(String, Block, DatanodeInfo[], DatanodeInfo[], int, String) */
+  LocatedBlock getAdditionalDatanode(final String src, final Block blk,
+      final DatanodeInfo[] existings,  final HashMap<Node, Node> excludes,
+      final int numAdditionalNodes, final String clientName
+      ) throws IOException {
+    //check if the feature is enabled
+    dtpReplaceDatanodeOnFailure.checkEnabled();
+
+    final DatanodeDescriptor clientnode;
+    final long preferredblocksize;
+    readLock();
+    try {
+      //check safe mode
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot add datanode; src=" + src
+            + ", blk=" + blk, safeMode);
+      }
+
+      //check lease
+      final INodeFileUnderConstruction file = checkLease(src, clientName);
+      clientnode = file.getClientNode();
+      preferredblocksize = file.getPreferredBlockSize();
+    } finally {
+      readUnlock();
+    }
+
+    //find datanode descriptors
+    final List<DatanodeDescriptor> chosen = new ArrayList<DatanodeDescriptor>();
+    for(DatanodeInfo d : existings) {
+      final DatanodeDescriptor descriptor = getDatanode(d);
+      if (descriptor != null) {
+        chosen.add(descriptor);
+      }
+    }
+
+    // choose new datanodes.
+    final DatanodeInfo[] targets = blockManager.replicator.chooseTarget(
+        src, numAdditionalNodes, clientnode, chosen, true,
+        excludes, preferredblocksize);
+    final LocatedBlock lb = new LocatedBlock(blk, targets);
+    if (isBlockTokenEnabled) {
+      lb.setBlockToken(blockTokenSecretManager.generateToken(lb.getBlock(), 
+          EnumSet.of(BlockTokenSecretManager.AccessMode.COPY)));
+    }
+    return lb;
+  }
+
   /**
    * The client would like to let go of the given block
    */
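[Note: getAdditionalDatanode lets a writer repair a failed pipeline by asking the namenode for replacement targets while excluding nodes that already failed; when block tokens are enabled, the returned LocatedBlock carries a COPY-mode token so the client may copy existing replica data to the new node. A hedged caller-side sketch -- the proxy and variable names are assumptions; the RPC itself appears in the NameNode change further below:

    // Sketch only: `namenode` is assumed to be a client-protocol proxy.
    static LocatedBlock addReplacementNode(ClientProtocol namenode, String src,
        Block blk, DatanodeInfo[] surviving, DatanodeInfo[] failed,
        String clientName) throws IOException {
      // Ask for one extra target, excluding nodes that already failed.
      return namenode.getAdditionalDatanode(src, blk, surviving, failed, 1,
          clientName);
    }
]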
@@ -2661,7 +2702,6 @@ public class FSNamesystem implements FSC
    * Get registrationID for datanodes based on the namespaceID.
    * 
    * @see #registerDatanode(DatanodeRegistration)
-   * @see FSImage#newNamespaceID()
    * @return registration ID
    */
   public String getRegistrationID() {
@@ -3104,6 +3144,14 @@ public class FSNamesystem implements FSC
       throw new IOException("ProcessReport from dead or unregisterted node: "
                             + nodeID.getName());
     }
+    // To minimize startup time, we discard any second (or later) block reports
+    // that we receive while still in startup phase.
+    if (isInStartupSafeMode() && node.numBlocks() > 0) {
+      NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: "
+          + "discarded non-initial block report from " + nodeID.getName()
+          + " because namenode still in startup phase");
+      return;
+    }
 
     blockManager.processReport(node, newReport);
     NameNode.getNameNodeMetrics().blockReport.inc((int) (now() - startTime));
@@ -4171,6 +4219,15 @@ public class FSNamesystem implements FSC
       return false;
     return safeMode.isOn();
   }
+
+  /**
+   * Check whether the name node is in startup safe mode.
+   */
+  synchronized boolean isInStartupSafeMode() {
+    if (safeMode == null)
+      return false;
+    return safeMode.isOn() && !safeMode.isManual();
+  }
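[Note: isInStartupSafeMode() distinguishes automatic startup safe mode from operator-initiated safe mode; that is what lets processReport() above discard only the redundant block reports that arrive while the namenode is still starting. A hedged illustration using the admin API already exercised in the DFSAdmin changes below; the SafeModeAction value is the one this era's FSConstants defines:

    // Sketch only: manually entered safe mode is not startup safe mode,
    // so block reports keep being processed after this call.
    dfs.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_ENTER);
]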
 
   /**
    * Check whether replication queues are populated.
@@ -4542,7 +4599,8 @@ public class FSNamesystem implements FSC
       bean = new StandardMBean(this,FSNamesystemMBean.class);
       mbeanName = MBeanUtil.registerMBean("NameNode", "FSNamesystemState", bean);
     } catch (NotCompliantMBeanException e) {
-      e.printStackTrace();
+      LOG.warn("Exception in initializing StandardMBean as FSNamesystemMBean",
+          e);
     }
 
     LOG.info("Registered FSNamesystemStatusMBean");

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java Wed Apr 20 02:28:19 2011
@@ -233,12 +233,8 @@ abstract class INode implements Comparab
 
 
   String getLocalParentDir() {
-    INode p_node=getParent();
-
-    if(p_node == null)
-      return "/";
-    else
-      return p_node.getFullPathName();
+    INode inode = isRoot() ? this : getParent();
+    return (inode != null) ? inode.getFullPathName() : "";
   }
 
   /**
@@ -271,12 +267,7 @@ abstract class INode implements Comparab
 
   /** {@inheritDoc} */
   public String toString() {
-    String i_path=getFullPathName();
-
-    if(i_path.length() == 0)
-      i_path="/";
-
-    return "\"" + i_path + "\":"
+    return "\"" + getFullPathName() + "\":"
     + getUserName() + ":" + getGroupName() + ":"
     + (isDirectory()? "d": "-") + getFsPermission();
   }

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Wed Apr 20 02:28:19 2011
@@ -17,13 +17,12 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -33,9 +32,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Options;
@@ -80,9 +79,6 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
-import org.apache.hadoop.hdfs.server.namenode.GetDelegationTokenServlet;
-import org.apache.hadoop.hdfs.server.namenode.CancelDelegationTokenServlet;
-import org.apache.hadoop.hdfs.server.namenode.RenewDelegationTokenServlet;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
@@ -842,6 +838,33 @@ public class NameNode implements Namenod
     return locatedBlock;
   }
 
+  @Override
+  public LocatedBlock getAdditionalDatanode(final String src, final Block blk,
+      final DatanodeInfo[] existings, final DatanodeInfo[] excludes,
+      final int numAdditionalNodes, final String clientName
+      ) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("getAdditionalDatanode: src=" + src
+          + ", blk=" + blk
+          + ", existings=" + Arrays.asList(existings)
+          + ", excludes=" + Arrays.asList(excludes)
+          + ", numAdditionalNodes=" + numAdditionalNodes
+          + ", clientName=" + clientName);
+    }
+
+    myMetrics.numGetAdditionalDatanodeOps.inc();
+
+    HashMap<Node, Node> excludeSet = null;
+    if (excludes != null) {
+      excludeSet = new HashMap<Node, Node>(excludes.length);
+      for (Node node : excludes) {
+        excludeSet.put(node, node);
+      }
+    }
+    return namesystem.getAdditionalDatanode(src, blk,
+        existings, excludeSet, numAdditionalNodes, clientName);
+  }
+
   /**
    * The client needs to give up on the block.
    */
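[Note: the RPC surface takes the excluded nodes as a plain array; the NameNode repacks it into the HashMap<Node, Node> shape that the block placement policy's chooseTarget expects, mapping each node to itself so the map doubles as a lookup set. Membership is then a containsKey probe -- sketch only, with `candidate` as a hypothetical node:

    // Sketch only: the map acts as a set keyed and valued by the same node.
    boolean isExcluded = excludeSet != null && excludeSet.containsKey(candidate);
]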
@@ -1204,7 +1227,7 @@ public class NameNode implements Namenod
     }
     final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
     namesystem.createSymlink(target, link,
-      new PermissionStatus(ugi.getUserName(), null, dirPerms), createParent);
+      new PermissionStatus(ugi.getShortUserName(), null, dirPerms), createParent);
   }
 
   /** @inheritDoc */

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java Wed Apr 20 02:28:19 2011
@@ -72,6 +72,8 @@ public class NameNodeMetrics implements 
                           new MetricsTimeVaryingInt("FileInfoOps", registry);
     public MetricsTimeVaryingInt numAddBlockOps = 
                           new MetricsTimeVaryingInt("AddBlockOps", registry);
+    public final MetricsTimeVaryingInt numGetAdditionalDatanodeOps
+        = new MetricsTimeVaryingInt("GetAdditionalDatanodeOps", registry);
     public MetricsTimeVaryingInt numcreateSymlinkOps = 
                           new MetricsTimeVaryingInt("CreateSymlinkOps", registry);
     public MetricsTimeVaryingInt numgetLinkTargetOps = 

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java Wed Apr 20 02:28:19 2011
@@ -266,13 +266,21 @@ public class DFSAdmin extends FsShell {
     super(conf);
   }
   
+  protected DistributedFileSystem getDFS() throws IOException {
+    FileSystem fs = getFS();
+    if (!(fs instanceof DistributedFileSystem)) {
+      throw new IllegalArgumentException("FileSystem " + fs.getUri()
+          + " is not a distributed file system");
+    }
+    return (DistributedFileSystem)fs;
+  }
+
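[Note: getDFS() replaces the per-command instanceof checks that used to print the URI and return quietly; every admin command now fails fast with an IllegalArgumentException when the configured filesystem is not HDFS. A hedged sketch of the resulting behaviour; the file:/// default is illustrative:

    // Sketch only: pointing DFSAdmin at a non-HDFS filesystem now throws.
    DFSAdmin admin = new DFSAdmin(conf);  // conf assumed to resolve to file:///
    try {
      admin.report();
    } catch (IllegalArgumentException e) {
      System.err.println(e.getMessage());
      // -> "FileSystem file:/// is not a distributed file system"
    }
]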
   /**
    * Gives a report on how the FileSystem is doing.
    * @exception IOException if the filesystem does not exist.
    */
   public void report() throws IOException {
-    if (fs instanceof DistributedFileSystem) {
-      DistributedFileSystem dfs = (DistributedFileSystem) fs;
+      DistributedFileSystem dfs = getDFS();
       FsStatus ds = dfs.getStatus();
       long capacity = ds.getCapacity();
       long used = ds.getUsed();
@@ -339,7 +347,6 @@ public class DFSAdmin extends FsShell {
           System.out.println();
         }     
       }
-    }
   }
 
   /**
@@ -350,10 +357,6 @@ public class DFSAdmin extends FsShell {
    * @exception IOException if the filesystem does not exist.
    */
   public void setSafeMode(String[] argv, int idx) throws IOException {
-    if (!(fs instanceof DistributedFileSystem)) {
-      System.err.println("FileSystem is " + fs.getUri());
-      return;
-    }
     if (idx != argv.length - 1) {
       printUsage("-safemode");
       return;
@@ -374,7 +377,7 @@ public class DFSAdmin extends FsShell {
       printUsage("-safemode");
       return;
     }
-    DistributedFileSystem dfs = (DistributedFileSystem) fs;
+    DistributedFileSystem dfs = getDFS();
     boolean inSafeMode = dfs.setSafeMode(action);
 
     //
@@ -404,12 +407,7 @@ public class DFSAdmin extends FsShell {
   public int saveNamespace() throws IOException {
     int exitCode = -1;
 
-    if (!(fs instanceof DistributedFileSystem)) {
-      System.err.println("FileSystem is " + fs.getUri());
-      return exitCode;
-    }
-
-    DistributedFileSystem dfs = (DistributedFileSystem) fs;
+    DistributedFileSystem dfs = getDFS();
     dfs.saveNamespace();
     exitCode = 0;
    
@@ -425,17 +423,12 @@ public class DFSAdmin extends FsShell {
   public int restoreFaileStorage(String arg) throws IOException {
     int exitCode = -1;
 
-    if (!(fs instanceof DistributedFileSystem)) {
-      System.err.println("FileSystem is " + fs.getUri());
-      return exitCode;
-    }
-
     if(!arg.equals("check") && !arg.equals("true") && !arg.equals("false")) {
       System.err.println("restoreFailedStorage valid args are true|false|check");
       return exitCode;
     }
     
-    DistributedFileSystem dfs = (DistributedFileSystem) fs;
+    DistributedFileSystem dfs = getDFS();
     Boolean res = dfs.restoreFailedStorage(arg);
     System.out.println("restoreFailedStorage is set to " + res);
     exitCode = 0;
@@ -452,12 +445,7 @@ public class DFSAdmin extends FsShell {
   public int refreshNodes() throws IOException {
     int exitCode = -1;
 
-    if (!(fs instanceof DistributedFileSystem)) {
-      System.err.println("FileSystem is " + fs.getUri());
-      return exitCode;
-    }
-
-    DistributedFileSystem dfs = (DistributedFileSystem) fs;
+    DistributedFileSystem dfs = getDFS();
     dfs.refreshNodes();
     exitCode = 0;
    
@@ -611,18 +599,10 @@ public class DFSAdmin extends FsShell {
    * @exception IOException 
    */
   public int finalizeUpgrade() throws IOException {
-    int exitCode = -1;
-
-    if (!(fs instanceof DistributedFileSystem)) {
-      System.out.println("FileSystem is " + fs.getUri());
-      return exitCode;
-    }
-
-    DistributedFileSystem dfs = (DistributedFileSystem) fs;
+    DistributedFileSystem dfs = getDFS();
     dfs.finalizeUpgrade();
-    exitCode = 0;
-   
-    return exitCode;
+
+    return 0;
   }
 
   /**
@@ -633,10 +613,7 @@ public class DFSAdmin extends FsShell {
    * @exception IOException 
    */
   public int upgradeProgress(String[] argv, int idx) throws IOException {
-    if (!(fs instanceof DistributedFileSystem)) {
-      System.out.println("FileSystem is " + fs.getUri());
-      return -1;
-    }
+
     if (idx != argv.length - 1) {
       printUsage("-upgradeProgress");
       return -1;
@@ -654,7 +631,7 @@ public class DFSAdmin extends FsShell {
       return -1;
     }
 
-    DistributedFileSystem dfs = (DistributedFileSystem) fs;
+    DistributedFileSystem dfs = getDFS();
     UpgradeStatusReport status = dfs.distributedUpgradeProgress(action);
     String statusText = (status == null ? 
         "There are no upgrades in progress." :
@@ -673,7 +650,7 @@ public class DFSAdmin extends FsShell {
    */
   public int metaSave(String[] argv, int idx) throws IOException {
     String pathname = argv[idx];
-    DistributedFileSystem dfs = (DistributedFileSystem) fs;
+    DistributedFileSystem dfs = getDFS();
     dfs.metaSave(pathname);
     System.out.println("Created file " + pathname + " on server " +
                        dfs.getUri());
@@ -688,8 +665,7 @@ public class DFSAdmin extends FsShell {
    * @throws IOException If an error while getting datanode report
    */
   public int printTopology() throws IOException {
-    if (fs instanceof DistributedFileSystem) {
-      DistributedFileSystem dfs = (DistributedFileSystem)fs;
+      DistributedFileSystem dfs = getDFS();
       DFSClient client = dfs.getClient();
       DatanodeInfo[] report = client.datanodeReport(DatanodeReportType.ALL);
       
@@ -724,7 +700,6 @@ public class DFSAdmin extends FsShell {
 
         System.out.println();
       }
-    }
     return 0;
   }
   
@@ -1009,13 +984,13 @@ public class DFSAdmin extends FsShell {
       } else if ("-metasave".equals(cmd)) {
         exitCode = metaSave(argv, i);
       } else if (ClearQuotaCommand.matches(cmd)) {
-        exitCode = new ClearQuotaCommand(argv, i, fs).runAll();
+        exitCode = new ClearQuotaCommand(argv, i, getDFS()).runAll();
       } else if (SetQuotaCommand.matches(cmd)) {
-        exitCode = new SetQuotaCommand(argv, i, fs).runAll();
+        exitCode = new SetQuotaCommand(argv, i, getDFS()).runAll();
       } else if (ClearSpaceQuotaCommand.matches(cmd)) {
-        exitCode = new ClearSpaceQuotaCommand(argv, i, fs).runAll();
+        exitCode = new ClearSpaceQuotaCommand(argv, i, getDFS()).runAll();
       } else if (SetSpaceQuotaCommand.matches(cmd)) {
-        exitCode = new SetSpaceQuotaCommand(argv, i, fs).runAll();
+        exitCode = new SetSpaceQuotaCommand(argv, i, getDFS()).runAll();
       } else if ("-refreshServiceAcl".equals(cmd)) {
         exitCode = refreshServiceAcl();
       } else if ("-refreshUserToGroupsMappings".equals(cmd)) {

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsElement.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsElement.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsElement.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsElement.java Wed Apr 20 02:28:19 2011
@@ -81,5 +81,6 @@ public enum EditsElement {
     KEY_ID,
     KEY_EXPIRY_DATE,
     KEY_LENGTH,
-    KEY_BLOB
+    KEY_BLOB,
+    CHECKSUM
 }

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java Wed Apr 20 02:28:19 2011
@@ -17,26 +17,15 @@
  */
 package org.apache.hadoop.hdfs.tools.offlineEditsViewer;
 
-import java.io.DataInputStream;
 import java.io.IOException;
-import java.io.EOFException;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
 
 import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.ByteToken;
-import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.ShortToken;
 import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.IntToken;
 import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.VIntToken;
-import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.LongToken;
-import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.VLongToken;
-import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.StringUTF8Token;
-import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.StringTextToken;
-import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.BlobToken;
-import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.BytesWritableToken;
-import static org.apache.hadoop.hdfs.tools.offlineEditsViewer.Tokenizer.EmptyToken;
 
 /**
  * EditsLoaderCurrent processes Hadoop EditLogs files and walks over
@@ -49,7 +38,7 @@ import static org.apache.hadoop.hdfs.too
 class EditsLoaderCurrent implements EditsLoader {
 
   private static int [] supportedVersions = {
-    -18, -19, -20, -21, -22, -23, -24, -25, -26, -27, -28 };
+    -18, -19, -20, -21, -22, -23, -24, -25, -26, -27, -28, -30, -31 };
 
   private EditsVisitor v;
   private int editsVersion = 0;
@@ -77,7 +66,7 @@ class EditsLoaderCurrent implements Edit
    * Visit OP_INVALID
    */
   private void visit_OP_INVALID() throws IOException {
-    if(editsVersion <= -28) {
+    if(editsVersion <= -31) {
       v.visitLong(EditsElement.TRANSACTION_ID);
     }
   }
@@ -103,7 +92,7 @@ class EditsLoaderCurrent implements Edit
    */
   private void visit_OP_ADD_or_OP_CLOSE(FSEditLogOpCodes editsOpCode)
     throws IOException {
-    if(editsVersion <= -28) {
+    if(editsVersion <= -31) {
       v.visitLong(EditsElement.TRANSACTION_ID);
     }
 
@@ -149,7 +138,7 @@ class EditsLoaderCurrent implements Edit
    * Visit OP_RENAME_OLD
    */
   private void visit_OP_RENAME_OLD() throws IOException {
-    if(editsVersion <= -28) {
+    if(editsVersion <= -31) {
       v.visitLong(EditsElement.TRANSACTION_ID);
     }
 
@@ -163,7 +152,7 @@ class EditsLoaderCurrent implements Edit
    * Visit OP_DELETE
    */
   private void visit_OP_DELETE() throws IOException {
-    if(editsVersion <= -28) {
+    if(editsVersion <= -31) {
       v.visitLong(EditsElement.TRANSACTION_ID);
     }
 
@@ -176,7 +165,7 @@ class EditsLoaderCurrent implements Edit
    * Visit OP_MKDIR
    */
   private void visit_OP_MKDIR() throws IOException {
-    if(editsVersion <= -28) {
+    if(editsVersion <= -31) {
       v.visitLong(EditsElement.TRANSACTION_ID);
     }
 
@@ -198,7 +187,7 @@ class EditsLoaderCurrent implements Edit
    * Visit OP_SET_REPLICATION
    */
   private void visit_OP_SET_REPLICATION() throws IOException {
-    if(editsVersion <= -28) {
+    if(editsVersion <= -31) {
       v.visitLong(EditsElement.TRANSACTION_ID);
     }
 
@@ -210,7 +199,7 @@ class EditsLoaderCurrent implements Edit
    * Visit OP_SET_PERMISSIONS
    */
   private void visit_OP_SET_PERMISSIONS() throws IOException {
-    if(editsVersion <= -28) {
+    if(editsVersion <= -31) {
       v.visitLong(EditsElement.TRANSACTION_ID);
     }
 
@@ -222,7 +211,7 @@ class EditsLoaderCurrent implements Edit
    * Visit OP_SET_OWNER
    */
   private void visit_OP_SET_OWNER() throws IOException {
-    if(editsVersion <= -28) {
+    if(editsVersion <= -31) {
       v.visitLong(EditsElement.TRANSACTION_ID);
     }
 
@@ -235,7 +224,7 @@ class EditsLoaderCurrent implements Edit
    * Visit OP_SET_GENSTAMP
    */
   private void visit_OP_SET_GENSTAMP() throws IOException {
-    if(editsVersion <= -28) {
+    if(editsVersion <= -31) {
       v.visitLong(EditsElement.TRANSACTION_ID);
     }
 
@@ -246,7 +235,7 @@ class EditsLoaderCurrent implements Edit
    * Visit OP_TIMES
    */
   private void visit_OP_TIMES() throws IOException {
-    if(editsVersion <= -28) {
+    if(editsVersion <= -31) {
       v.visitLong(EditsElement.TRANSACTION_ID);
     }
 
@@ -260,7 +249,7 @@ class EditsLoaderCurrent implements Edit
    * Visit OP_SET_QUOTA
    */
   private void visit_OP_SET_QUOTA() throws IOException {
-    if(editsVersion <= -28) {
+    if(editsVersion <= -31) {
       v.visitLong(EditsElement.TRANSACTION_ID);
     }
 
@@ -273,7 +262,7 @@ class EditsLoaderCurrent implements Edit
    * Visit OP_RENAME
    */
   private void visit_OP_RENAME() throws IOException {
-    if(editsVersion <= -28) {
+    if(editsVersion <= -31) {
       v.visitLong(EditsElement.TRANSACTION_ID);
     }
 
@@ -293,7 +282,7 @@ class EditsLoaderCurrent implements Edit
    * Visit OP_CONCAT_DELETE
    */
   private void visit_OP_CONCAT_DELETE() throws IOException {
-    if(editsVersion <= -28) {
+    if(editsVersion <= -31) {
       v.visitLong(EditsElement.TRANSACTION_ID);
     }
 
@@ -317,7 +306,7 @@ class EditsLoaderCurrent implements Edit
    * Visit OP_SYMLINK
    */
   private void visit_OP_SYMLINK() throws IOException {
-    if(editsVersion <= -28) {
+    if(editsVersion <= -31) {
       v.visitLong(EditsElement.TRANSACTION_ID);
     }
 
@@ -340,7 +329,7 @@ class EditsLoaderCurrent implements Edit
    * Visit OP_GET_DELEGATION_TOKEN
    */
   private void visit_OP_GET_DELEGATION_TOKEN() throws IOException {
-    if(editsVersion <= -28) {
+    if(editsVersion <= -31) {
       v.visitLong(EditsElement.TRANSACTION_ID);
     }
     
@@ -366,7 +355,7 @@ class EditsLoaderCurrent implements Edit
    */
   private void visit_OP_RENEW_DELEGATION_TOKEN()
     throws IOException {
-    if(editsVersion <= -28) {
+    if(editsVersion <= -31) {
       v.visitLong(EditsElement.TRANSACTION_ID);
     }
 
@@ -392,7 +381,7 @@ class EditsLoaderCurrent implements Edit
    */
   private void visit_OP_CANCEL_DELEGATION_TOKEN()
     throws IOException {
-    if(editsVersion <= -28) {
+    if(editsVersion <= -31) {
       v.visitLong(EditsElement.TRANSACTION_ID);
     }
 
@@ -417,7 +406,7 @@ class EditsLoaderCurrent implements Edit
    */
   private void visit_OP_UPDATE_MASTER_KEY()
     throws IOException {
-    if(editsVersion <= -28) {
+    if(editsVersion <= -31) {
       v.visitLong(EditsElement.TRANSACTION_ID);
     }
     
@@ -530,6 +519,10 @@ class EditsLoaderCurrent implements Edit
         visitOpCode(editsOpCode);
 
         v.leaveEnclosingElement(); // DATA
+
+        if (editsOpCode != FSEditLogOpCodes.OP_INVALID && editsVersion <= -28) {
+          v.visitInt(EditsElement.CHECKSUM);
+        }
         v.leaveEnclosingElement(); // RECORD
       } while(editsOpCode != FSEditLogOpCodes.OP_INVALID);
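[Note: taken together, the loader changes shift the per-record TRANSACTION_ID behind layout version -31, while the trailing CHECKSUM (paired with the new EditsElement.CHECKSUM above) applies from version -28 and is never emitted after the OP_INVALID terminator. A hedged sketch of the per-record framing this implies, using only calls visible in the hunks above:

    // Sketch only: per-record framing implied by the version checks above.
    private void visitRecord(FSEditLogOpCodes editsOpCode) throws IOException {
      if (editsVersion <= -31) {
        v.visitLong(EditsElement.TRANSACTION_ID);  // txids from version -31
      }
      visitOpCode(editsOpCode);                    // op-specific payload
      if (editsOpCode != FSEditLogOpCodes.OP_INVALID && editsVersion <= -28) {
        v.visitInt(EditsElement.CHECKSUM);         // checksums from version -28
      }
    }
]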