Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/06/21 22:09:57 UTC

svn commit: r1138160 [2/6] - in /hadoop/common/branches/HDFS-1073/hdfs: ./ bin/ src/c++/libhdfs/ src/c++/libhdfs/tests/ src/contrib/ src/contrib/fuse-dfs/ src/contrib/fuse-dfs/src/ src/contrib/hdfsproxy/ src/docs/src/documentation/content/xdocs/ src/ja...

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1138160&r1=1138159&r2=1138160&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java Tue Jun 21 20:09:54 2011
@@ -17,9 +17,7 @@
  */
 package org.apache.hadoop.hdfs;
 
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.CHECKSUM_OK;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
+import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
@@ -34,9 +32,12 @@ import java.nio.ByteBuffer;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FSInputChecker;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
@@ -45,6 +46,7 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
+
 /** This is a wrapper around connection to datanode
  * and understands checksum, offset etc.
  *
@@ -138,9 +140,9 @@ public class BlockReader extends FSInput
     // if eos was set in the previous read, send a status code to the DN
     if (eos && !eosBefore && nRead >= 0) {
       if (needChecksum()) {
-        sendReadResult(dnSock, CHECKSUM_OK);
+        sendReadResult(dnSock, Status.CHECKSUM_OK);
       } else {
-        sendReadResult(dnSock, SUCCESS);
+        sendReadResult(dnSock, Status.SUCCESS);
       }
     }
     return nRead;
@@ -313,20 +315,13 @@ public class BlockReader extends FSInput
         pos + bytesToRead >= bytesNeededToFinish) {
 
       // Read header
-      int packetLen = in.readInt();
-      long offsetInBlock = in.readLong();
-      long seqno = in.readLong();
-      boolean lastPacketInBlock = in.readBoolean();
-      int dataLen = in.readInt();
+      PacketHeader hdr = new PacketHeader();
+      hdr.readFields(in);
 
-      if (!lastPacketInBlock ||
-          dataLen != 0) {
+      if (!hdr.isLastPacketInBlock() ||
+          hdr.getDataLen() != 0) {
         throw new IOException("Expected empty end-of-read packet! Header: " +
-                              "(packetLen : " + packetLen + 
-                              ", offsetInBlock : " + offsetInBlock +
-                              ", seqno : " + seqno + 
-                              ", lastInBlock : " + lastPacketInBlock +
-                              ", dataLen : " + dataLen);
+                              hdr);
       }
 
       eos = true;
@@ -409,7 +404,7 @@ public class BlockReader extends FSInput
                                      String clientName)
                                      throws IOException {
     // in and out will be closed when sock is closed (by the caller)
-    DataTransferProtocol.Sender.opReadBlock(
+    Sender.opReadBlock(
         new DataOutputStream(new BufferedOutputStream(
             NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT))),
         block, startOffset, len, clientName, blockToken);
@@ -422,9 +417,10 @@ public class BlockReader extends FSInput
         new BufferedInputStream(NetUtils.getInputStream(sock), 
                                 bufferSize));
     
-    DataTransferProtocol.Status status = DataTransferProtocol.Status.read(in);
-    if (status != SUCCESS) {
-      if (status == ERROR_ACCESS_TOKEN) {
+    BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
+        vintPrefixed(in));
+    if (status.getStatus() != Status.SUCCESS) {
+      if (status.getStatus() == Status.ERROR_ACCESS_TOKEN) {
         throw new InvalidBlockTokenException(
             "Got access token error for OP_READ_BLOCK, self="
                 + sock.getLocalSocketAddress() + ", remote="
@@ -499,11 +495,16 @@ public class BlockReader extends FSInput
    * closing our connection (which we will re-open), but won't affect
    * data correctness.
    */
-  void sendReadResult(Socket sock, DataTransferProtocol.Status statusCode) {
+  void sendReadResult(Socket sock, Status statusCode) {
     assert !sentStatusCode : "already sent status code to " + sock;
     try {
       OutputStream out = NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT);
-      statusCode.writeOutputStream(out);
+      
+      ClientReadStatusProto.newBuilder()
+        .setStatus(statusCode)
+        .build()
+        .writeDelimitedTo(out);
+
       out.flush();
       sentStatusCode = true;
     } catch (IOException e) {

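The hunks above replace the old hand-rolled wire format (raw enum writes and Text strings) with protobuf messages framed by a varint length prefix: replies are written with writeDelimitedTo() and read back through the vintPrefixed() helper. A minimal round-trip sketch of that framing, assuming the generated DataTransferProtos classes are on the classpath; parseDelimitedFrom is the standard protobuf inverse of writeDelimitedTo, and the patch's vintPrefixed(in) helper performs the same varint framing by hand:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;

    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;

    public class DelimitedProtoRoundTrip {
      public static void main(String[] args) throws Exception {
        // Writer side: varint length prefix + serialized message, the same
        // framing sendReadResult() uses above.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        ClientReadStatusProto.newBuilder()
            .setStatus(Status.CHECKSUM_OK)
            .build()
            .writeDelimitedTo(out);

        // Reader side: consume the prefix and exactly one message.
        ClientReadStatusProto stat = ClientReadStatusProto.parseDelimitedFrom(
            new ByteArrayInputStream(out.toByteArray()));
        System.out.println("status=" + stat.getStatus());
      }
    }

Because every message carries its own length, a reader can skip or bound a reply it does not understand, which the old positional format could not do.
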
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1138160&r1=1138159&r2=1138160&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java Tue Jun 21 20:09:54 2011
@@ -18,10 +18,6 @@
  */
 package org.apache.hadoop.hdfs;
 
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op.BLOCK_CHECKSUM;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
-
 import java.io.BufferedOutputStream;
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
@@ -65,17 +61,23 @@ import org.apache.hadoop.hdfs.protocol.C
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
+import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
@@ -131,7 +133,7 @@ public class DFSClient implements FSCons
   SocketFactory socketFactory;
   int socketTimeout;
   final int writePacketSize;
-  final DataTransferProtocol.ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
+  final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
   final FileSystem.Statistics stats;
   final int hdfsTimeout;    // timeout value for a DFS operation.
   final LeaseRenewer leaserenewer;
@@ -265,7 +267,7 @@ public class DFSClient implements FSCons
     this.writePacketSize = 
       conf.getInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 
                   DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
-    this.dtpReplaceDatanodeOnFailure = DataTransferProtocol.ReplaceDatanodeOnFailure.get(conf);
+    this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
 
     // The hdfsTimeout is currently the same as the ipc timeout 
     this.hdfsTimeout = Client.getTimeout(conf);
@@ -1112,15 +1114,16 @@ public class DFSClient implements FSCons
 
           if (LOG.isDebugEnabled()) {
             LOG.debug("write to " + datanodes[j].getName() + ": "
-                + BLOCK_CHECKSUM + ", block=" + block);
+                + Op.BLOCK_CHECKSUM + ", block=" + block);
           }
           // get block MD5
-          DataTransferProtocol.Sender.opBlockChecksum(out, block,
-              lb.getBlockToken());
+          Sender.opBlockChecksum(out, block, lb.getBlockToken());
+
+          final BlockOpResponseProto reply =
+            BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(in));
 
-          final DataTransferProtocol.Status reply = DataTransferProtocol.Status.read(in);
-          if (reply != SUCCESS) {
-            if (reply == ERROR_ACCESS_TOKEN
+          if (reply.getStatus() != Status.SUCCESS) {
+            if (reply.getStatus() == Status.ERROR_ACCESS_TOKEN
                 && i > lastRetriedIndex) {
               if (LOG.isDebugEnabled()) {
                 LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
@@ -1138,9 +1141,12 @@ public class DFSClient implements FSCons
                   + block + " from datanode " + datanodes[j].getName());
             }
           }
+          
+          OpBlockChecksumResponseProto checksumData =
+            reply.getChecksumResponse();
 
           //read byte-per-checksum
-          final int bpc = in.readInt(); 
+          final int bpc = checksumData.getBytesPerCrc();
           if (i == 0) { //first block
             bytesPerCRC = bpc;
           }
@@ -1150,13 +1156,14 @@ public class DFSClient implements FSCons
           }
           
           //read crc-per-block
-          final long cpb = in.readLong();
+          final long cpb = checksumData.getCrcPerBlock();
           if (locatedblocks.size() > 1 && i == 0) {
             crcPerBlock = cpb;
           }
 
           //read md5
-          final MD5Hash md5 = MD5Hash.read(in);
+          final MD5Hash md5 = new MD5Hash(
+              checksumData.getMd5().toByteArray());
           md5.write(md5out);
           
           done = true;

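Where the old client read the checksum reply positionally (readInt, readLong, then raw MD5 bytes), the fields now travel inside a nested OpBlockChecksumResponseProto. A sketch of unpacking it, using only the accessors visible in the hunk; the MD5Hash(byte[]) constructor is the one the patch itself calls:

    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
    import org.apache.hadoop.io.MD5Hash;

    class ChecksumReplyExample {
      /** Unpack a BLOCK_CHECKSUM reply that has already been parsed. */
      static MD5Hash unpack(BlockOpResponseProto reply) {
        OpBlockChecksumResponseProto checksumData = reply.getChecksumResponse();
        int bytesPerCrc = checksumData.getBytesPerCrc();   // was in.readInt()
        long crcPerBlock = checksumData.getCrcPerBlock();  // was in.readLong()
        System.out.println("bpc=" + bytesPerCrc + ", cpb=" + crcPerBlock);
        // ByteString -> byte[] -> MD5Hash replaces MD5Hash.read(in).
        return new MD5Hash(checksumData.getMd5().toByteArray());
      }
    }
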
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1138160&r1=1138159&r2=1138160&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java Tue Jun 21 20:09:54 2011
@@ -17,8 +17,7 @@
  */
 package org.apache.hadoop.hdfs;
 
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
 
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
@@ -47,16 +46,20 @@ import org.apache.hadoop.fs.UnresolvedLi
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -64,7 +67,6 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
@@ -75,6 +77,8 @@ import org.apache.hadoop.util.Progressab
 import org.apache.hadoop.util.PureJavaCrc32;
 import org.apache.hadoop.util.StringUtils;
 
+
+
 /****************************************************************
  * DFSOutputStream creates files from a stream of bytes.
  *
@@ -650,7 +654,7 @@ class DFSOutputStream extends FSOutputSu
             long seqno = ack.getSeqno();
             // processes response status from datanodes.
             for (int i = ack.getNumOfReplies()-1; i >=0  && dfsClient.clientRunning; i--) {
-              final DataTransferProtocol.Status reply = ack.getReply(i);
+              final Status reply = ack.getReply(i);
               if (reply != SUCCESS) {
                 errorIndex = i; // first bad datanode
                 throw new IOException("Bad response " + reply +
@@ -843,12 +847,14 @@ class DFSOutputStream extends FSOutputSu
             DataNode.SMALL_BUFFER_SIZE));
 
         //send the TRANSFER_BLOCK request
-        DataTransferProtocol.Sender.opTransferBlock(out, block,
+        Sender.opTransferBlock(out, block,
             dfsClient.clientName, targets, blockToken);
 
         //ack
         in = new DataInputStream(NetUtils.getInputStream(sock));
-        if (SUCCESS != DataTransferProtocol.Status.read(in)) {
+        BlockOpResponseProto response =
+          BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(in));
+        if (SUCCESS != response.getStatus()) {
           throw new IOException("Failed to add a datanode");
         }
       } finally {
@@ -990,7 +996,7 @@ class DFSOutputStream extends FSOutputSu
     //
     private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
         boolean recoveryFlag) {
-      DataTransferProtocol.Status pipelineStatus = SUCCESS;
+      Status pipelineStatus = SUCCESS;
       String firstBadLink = "";
       if (DFSClient.LOG.isDebugEnabled()) {
         for (int i = 0; i < nodes.length; i++) {
@@ -1015,7 +1021,7 @@ class DFSOutputStream extends FSOutputSu
         blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
 
         // send the request
-        DataTransferProtocol.Sender.opWriteBlock(out, block,
+        Sender.opWriteBlock(out, block,
             nodes.length, recoveryFlag ? stage.getRecoveryStage() : stage, newGS, 
             block.getNumBytes(), bytesSent, dfsClient.clientName, null, nodes,
             accessToken);
@@ -1023,10 +1029,13 @@ class DFSOutputStream extends FSOutputSu
         out.flush();
 
         // receive ack for connect
-        pipelineStatus = DataTransferProtocol.Status.read(blockReplyStream);
-        firstBadLink = Text.readString(blockReplyStream);
+        BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
+            HdfsProtoUtil.vintPrefixed(blockReplyStream));
+        pipelineStatus = resp.getStatus();
+        firstBadLink = resp.getFirstBadLink();
+        
         if (pipelineStatus != SUCCESS) {
-          if (pipelineStatus == ERROR_ACCESS_TOKEN) {
+          if (pipelineStatus == Status.ERROR_ACCESS_TOKEN) {
             throw new InvalidBlockTokenException(
                 "Got access token error for connect ack with firstBadLink as "
                     + firstBadLink);
@@ -1401,9 +1410,8 @@ class DFSOutputStream extends FSOutputSu
   }
   
   /**
-   * flushes out to all replicas of the block. 
-   * The data is in the buffers of the DNs 
-   * but not neccessary on the DN's OS buffers. 
+   * Flushes out to all replicas of the block. The data is in the buffers
+   * of the DNs but not necessarily in the DN's OS buffers.
    *
    * It is a synchronous operation. When it returns,
    * it guarantees that flushed data become visible to new readers. 

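The ResponseProcessor hunk above scans each PipelineAck for per-replica statuses; the reply index doubles as the index of the failing datanode in the pipeline, which is how errorIndex gets set. A small sketch of that scan, using only the PipelineAck accessors visible in the diff:

    import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;

    class AckScanExample {
      /** Returns the index of a failing replica, or -1 if all acked SUCCESS. */
      static int scan(PipelineAck ack) {
        for (int i = ack.getNumOfReplies() - 1; i >= 0; i--) {
          if (ack.getReply(i) != Status.SUCCESS) {
            return i; // corresponds to the errorIndex assignment above
          }
        }
        return -1;
      }
    }
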
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java?rev=1138160&r1=1138159&r2=1138160&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java Tue Jun 21 20:09:54 2011
@@ -190,9 +190,6 @@ public class HftpFileSystem extends File
         }
       }
       
-      //Renew TGT if needed
-      ugi.reloginFromKeytab();
-      
       //since we don't already have a token, go get one over https
       if (delegationToken == null) {
         delegationToken = 
@@ -204,8 +201,10 @@ public class HftpFileSystem extends File
   
 
   @Override
-  public Token<?> getDelegationToken(final String renewer) throws IOException {
+  public synchronized Token<?> getDelegationToken(final String renewer) throws IOException {
     try {
+      //Renew TGT if needed
+      ugi.reloginFromKeytab();
       return ugi.doAs(new PrivilegedExceptionAction<Token<?>>() {
         public Token<?> run() throws IOException {
           Credentials c;

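The relogin call moves inside the now-synchronized getDelegationToken(), so the TGT refresh happens at most once at a time, immediately before the privileged fetch, instead of during initialization. The general shape of that pattern, as a sketch with a placeholder action standing in for the real HTTPS token fetch:

    import java.security.PrivilegedExceptionAction;

    import org.apache.hadoop.security.UserGroupInformation;

    class ReloginExample {
      static synchronized String fetchAsUser() throws Exception {
        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
        ugi.reloginFromKeytab(); // no-op unless logged in from a keytab
        return ugi.doAs(new PrivilegedExceptionAction<String>() {
          public String run() throws Exception {
            return "token"; // placeholder for the real HTTPS token fetch
          }
        });
      }
    }
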
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java?rev=1138160&r1=1138159&r2=1138160&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java Tue Jun 21 20:09:54 2011
@@ -159,6 +159,11 @@ public class DatanodeInfo extends Datano
   public void setCapacity(long capacity) { 
     this.capacity = capacity; 
   }
+  
+  /** Sets the used space for the datanode. */
+  public void setDfsUsed(long dfsUsed) {
+    this.dfsUsed = dfsUsed;
+  }
 
   /** Sets raw free space. */
   public void setRemaining(long remaining) { 

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java?rev=1138160&r1=1138159&r2=1138160&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java Tue Jun 21 20:09:54 2011
@@ -108,6 +108,30 @@ public class LayoutVersion {
       this.ancestorLV = ancestorLV;
       this.description = description;
     }
+    
+    /** 
+     * Accessor method for feature layout version 
+     * @return int lv value
+     */
+    public int getLayoutVersion() {
+      return lv;
+    }
+
+    /** 
+     * Accessor method for feature ancestor layout version 
+     * @return int ancestor LV value
+     */
+    public int getAncestorLayoutVersion() {
+      return ancestorLV;
+    }
+
+    /** 
+     * Accessor method for feature description 
+     * @return String feature description 
+     */
+    public String getDescription() {
+      return description;
+    }
   }
   
   // Build layout version and corresponding feature matrix

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1138160&r1=1138159&r2=1138160&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Tue Jun 21 20:09:54 2011
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.balancer;
 
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
+import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
@@ -53,11 +53,13 @@ import org.apache.hadoop.conf.Configured
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Util;
@@ -346,15 +348,16 @@ public class Balancer {
     private void sendRequest(DataOutputStream out) throws IOException {
       final ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock());
       final Token<BlockTokenIdentifier> accessToken = nnc.getAccessToken(eb);
-      DataTransferProtocol.Sender.opReplaceBlock(out, eb, source.getStorageID(), 
+      Sender.opReplaceBlock(out, eb, source.getStorageID(), 
           proxySource.getDatanode(), accessToken);
     }
     
     /* Receive a block copy response from the input stream */ 
     private void receiveResponse(DataInputStream in) throws IOException {
-      DataTransferProtocol.Status status = DataTransferProtocol.Status.read(in);
-      if (status != DataTransferProtocol.Status.SUCCESS) {
-        if (status == ERROR_ACCESS_TOKEN)
+      BlockOpResponseProto response = BlockOpResponseProto.parseFrom(
+          vintPrefixed(in));
+      if (response.getStatus() != Status.SUCCESS) {
+        if (response.getStatus() == Status.ERROR_ACCESS_TOKEN)
           throw new IOException("block move failed due to access token error");
         throw new IOException("block move is failed");
       }

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1138160&r1=1138159&r2=1138160&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Jun 21 20:09:54 2011
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
 import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
 
 import java.io.BufferedOutputStream;
@@ -36,14 +34,14 @@ import java.util.zip.Checksum;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.fs.FSOutputSummer;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
@@ -996,13 +994,13 @@ class BlockReceiver implements Closeable
             Status[] replies = null;
             if (mirrorError) { // ack read error
               replies = new Status[2];
-              replies[0] = SUCCESS;
-              replies[1] = ERROR;
+              replies[0] = Status.SUCCESS;
+              replies[1] = Status.ERROR;
             } else {
               short ackLen = type == PacketResponderType.LAST_IN_PIPELINE? 0
                   : ack.getNumOfReplies();
               replies = new Status[1+ackLen];
-              replies[0] = SUCCESS;
+              replies[0] = Status.SUCCESS;
               for (int i=0; i<ackLen; i++) {
                 replies[i+1] = ack.getReply(i);
               }

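In the PacketResponder hunk above, each datanode prepends its own status to whatever the downstream pipeline reported, substituting a synthetic ERROR when the mirror connection broke. A sketch of that construction, with a plain array standing in for the ack.getReply(i) loop:

    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;

    class ReplyArrayExample {
      static Status[] buildReplies(boolean mirrorError, Status[] downstream) {
        if (mirrorError) { // ack read error: report self OK, mirror bad
          return new Status[] { Status.SUCCESS, Status.ERROR };
        }
        Status[] replies = new Status[1 + downstream.length];
        replies[0] = Status.SUCCESS; // this node's own verdict
        System.arraycopy(downstream, 0, replies, 1, downstream.length);
        return replies;
      }
    }
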
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1138160&r1=1138159&r2=1138160&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Tue Jun 21 20:09:54 2011
@@ -33,7 +33,7 @@ import org.apache.commons.logging.Log;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.SocketOutputStream;

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1138160&r1=1138159&r2=1138160&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Jun 21 20:09:54 2011
@@ -18,9 +18,32 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICES;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 
 import java.io.BufferedOutputStream;
@@ -68,15 +91,19 @@ import org.apache.hadoop.hdfs.HdfsConfig
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
@@ -133,6 +160,7 @@ import org.apache.hadoop.util.StringUtil
 import org.apache.hadoop.util.VersionInfo;
 import org.mortbay.util.ajax.JSON;
 
+
 /**********************************************************
  * DataNode is a class (and program) that stores a set of
  * blocks for a DFS deployment.  A single deployment can
@@ -1397,6 +1425,10 @@ public class DataNode extends Configured
     return blockPoolManager.getAllNamenodeThreads();
   }
   
+  int getBpOsCount() {
+    return blockPoolManager.getAllNamenodeThreads().length;
+  }
+  
   /**
    * Initializes the {@link #data}. The initialization is done only once, when
    * handshake with the the first namenode is completed.
@@ -1945,7 +1977,7 @@ public class DataNode extends Configured
               EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE));
         }
 
-        DataTransferProtocol.Sender.opWriteBlock(out,
+        Sender.opWriteBlock(out,
             b, 0, stage, 0, 0, 0, clientname, srcNode, targets, accessToken);
 
         // send data & checksum
@@ -1958,12 +1990,13 @@ public class DataNode extends Configured
         // read ack
         if (isClient) {
           in = new DataInputStream(NetUtils.getInputStream(sock));
-          final DataTransferProtocol.Status s = DataTransferProtocol.Status.read(in);
+          DNTransferAckProto closeAck = DNTransferAckProto.parseFrom(
+              HdfsProtoUtil.vintPrefixed(in));
           if (LOG.isDebugEnabled()) {
-            LOG.debug(getClass().getSimpleName() + ": close-ack=" + s);
+            LOG.debug(getClass().getSimpleName() + ": close-ack=" + closeAck);
           }
-          if (s != SUCCESS) {
-            if (s == ERROR_ACCESS_TOKEN) {
+          if (closeAck.getStatus() != Status.SUCCESS) {
+            if (closeAck.getStatus() == Status.ERROR_ACCESS_TOKEN) {
               throw new InvalidBlockTokenException(
                   "Got access token error for connect ack, targets="
                    + Arrays.asList(targets));
@@ -2105,6 +2138,10 @@ public class DataNode extends Configured
     while (shouldRun) {
       try {
         blockPoolManager.joinAll();
+        if (blockPoolManager.getAllNamenodeThreads() != null
+            && blockPoolManager.getAllNamenodeThreads().length == 0) {
+          shouldRun = false;
+        }
         Thread.sleep(2000);
       } catch (InterruptedException ex) {
         LOG.warn("Received exception in Datanode#join: " + ex);
@@ -2243,6 +2280,13 @@ public class DataNode extends Configured
     } catch (Throwable e) {
       LOG.error(StringUtils.stringifyException(e));
       System.exit(-1);
+    } finally {
+      // We need System.exit here because either shutdown was called, or a
+      // disk-related condition (volumes tolerated / volumes required) was
+      // not met. Also, in secure mode control goes to Jsvc, and the Datanode
+      // process hangs without System.exit.
+      LOG.warn("Exiting Datanode");
+      System.exit(0);
     }
   }
   

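The join() hunk above adds a termination condition: once every per-namenode service thread has exited, the datanode stops waiting instead of sleeping forever. A sketch of that shutdown condition, with a plain Thread array standing in for the BPOfferService list that blockPoolManager tracks:

    class JoinLoopExample {
      /** Poll the worker threads every two seconds and return once the
       *  last one has exited (a sketch of the loop in DataNode#join). */
      static void waitForWorkers(Thread[] workers) throws InterruptedException {
        boolean shouldRun = true;
        while (shouldRun) {
          int alive = 0;
          for (Thread t : workers) {
            if (t != null && t.isAlive()) {
              alive++;
            }
          }
          if (alive == 0) {
            shouldRun = false; // nothing left to serve: exit the loop
          } else {
            Thread.sleep(2000);
          }
        }
      }
    }
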
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1138160&r1=1138159&r2=1138160&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Tue Jun 21 20:09:54 2011
@@ -17,9 +17,9 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_ACCESS_TOKEN;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
 
@@ -39,11 +39,18 @@ import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
@@ -51,20 +58,19 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.metrics2.lib.MutableCounterLong;
-import org.apache.hadoop.metrics2.lib.MutableRate;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.protobuf.ByteString;
+
+
 /**
  * Thread for processing incoming/outgoing data stream.
  */
-class DataXceiver extends DataTransferProtocol.Receiver
-    implements Runnable, FSConstants {
+class DataXceiver extends Receiver implements Runnable, FSConstants {
   public static final Log LOG = DataNode.LOG;
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
   
@@ -123,6 +129,7 @@ class DataXceiver extends DataTransferPr
 
     DataInputStream in=null; 
     int opsProcessed = 0;
+    Op op = null;
     try {
       in = new DataInputStream(
           new BufferedInputStream(NetUtils.getInputStream(s), 
@@ -133,7 +140,6 @@ class DataXceiver extends DataTransferPr
       // This optimistic behaviour allows the other end to reuse connections.
       // Setting keepalive timeout to 0 disable this behavior.
       do {
-        DataTransferProtocol.Op op;
         try {
           if (opsProcessed != 0) {
             assert socketKeepaliveTimeout > 0;
@@ -172,10 +178,12 @@ class DataXceiver extends DataTransferPr
         opStartTime = now();
         processOp(op, in);
         ++opsProcessed;
-      } while (s.isConnected() && socketKeepaliveTimeout > 0);
+      } while (!s.isClosed() && socketKeepaliveTimeout > 0);
     } catch (Throwable t) {
-      LOG.error(datanode.getMachineName() + ":DataXceiver, at " +
-        s.toString(), t);
+      LOG.error(datanode.getMachineName() + ":DataXceiver error processing " +
+                ((op == null) ? "unknown" : op.name()) + " operation " +
+                " src: " + remoteAddress +
+                " dest: " + localAddress, t);
     } finally {
       if (LOG.isDebugEnabled()) {
         LOG.debug(datanode.getMachineName() + ":Number of active connections is: "
@@ -200,8 +208,7 @@ class DataXceiver extends DataTransferPr
     DataOutputStream out = new DataOutputStream(
                  new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
     checkAccess(out, true, block, blockToken,
-        DataTransferProtocol.Op.READ_BLOCK,
-        BlockTokenSecretManager.AccessMode.READ);
+        Op.READ_BLOCK, BlockTokenSecretManager.AccessMode.READ);
   
     // send the block
     BlockSender blockSender = null;
@@ -213,7 +220,7 @@ class DataXceiver extends DataTransferPr
             "%d", "HDFS_READ", clientName, "%d",
             dnR.getStorageID(), block, "%d")
         : dnR + " Served block " + block + " to " +
-            s.getInetAddress();
+            remoteAddress;
 
     updateCurrentThreadName("Sending block " + block);
     try {
@@ -221,19 +228,23 @@ class DataXceiver extends DataTransferPr
         blockSender = new BlockSender(block, startOffset, length,
             true, true, false, datanode, clientTraceFmt);
       } catch(IOException e) {
+        LOG.info("opReadBlock " + block + " received exception " + e);
         sendResponse(s, ERROR, datanode.socketWriteTimeout);
         throw e;
       }
+      
+      // send op status
+      sendResponse(s, SUCCESS, datanode.socketWriteTimeout);
 
-      SUCCESS.write(out); // send op status
       long read = blockSender.sendBlock(out, baseStream, null); // send data
 
       if (blockSender.didSendEntireByteRange()) {
         // If we sent the entire range, then we should expect the client
         // to respond with a Status enum.
         try {
-          DataTransferProtocol.Status stat = DataTransferProtocol.Status.read(in);
-          if (stat == null) {
+          ClientReadStatusProto stat = ClientReadStatusProto.parseFrom(
+              HdfsProtoUtil.vintPrefixed(in));
+          if (!stat.hasStatus()) {
             LOG.warn("Client " + s.getInetAddress() + " did not send a valid status " +
                      "code after reading. Will close connection.");
             IOUtils.closeStream(out);
@@ -248,6 +259,10 @@ class DataXceiver extends DataTransferPr
       datanode.metrics.incrBytesRead((int) read);
       datanode.metrics.incrBlocksRead();
     } catch ( SocketException ignored ) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(dnR + ":Ignoring exception while serving " + block + " to " +
+            remoteAddress, ignored);
+      }
       // Its ok for remote side to close the connection anytime.
       datanode.metrics.incrBlocksRead();
       IOUtils.closeStream(out);
@@ -257,7 +272,7 @@ class DataXceiver extends DataTransferPr
        */
       LOG.warn(dnR +  ":Got exception while serving " + 
           block + " to " +
-                s.getInetAddress() + ":\n" + 
+                remoteAddress + ":\n" + 
                 StringUtils.stringifyException(ioe) );
       throw ioe;
     } finally {
@@ -320,8 +335,7 @@ class DataXceiver extends DataTransferPr
             NetUtils.getOutputStream(s, datanode.socketWriteTimeout),
             SMALL_BUFFER_SIZE));
     checkAccess(replyOut, isClient, block, blockToken,
-        DataTransferProtocol.Op.WRITE_BLOCK,
-        BlockTokenSecretManager.AccessMode.WRITE);
+        Op.WRITE_BLOCK, BlockTokenSecretManager.AccessMode.WRITE);
 
     DataOutputStream mirrorOut = null;  // stream to next target
     DataInputStream mirrorIn = null;    // reply from next target
@@ -329,7 +343,7 @@ class DataXceiver extends DataTransferPr
     BlockReceiver blockReceiver = null; // responsible for data handling
     String mirrorNode = null;           // the name:port of next target
     String firstBadLink = "";           // first datanode that failed in connection setup
-    DataTransferProtocol.Status mirrorInStatus = SUCCESS;
+    Status mirrorInStatus = SUCCESS;
     try {
       if (isDatanode || 
           stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
@@ -366,7 +380,7 @@ class DataXceiver extends DataTransferPr
                          SMALL_BUFFER_SIZE));
           mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));
 
-          DataTransferProtocol.Sender.opWriteBlock(mirrorOut, originalBlock,
+          Sender.opWriteBlock(mirrorOut, originalBlock,
               pipelineSize, stage, newGs, minBytesRcvd, maxBytesRcvd, clientname,
               srcDataNode, targets, blockToken);
 
@@ -377,8 +391,10 @@ class DataXceiver extends DataTransferPr
 
           // read connect ack (only for clients, not for replication req)
           if (isClient) {
-            mirrorInStatus = DataTransferProtocol.Status.read(mirrorIn);
-            firstBadLink = Text.readString(mirrorIn);
+            BlockOpResponseProto connectAck =
+              BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(mirrorIn));
+            mirrorInStatus = connectAck.getStatus();
+            firstBadLink = connectAck.getFirstBadLink();
             if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
               LOG.info("Datanode " + targets.length +
                        " got response for connect ack " +
@@ -389,8 +405,11 @@ class DataXceiver extends DataTransferPr
 
         } catch (IOException e) {
           if (isClient) {
-            ERROR.write(replyOut);
-            Text.writeString(replyOut, mirrorNode);
+            BlockOpResponseProto.newBuilder()
+              .setStatus(ERROR)
+              .setFirstBadLink(mirrorNode)
+              .build()
+              .writeDelimitedTo(replyOut);
             replyOut.flush();
           }
           IOUtils.closeStream(mirrorOut);
@@ -400,6 +419,8 @@ class DataXceiver extends DataTransferPr
           IOUtils.closeSocket(mirrorSock);
           mirrorSock = null;
           if (isClient) {
+            LOG.error(datanode + ":Exception transfering block " +
+                      block + " to mirror " + mirrorNode + ": " + e);
             throw e;
           } else {
             LOG.info(datanode + ":Exception transfering block " +
@@ -417,8 +438,11 @@ class DataXceiver extends DataTransferPr
                    " forwarding connect ack to upstream firstbadlink is " +
                    firstBadLink);
         }
-        mirrorInStatus.write(replyOut);
-        Text.writeString(replyOut, firstBadLink);
+        BlockOpResponseProto.newBuilder()
+          .setStatus(mirrorInStatus)
+          .setFirstBadLink(firstBadLink)
+          .build()
+          .writeDelimitedTo(replyOut);
         replyOut.flush();
       }
 
@@ -433,7 +457,7 @@ class DataXceiver extends DataTransferPr
           if (LOG.isTraceEnabled()) {
             LOG.trace("TRANSFER: send close-ack");
           }
-          SUCCESS.write(replyOut);
+          writeResponse(SUCCESS, replyOut);
         }
       }
 
@@ -458,7 +482,7 @@ class DataXceiver extends DataTransferPr
 
       
     } catch (IOException ioe) {
-      LOG.info("writeBlock " + block + " received exception " + ioe);
+      LOG.info("opWriteBlock " + block + " received exception " + ioe);
       throw ioe;
     } finally {
       // close all opened streams
@@ -480,21 +504,20 @@ class DataXceiver extends DataTransferPr
       final DatanodeInfo[] targets,
       final Token<BlockTokenIdentifier> blockToken) throws IOException {
     checkAccess(null, true, blk, blockToken,
-        DataTransferProtocol.Op.TRANSFER_BLOCK,
-        BlockTokenSecretManager.AccessMode.COPY);
+        Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY);
 
-    updateCurrentThreadName(DataTransferProtocol.Op.TRANSFER_BLOCK + " " + blk);
+    updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk);
 
     final DataOutputStream out = new DataOutputStream(
         NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
     try {
       datanode.transferReplicaForPipelineRecovery(blk, targets, client);
-      SUCCESS.write(out);
+      writeResponse(Status.SUCCESS, out);
     } finally {
       IOUtils.closeStream(out);
     }
   }
-
+  
   /**
    * Get block checksum (MD5 of CRC32).
    */
@@ -504,8 +527,7 @@ class DataXceiver extends DataTransferPr
     final DataOutputStream out = new DataOutputStream(
         NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
     checkAccess(out, true, block, blockToken,
-        DataTransferProtocol.Op.BLOCK_CHECKSUM,
-        BlockTokenSecretManager.AccessMode.READ);
+        Op.BLOCK_CHECKSUM, BlockTokenSecretManager.AccessMode.READ);
     updateCurrentThreadName("Reading metadata for block " + block);
     final MetaDataInputStream metadataIn = 
       datanode.data.getMetaDataInputStream(block);
@@ -530,10 +552,15 @@ class DataXceiver extends DataTransferPr
       }
 
       //write reply
-      SUCCESS.write(out);
-      out.writeInt(bytesPerCRC);
-      out.writeLong(crcPerBlock);
-      md5.write(out);
+      BlockOpResponseProto.newBuilder()
+        .setStatus(SUCCESS)
+        .setChecksumResponse(OpBlockChecksumResponseProto.newBuilder()             
+          .setBytesPerCrc(bytesPerCRC)
+          .setCrcPerBlock(crcPerBlock)
+          .setMd5(ByteString.copyFrom(md5.getDigest()))
+          )
+        .build()
+        .writeDelimitedTo(out);
       out.flush();
     } finally {
       IOUtils.closeStream(out);
@@ -590,7 +617,7 @@ class DataXceiver extends DataTransferPr
           baseStream, SMALL_BUFFER_SIZE));
 
       // send status first
-      SUCCESS.write(reply);
+      writeResponse(SUCCESS, reply);
       // send block content to the target
       long read = blockSender.sendBlock(reply, baseStream, 
                                         dataXceiverServer.balanceThrottler);
@@ -601,6 +628,7 @@ class DataXceiver extends DataTransferPr
       LOG.info("Copied block " + block + " to " + s.getRemoteSocketAddress());
     } catch (IOException ioe) {
       isOpSuccess = false;
+      LOG.info("opCopyBlock " + block + " received exception " + ioe);
       throw ioe;
     } finally {
       dataXceiverServer.balanceThrottler.release();
@@ -653,7 +681,7 @@ class DataXceiver extends DataTransferPr
 
     Socket proxySock = null;
     DataOutputStream proxyOut = null;
-    DataTransferProtocol.Status opStatus = SUCCESS;
+    Status opStatus = SUCCESS;
     BlockReceiver blockReceiver = null;
     DataInputStream proxyReply = null;
     
@@ -671,15 +699,16 @@ class DataXceiver extends DataTransferPr
                      new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
 
       /* send request to the proxy */
-      DataTransferProtocol.Sender.opCopyBlock(proxyOut, block, blockToken);
+      Sender.opCopyBlock(proxyOut, block, blockToken);
 
       // receive the response from the proxy
       proxyReply = new DataInputStream(new BufferedInputStream(
           NetUtils.getInputStream(proxySock), BUFFER_SIZE));
-      final DataTransferProtocol.Status status
-          = DataTransferProtocol.Status.read(proxyReply);
-      if (status != SUCCESS) {
-        if (status == ERROR_ACCESS_TOKEN) {
+      BlockOpResponseProto copyResponse = BlockOpResponseProto.parseFrom(
+          HdfsProtoUtil.vintPrefixed(proxyReply));
+
+      if (copyResponse.getStatus() != SUCCESS) {
+        if (copyResponse.getStatus() == ERROR_ACCESS_TOKEN) {
           throw new IOException("Copy block " + block + " from "
               + proxySock.getRemoteSocketAddress()
               + " failed due to access token error");
@@ -705,6 +734,7 @@ class DataXceiver extends DataTransferPr
       
     } catch (IOException ioe) {
       opStatus = ERROR;
+      LOG.info("opReplaceBlock " + block + " received exception " + ioe);
       throw ioe;
     } finally {
       // receive the last byte that indicates the proxy released its thread resource
@@ -737,29 +767,35 @@ class DataXceiver extends DataTransferPr
     return now() - opStartTime;
   }
 
-  private void updateCounter(MutableCounterLong localCounter,
-      MutableCounterLong remoteCounter) {
-    (isLocal? localCounter: remoteCounter).incr();
-  }
-
   /**
    * Utility function for sending a response.
    * @param s socket to write to
    * @param opStatus status message to write
    * @param timeout send timeout
    **/
-  private void sendResponse(Socket s, DataTransferProtocol.Status opStatus,
+  private void sendResponse(Socket s, Status status,
       long timeout) throws IOException {
     DataOutputStream reply = 
       new DataOutputStream(NetUtils.getOutputStream(s, timeout));
-    opStatus.write(reply);
-    reply.flush();
+    
+    writeResponse(status, reply);
   }
+  
+  private void writeResponse(Status status, OutputStream out)
+  throws IOException {
+    BlockOpResponseProto response = BlockOpResponseProto.newBuilder()
+      .setStatus(status)
+      .build();
+    
+    response.writeDelimitedTo(out);
+    out.flush();
+  }
+  
 
   private void checkAccess(DataOutputStream out, final boolean reply, 
       final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> t,
-      final DataTransferProtocol.Op op,
+      final Op op,
       final BlockTokenSecretManager.AccessMode mode) throws IOException {
     if (datanode.isBlockTokenEnabled) {
       try {
@@ -771,12 +807,15 @@ class DataXceiver extends DataTransferPr
               out = new DataOutputStream(
                   NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
             }
-            ERROR_ACCESS_TOKEN.write(out);
+            
+            BlockOpResponseProto.Builder resp = BlockOpResponseProto.newBuilder()
+              .setStatus(ERROR_ACCESS_TOKEN);
             if (mode == BlockTokenSecretManager.AccessMode.WRITE) {
               DatanodeRegistration dnR = 
                 datanode.getDNRegistrationForBP(blk.getBlockPoolId());
-              Text.writeString(out, dnR.getName());
+              resp.setFirstBadLink(dnR.getName());
             }
+            resp.build().writeDelimitedTo(out);
             out.flush();
           }
           LOG.warn("Block token verification failed: op=" + op

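The checkAccess() hunk above shows a nicety of the proto format: optional fields like firstBadLink are simply left unset when they do not apply, where the old Writable format had to write a Text string unconditionally. A sketch of that conditional-builder pattern; the null check here stands in for the AccessMode.WRITE test in the hunk:

    import java.io.IOException;
    import java.io.OutputStream;

    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;

    class AccessErrorReplyExample {
      /** Reply with ERROR_ACCESS_TOKEN, attaching firstBadLink only when the
       *  caller has one (WRITE_BLOCK replies do; others leave it unset). */
      static void send(OutputStream out, String firstBadLink) throws IOException {
        BlockOpResponseProto.Builder resp = BlockOpResponseProto.newBuilder()
            .setStatus(Status.ERROR_ACCESS_TOKEN);
        if (firstBadLink != null) {
          resp.setFirstBadLink(firstBadLink); // optional proto field
        }
        resp.build().writeDelimitedTo(out);
        out.flush();
      }
    }
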
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=1138160&r1=1138159&r2=1138160&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Tue Jun 21 20:09:54 2011
@@ -47,7 +47,7 @@ class DataXceiverServer implements Runna
   
   ServerSocket ss;
   DataNode datanode;
-  // Record all sockets opend for data transfer
+  // Record all sockets opened for data transfer
   Map<Socket, Socket> childSockets = Collections.synchronizedMap(
                                        new HashMap<Socket, Socket>());
   
@@ -140,19 +140,18 @@ class DataXceiverServer implements Runna
       } catch (SocketTimeoutException ignored) {
        // wake up to check whether we should continue to run
       } catch (IOException ie) {
-        LOG.warn(datanode.getMachineName() + ":DataXceiveServer: " 
-                                + StringUtils.stringifyException(ie));
+        LOG.warn(datanode.getMachineName() + ":DataXceiveServer: ", ie);
       } catch (Throwable te) {
-        LOG.error(datanode.getMachineName() + ":DataXceiveServer: Exiting due to:" 
-                                 + StringUtils.stringifyException(te));
+        LOG.error(datanode.getMachineName()
+            + ":DataXceiveServer: Exiting due to: ", te);
         datanode.shouldRun = false;
       }
     }
     try {
       ss.close();
     } catch (IOException ie) {
-      LOG.warn(datanode.getMachineName() + ":DataXceiveServer: " 
-                              + StringUtils.stringifyException(ie));
+      LOG.warn(datanode.getMachineName()
+          + ":DataXceiveServer: Close exception due to: ", ie);
     }
   }
   

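The DataXceiverServer change above is purely a logging idiom fix: instead of flattening the stack trace into the message with StringUtils.stringifyException(), the Throwable is passed as the second argument so the logging backend formats it. A small self-contained illustration of the two styles (class and method names are made up for the example):

    import java.io.IOException;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    class LogIdiomSketch {
      private static final Log LOG = LogFactory.getLog(LogIdiomSketch.class);

      void acceptLoop() {
        try {
          mightThrow();
        } catch (IOException ie) {
          // Old style: stack trace baked into the message string.
          //   LOG.warn("DataXceiveServer: " + StringUtils.stringifyException(ie));
          // New style: the logger receives the Throwable and renders the
          // stack trace itself, under the backend's own formatting rules.
          LOG.warn("DataXceiveServer: ", ie);
        }
      }

      private void mightThrow() throws IOException {
        throw new IOException("simulated accept failure");
      }
    }
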
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java?rev=1138160&r1=1138159&r2=1138160&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java Tue Jun 21 20:09:54 2011
@@ -241,6 +241,11 @@ public class DatanodeJspHelper {
     }
     datanodePort = Integer.parseInt(datanodePortStr);
 
+    final Long genStamp = JspHelper.validateLong(req.getParameter("genstamp"));
+    if (genStamp == null) {
+      out.print("Invalid input (genstamp absent)");
+      return;
+    }
     String namenodeInfoPortStr = req.getParameter("namenodeInfoPort");
     int namenodeInfoPort = -1;
     if (namenodeInfoPortStr != null)
@@ -322,6 +327,8 @@ public class DatanodeJspHelper {
         + startOffset + "\">");
     out.print("<input type=\"hidden\" name=\"filename\" value=\"" + filename
         + "\">");
+    out.print("<input type=\"hidden\" name=\"genstamp\" value=\"" + genStamp
+        + "\">");
     out.print("<input type=\"hidden\" name=\"datanodePort\" value=\""
         + datanodePort + "\">");
     out.print("<input type=\"hidden\" name=\"namenodeInfoPort\" value=\""

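The JSP hunk validates the new genstamp request parameter up front and then round-trips it through a hidden form field so follow-up requests carry it too. The null check relies on JspHelper.validateLong() returning null for unusable input; a hypothetical sketch of that contract (the real helper may behave differently, for example by rejecting malformed values some other way):

    class ValidateLongSketch {
      static Long validateLong(String value) {
        if (value == null) {
          return null;              // parameter absent
        }
        try {
          return Long.valueOf(value);
        } catch (NumberFormatException nfe) {
          return null;              // treat malformed input like absent input
        }
      }
    }
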
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1138160&r1=1138159&r2=1138160&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Tue Jun 21 20:09:54 2011
@@ -879,7 +879,8 @@ public class FSDataset implements FSCons
           if (removedVols == null) {
             removedVols = new ArrayList<FSVolume>(1);
           }
-          removedVols.add(volumeList.get(idx));
+          removedVols.add(fsv);
+          fsv.shutdown(); 
           volumeList.set(idx, null); // Remove the volume
           numFailedVolumes++;
         }

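The FSDataset hunk does two things: it reuses the already-fetched volume reference instead of re-indexing the list, and it shuts the failed volume down before nulling its slot, so resources are released rather than leaked. A sketch of that remove-then-shutdown pattern with stand-in types (Volume and hasFailed() are illustrative, not the real FSVolume API):

    import java.util.ArrayList;
    import java.util.List;

    class VolumeRemovalSketch {
      interface Volume {
        boolean hasFailed();
        void shutdown();
      }

      /** Nulls failed volumes in place so surviving indexes stay stable. */
      static List<Volume> removeFailed(List<Volume> volumes) {
        List<Volume> removed = new ArrayList<Volume>(1);
        for (int idx = 0; idx < volumes.size(); idx++) {
          Volume v = volumes.get(idx);
          if (v != null && v.hasFailed()) {
            removed.add(v);
            v.shutdown();             // release resources before dropping it
            volumes.set(idx, null);   // keep remaining positions unchanged
          }
        }
        return removed;
      }
    }
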
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java?rev=1138160&r1=1138159&r2=1138160&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java Tue Jun 21 20:09:54 2011
@@ -189,7 +189,7 @@ public class BackupNode extends NameNode
   @Override // NamenodeProtocol
   public NamenodeRegistration register(NamenodeRegistration registration
   ) throws IOException {
-    throw new UnsupportedActionException("journal");
+    throw new UnsupportedActionException("register");
   }
 
   @Override // NamenodeProtocol

Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java?rev=1138160&r1=1138159&r2=1138160&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java Tue Jun 21 20:09:54 2011
@@ -570,6 +570,7 @@ public class BlockManager {
    * dumps the contents of recentInvalidateSets
    */
   private void dumpRecentInvalidateSets(PrintWriter out) {
+    assert namesystem.hasWriteLock();
     int size = recentInvalidateSets.values().size();
     out.println("Metasave: Blocks " + pendingDeletionBlocksCount 
         + " waiting deletion from " + size + " datanodes.");
@@ -1258,7 +1259,7 @@ public class BlockManager {
     // Ignore replicas already scheduled to be removed from the DN
     if(belongsToInvalidates(dn.getStorageID(), block)) {
       assert storedBlock.findDatanode(dn) < 0 : "Block " + block
-        + " in recentInvalidatesSet should not appear in DN " + this;
+        + " in recentInvalidatesSet should not appear in DN " + dn;
       return storedBlock;
     }
 
@@ -1392,9 +1393,9 @@ public class BlockManager {
                                DatanodeDescriptor delNodeHint,
                                boolean logEveryBlock)
   throws IOException {
-    assert (block != null && namesystem.hasWriteLock());
+    assert block != null && namesystem.hasWriteLock();
     BlockInfo storedBlock;
-    if (block.getClass() == BlockInfoUnderConstruction.class) {
+    if (block instanceof BlockInfoUnderConstruction) {
       //refresh our copy in case the block got completed in another thread
       storedBlock = blocksMap.getStoredBlock(block);
     } else {
@@ -1571,6 +1572,7 @@ public class BlockManager {
    */
   void processOverReplicatedBlock(Block block, short replication,
       DatanodeDescriptor addedNode, DatanodeDescriptor delNodeHint) {
+    assert namesystem.hasWriteLock();
     if (addedNode == delNodeHint) {
       delNodeHint = null;
     }
@@ -1596,6 +1598,7 @@ public class BlockManager {
   }
 
   void addToExcessReplicate(DatanodeInfo dn, Block block) {
+    assert namesystem.hasWriteLock();
     Collection<Block> excessBlocks = excessReplicateMap.get(dn.getStorageID());
     if (excessBlocks == null) {
       excessBlocks = new TreeSet<Block>();
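
The BlockManager hunks harden internal invariants: write-lock assertions on methods that mutate shared block maps, an assertion message that names the offending datanode instead of this, and an instanceof test that, unlike getClass() ==, also matches subclasses of BlockInfoUnderConstruction. A minimal sketch of the lock-assertion guard, with an illustrative interface standing in for the real namesystem:

    class LockGuardSketch {
      interface Namesystem {
        boolean hasWriteLock();
      }

      private final Namesystem namesystem;

      LockGuardSketch(Namesystem namesystem) {
        this.namesystem = namesystem;
      }

      void mutateBlockState() {
        // Checked only when assertions are enabled (java -ea), so the guard
        // documents the locking contract at zero cost in production.
        assert namesystem.hasWriteLock() : "must hold the namesystem write lock";
        // ... mutate structures that are only safe under the write lock ...
      }
    }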