Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2012/10/19 04:28:07 UTC

svn commit: r1399950 [8/27] - in /hadoop/common/branches/HDFS-2802/hadoop-hdfs-project: ./ hadoop-hdfs-httpfs/ hadoop-hdfs-httpfs/dev-support/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/ hadoop-hdfs-httpfs/src/main/java/org/apach...

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java Fri Oct 19 02:25:55 2012
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
 
 /**
  * <p>
@@ -156,9 +157,6 @@ class LeaseRenewer {
     }
   }
 
-  private final String clienNamePostfix = DFSUtil.getRandom().nextInt()
-      + "_" + Thread.currentThread().getId();
-
   /** The time in milliseconds that the map became empty. */
   private long emptyTime = Long.MAX_VALUE;
   /** A fixed lease renewal time period in milliseconds */
@@ -212,11 +210,6 @@ class LeaseRenewer {
     return renewal;
   }
 
-  /** @return the client name for the given id. */
-  String getClientName(final String id) {
-    return "DFSClient_" + id + "_" + clienNamePostfix;
-  }
-
   /** Add a client. */
   private synchronized void addClient(final DFSClient dfsc) {
     for(DFSClient c : dfsclients) {
@@ -270,6 +263,11 @@ class LeaseRenewer {
   synchronized boolean isRunning() {
     return daemon != null && daemon.isAlive();
   }
+
+  /** Does this renewer have nothing to renew? */
+  public boolean isEmpty() {
+    return dfsclients.isEmpty();
+  }
   
   /** Used only by tests */
   synchronized String getDaemonName() {
@@ -279,7 +277,7 @@ class LeaseRenewer {
   /** Is the empty period longer than the grace period? */  
   private synchronized boolean isRenewerExpired() {
     return emptyTime != Long.MAX_VALUE
-        && System.currentTimeMillis() - emptyTime > gracePeriod;
+        && Time.now() - emptyTime > gracePeriod;
   }
 
   synchronized void put(final String src, final DFSOutputStream out,
@@ -330,6 +328,9 @@ class LeaseRenewer {
     dfsc.removeFileBeingWritten(src);
 
     synchronized(this) {
+      if (dfsc.isFilesBeingWrittenEmpty()) {
+        dfsclients.remove(dfsc);
+      }
       //update emptyTime if necessary
       if (emptyTime == Long.MAX_VALUE) {
         for(DFSClient c : dfsclients) {
@@ -339,7 +340,7 @@ class LeaseRenewer {
           }
         }
         //discover the first time that all file-being-written maps are empty.
-        emptyTime = System.currentTimeMillis();
+        emptyTime = Time.now();
       }
     }
   }
@@ -354,7 +355,7 @@ class LeaseRenewer {
       }
       if (emptyTime == Long.MAX_VALUE) {
         //discover the first time that the client list is empty.
-        emptyTime = System.currentTimeMillis();
+        emptyTime = Time.now();
       }
     }
 
@@ -427,10 +428,9 @@ class LeaseRenewer {
    * when the lease period is half over.
    */
   private void run(final int id) throws InterruptedException {
-    for(long lastRenewed = System.currentTimeMillis();
-        clientsRunning() && !Thread.interrupted();
+    for(long lastRenewed = Time.now(); !Thread.interrupted();
         Thread.sleep(getSleepPeriod())) {
-      final long elapsed = System.currentTimeMillis() - lastRenewed;
+      final long elapsed = Time.now() - lastRenewed;
       if (elapsed >= getRenewalTime()) {
         try {
           renew();
@@ -438,7 +438,7 @@ class LeaseRenewer {
             LOG.debug("Lease renewer daemon for " + clientsString()
                 + " with renew id " + id + " executed");
           }
-          lastRenewed = System.currentTimeMillis();
+          lastRenewed = Time.now();
         } catch (SocketTimeoutException ie) {
           LOG.warn("Failed to renew lease for " + clientsString() + " for "
               + (elapsed/1000) + " seconds.  Aborting ...", ie);
@@ -468,6 +468,13 @@ class LeaseRenewer {
           //no longer the current daemon or expired
           return;
         }
+
+        // if no clients are in running state or there are no more clients
+        // registered with this renewer, stop the daemon after the grace
+        // period.
+        if (!clientsRunning() && emptyTime == Long.MAX_VALUE) {
+          emptyTime = Time.now();
+        }
       }
     }
   }
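
The reworked run() loop above no longer exits as soon as clientsRunning() turns false; instead it records emptyTime and lets isRenewerExpired() stop the daemon once the grace period has passed. A minimal, JDK-only sketch of that shut-down-after-grace-period pattern (the period values and helper names below are illustrative placeholders, not the Hadoop implementation):

    // Simplified sketch of a renew-then-idle-out loop; not the Hadoop code.
    public class RenewLoopSketch {
      private static final long RENEWAL_PERIOD_MS = 30000L;  // stand-in for half the lease period
      private static final long GRACE_PERIOD_MS = 60000L;    // stand-in for the idle grace period
      private long emptyTime = Long.MAX_VALUE;                // when the renewer became idle

      void run() throws InterruptedException {
        for (long lastRenewed = System.currentTimeMillis(); !Thread.interrupted();
             Thread.sleep(RENEWAL_PERIOD_MS / 2)) {
          if (System.currentTimeMillis() - lastRenewed >= RENEWAL_PERIOD_MS) {
            renew();                                          // placeholder for the real renewal RPC
            lastRenewed = System.currentTimeMillis();
          }
          if (!hasActiveClients() && emptyTime == Long.MAX_VALUE) {
            emptyTime = System.currentTimeMillis();           // start the grace-period clock
          }
          if (emptyTime != Long.MAX_VALUE
              && System.currentTimeMillis() - emptyTime > GRACE_PERIOD_MS) {
            return;                                           // idle long enough; let the daemon exit
          }
        }
      }

      private void renew() { /* no-op in this sketch */ }
      private boolean hasActiveClients() { return false; }    // placeholder
    }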

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java Fri Oct 19 02:25:55 2012
@@ -57,6 +57,7 @@ import org.apache.hadoop.io.retry.Failov
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.io.retry.RetryUtils;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
@@ -68,7 +69,6 @@ import org.apache.hadoop.security.author
 import org.apache.hadoop.tools.GetUserMappingsProtocol;
 
 import com.google.common.base.Preconditions;
-import com.google.protobuf.ServiceException;
 
 /**
  * Create proxy objects to communicate with a remote NN. All remote access to an
@@ -243,99 +243,20 @@ public class NameNodeProxies {
     return new NamenodeProtocolTranslatorPB(proxy);
   }
   
-  /**
-   * Return the default retry policy used in RPC.
-   * 
-   * If dfs.client.retry.policy.enabled == false, use TRY_ONCE_THEN_FAIL.
-   * 
-   * Otherwise, first unwrap ServiceException if possible, and then 
-   * (1) use multipleLinearRandomRetry for
-   *     - SafeModeException, or
-   *     - IOException other than RemoteException, or
-   *     - ServiceException; and
-   * (2) use TRY_ONCE_THEN_FAIL for
-   *     - non-SafeMode RemoteException, or
-   *     - non-IOException.
-   *     
-   * Note that dfs.client.retry.max < 0 is not allowed.
-   */
-  private static RetryPolicy getDefaultRpcRetryPolicy(Configuration conf) {
-    final RetryPolicy multipleLinearRandomRetry = getMultipleLinearRandomRetry(conf);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("multipleLinearRandomRetry = " + multipleLinearRandomRetry);
-    }
-    if (multipleLinearRandomRetry == null) {
-      //no retry
-      return RetryPolicies.TRY_ONCE_THEN_FAIL;
-    } else {
-      return new RetryPolicy() {
-        @Override
-        public RetryAction shouldRetry(Exception e, int retries, int failovers,
-            boolean isMethodIdempotent) throws Exception {
-          if (e instanceof ServiceException) {
-            //unwrap ServiceException
-            final Throwable cause = e.getCause();
-            if (cause != null && cause instanceof Exception) {
-              e = (Exception)cause;
-            }
-          }
-
-          //see (1) and (2) in the javadoc of this method.
-          final RetryPolicy p;
-          if (e instanceof RemoteException) {
-            final RemoteException re = (RemoteException)e;
-            p = SafeModeException.class.getName().equals(re.getClassName())?
-                multipleLinearRandomRetry: RetryPolicies.TRY_ONCE_THEN_FAIL;
-          } else if (e instanceof IOException || e instanceof ServiceException) {
-            p = multipleLinearRandomRetry;
-          } else { //non-IOException
-            p = RetryPolicies.TRY_ONCE_THEN_FAIL;
-          }
-
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("RETRY " + retries + ") policy="
-                + p.getClass().getSimpleName() + ", exception=" + e);
-          }
-          LOG.info("RETRY " + retries + ") policy="
-              + p.getClass().getSimpleName() + ", exception=" + e);
-          return p.shouldRetry(e, retries, failovers, isMethodIdempotent);
-        }
-      };
-    }
-  }
-
-  /**
-   * Return the MultipleLinearRandomRetry policy specified in the conf,
-   * or null if the feature is disabled.
-   * If the policy is specified in the conf but the policy cannot be parsed,
-   * the default policy is returned.
-   * 
-   * Conf property: N pairs of sleep-time and number-of-retries
-   *   dfs.client.retry.policy = "s1,n1,s2,n2,..."
-   */
-  private static RetryPolicy getMultipleLinearRandomRetry(Configuration conf) {
-    final boolean enabled = conf.getBoolean(
-        DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY,
-        DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT);
-    if (!enabled) {
-      return null;
-    }
-
-    final String policy = conf.get(
-        DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
-        DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT);
-
-    final RetryPolicy r = RetryPolicies.MultipleLinearRandomRetry.parseCommaSeparatedString(policy);
-    return r != null? r: RetryPolicies.MultipleLinearRandomRetry.parseCommaSeparatedString(
-        DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT);
-  }
-
   private static ClientProtocol createNNProxyWithClientProtocol(
       InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
       boolean withRetries) throws IOException {
     RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
 
-    final RetryPolicy defaultPolicy = getDefaultRpcRetryPolicy(conf);
+    final RetryPolicy defaultPolicy = 
+        RetryUtils.getDefaultRetryPolicy(
+            conf, 
+            DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, 
+            DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT, 
+            DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
+            DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT,
+            SafeModeException.class);
+    
     final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
     ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
         ClientNamenodeProtocolPB.class, version, address, ugi, conf,
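
The hand-rolled retry policy above is replaced by the shared RetryUtils.getDefaultRetryPolicy helper, driven by the same two configuration keys; the deleted javadoc documented the spec value as pairs of sleep-time and number-of-retries ("s1,n1,s2,n2,..."). A hedged, standalone sketch of parsing such a pair list (plain Java, illustrative only; the real parsing lives in RetryPolicies.MultipleLinearRandomRetry.parseCommaSeparatedString):

    // Hypothetical parser for a "sleepMillis,retries,sleepMillis,retries,..." spec.
    import java.util.ArrayList;
    import java.util.List;

    public class RetrySpecSketch {
      static class Step {
        final long sleepMillis;
        final int retries;
        Step(long sleepMillis, int retries) { this.sleepMillis = sleepMillis; this.retries = retries; }
      }

      // Returns null on a malformed spec so callers can fall back to a default.
      static List<Step> parse(String spec) {
        String[] parts = spec.split(",");
        if (parts.length % 2 != 0) {
          return null;
        }
        List<Step> steps = new ArrayList<Step>();
        for (int i = 0; i < parts.length; i += 2) {
          try {
            steps.add(new Step(Long.parseLong(parts[i].trim()),
                               Integer.parseInt(parts[i + 1].trim())));
          } catch (NumberFormatException e) {
            return null;
          }
        }
        return steps;
      }

      public static void main(String[] args) {
        // e.g. sleep ~10s between 6 retries, then ~60s between 10 more
        List<Step> steps = parse("10000,6,60000,10");
        System.out.println(steps == null ? "invalid spec" : steps.size() + " retry phases");
      }
    }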

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java Fri Oct 19 02:25:55 2012
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FSInputCheck
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
@@ -458,7 +459,9 @@ public class RemoteBlockReader extends F
   void sendReadResult(Socket sock, Status statusCode) {
     assert !sentStatusCode : "already sent status code to " + sock;
     try {
-      RemoteBlockReader2.writeReadResult(sock, statusCode);
+      RemoteBlockReader2.writeReadResult(
+          NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT),
+          statusCode);
       sentStatusCode = true;
     } catch (IOException e) {
       // It's ok not to be able to send this. But something is probably wrong.
@@ -484,4 +487,11 @@ public class RemoteBlockReader extends F
     throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader");
   }
 
+  @Override
+  public IOStreamPair getStreams() {
+    // This class doesn't support encryption, which is the only thing this
+    // method is used for. See HDFS-3637.
+    return null;
+  }
+
 }
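
The call above now hands writeReadResult an explicit OutputStream (obtained via NetUtils.getOutputStream with a write timeout) instead of the raw Socket, matching the RemoteBlockReader2 change below where the stream may be a wrapping (e.g. encrypting) stream. A small, JDK-only sketch of that stream-injection idea (the status enum and wire encoding here are toy stand-ins, not the real protobuf message):

    // The writer accepts whatever OutputStream the caller supplies; it no longer
    // assumes the stream comes straight from a Socket.
    import java.io.IOException;
    import java.io.OutputStream;

    public class ReadResultSketch {
      enum Status { SUCCESS, CHECKSUM_OK }

      static void writeReadResult(OutputStream out, Status status) throws IOException {
        out.write(status.ordinal());   // toy encoding for illustration only
        out.flush();
      }
    }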

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java Fri Oct 19 02:25:55 2012
@@ -32,26 +32,23 @@ import java.nio.channels.ReadableByteCha
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.hdfs.util.DirectBufferPool;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.SocketInputWrapper;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
-import com.google.common.base.Preconditions;
-
 /**
  * This is a wrapper around connection to datanode
  * and understands checksum, offset etc.
@@ -83,15 +80,15 @@ public class RemoteBlockReader2  impleme
 
   static final Log LOG = LogFactory.getLog(RemoteBlockReader2.class);
   
-  Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
+  Socket dnSock;
+  // for now just sending the status code (e.g. checksumOk) after the read.
+  private IOStreamPair ioStreams;
   private final ReadableByteChannel in;
   private DataChecksum checksum;
   
-  private PacketHeader curHeader;
-  private ByteBuffer curPacketBuf = null;
+  private PacketReceiver packetReceiver = new PacketReceiver(true);
   private ByteBuffer curDataSlice = null;
 
-
   /** offset in block of the last chunk received */
   private long lastSeqNo = -1;
 
@@ -99,10 +96,6 @@ public class RemoteBlockReader2  impleme
   private long startOffset;
   private final String filename;
 
-  private static DirectBufferPool bufferPool = new DirectBufferPool();
-  private final ByteBuffer headerBuf = ByteBuffer.allocate(
-      PacketHeader.PKT_HEADER_LEN);
-
   private final int bytesPerChecksum;
   private final int checksumSize;
 
@@ -126,7 +119,7 @@ public class RemoteBlockReader2  impleme
   public synchronized int read(byte[] buf, int off, int len) 
                                throws IOException {
 
-    if (curPacketBuf == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
+    if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
       readNextPacket();
     }
     if (curDataSlice.remaining() == 0) {
@@ -143,7 +136,7 @@ public class RemoteBlockReader2  impleme
 
   @Override
   public int read(ByteBuffer buf) throws IOException {
-    if (curPacketBuf == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
+    if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
       readNextPacket();
     }
     if (curDataSlice.remaining() == 0) {
@@ -161,11 +154,13 @@ public class RemoteBlockReader2  impleme
   }
 
   private void readNextPacket() throws IOException {
-    Preconditions.checkState(curHeader == null || !curHeader.isLastPacketInBlock());
-    
     //Read packet headers.
-    readPacketHeader();
+    packetReceiver.receiveNextPacket(in);
 
+    PacketHeader curHeader = packetReceiver.getHeader();
+    curDataSlice = packetReceiver.getDataSlice();
+    assert curDataSlice.capacity() == curHeader.getDataLen();
+    
     if (LOG.isTraceEnabled()) {
       LOG.trace("DFSClient readNextPacket got header " + curHeader);
     }
@@ -179,17 +174,20 @@ public class RemoteBlockReader2  impleme
     if (curHeader.getDataLen() > 0) {
       int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum;
       int checksumsLen = chunks * checksumSize;
-      int bufsize = checksumsLen + curHeader.getDataLen();
+
+      assert packetReceiver.getChecksumSlice().capacity() == checksumsLen :
+        "checksum slice capacity=" + packetReceiver.getChecksumSlice().capacity() + 
+          " checksumsLen=" + checksumsLen;
       
-      resetPacketBuffer(checksumsLen, curHeader.getDataLen());
-  
       lastSeqNo = curHeader.getSeqno();
-      if (bufsize > 0) {
-        readChannelFully(in, curPacketBuf);
-        curPacketBuf.flip();
-        if (verifyChecksum) {
-          verifyPacketChecksums();
-        }
+      if (verifyChecksum && curDataSlice.remaining() > 0) {
+        // N.B.: the checksum error offset reported here is actually
+        // relative to the start of the block, not the start of the file.
+        // This is slightly misleading, but preserves the behavior from
+        // the older BlockReader.
+        checksum.verifyChunkedSums(curDataSlice,
+            packetReceiver.getChecksumSlice(),
+            filename, curHeader.getOffsetInBlock());
       }
       bytesNeededToFinish -= curHeader.getDataLen();
     }    
@@ -206,46 +204,13 @@ public class RemoteBlockReader2  impleme
     if (bytesNeededToFinish <= 0) {
       readTrailingEmptyPacket();
       if (verifyChecksum) {
-        sendReadResult(dnSock, Status.CHECKSUM_OK);
+        sendReadResult(Status.CHECKSUM_OK);
       } else {
-        sendReadResult(dnSock, Status.SUCCESS);
-      }
-    }
-  }
-
-  private void verifyPacketChecksums() throws ChecksumException {
-    // N.B.: the checksum error offset reported here is actually
-    // relative to the start of the block, not the start of the file.
-    // This is slightly misleading, but preserves the behavior from
-    // the older BlockReader.
-    checksum.verifyChunkedSums(curDataSlice, curPacketBuf,
-        filename, curHeader.getOffsetInBlock());
-  }
-
-  private static void readChannelFully(ReadableByteChannel ch, ByteBuffer buf)
-  throws IOException {
-    while (buf.remaining() > 0) {
-      int n = ch.read(buf);
-      if (n < 0) {
-        throw new IOException("Premature EOF reading from " + ch);
+        sendReadResult(Status.SUCCESS);
       }
     }
   }
-
-  private void resetPacketBuffer(int checksumsLen, int dataLen) {
-    int packetLen = checksumsLen + dataLen;
-    if (curPacketBuf == null ||
-        curPacketBuf.capacity() < packetLen) {
-      returnPacketBufToPool();
-      curPacketBuf = bufferPool.getBuffer(packetLen);
-    }
-    curPacketBuf.position(checksumsLen);
-    curDataSlice = curPacketBuf.slice();
-    curDataSlice.limit(dataLen);
-    curPacketBuf.clear();
-    curPacketBuf.limit(checksumsLen + dataLen);
-  }
-
+  
   @Override
   public synchronized long skip(long n) throws IOException {
     /* How can we make sure we don't throw a ChecksumException, at least
@@ -266,23 +231,14 @@ public class RemoteBlockReader2  impleme
     return nSkipped;
   }
 
-  private void readPacketHeader() throws IOException {
-    headerBuf.clear();
-    readChannelFully(in, headerBuf);
-    headerBuf.flip();
-    if (curHeader == null) curHeader = new PacketHeader();
-    curHeader.readFields(headerBuf);
-  }
-
   private void readTrailingEmptyPacket() throws IOException {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Reading empty packet at end of read");
     }
-    headerBuf.clear();
-    readChannelFully(in, headerBuf);
-    headerBuf.flip();
-    PacketHeader trailer = new PacketHeader();
-    trailer.readFields(headerBuf);
+    
+    packetReceiver.receiveNextPacket(in);
+
+    PacketHeader trailer = packetReceiver.getHeader();
     if (!trailer.isLastPacketInBlock() ||
        trailer.getDataLen() != 0) {
       throw new IOException("Expected empty end-of-read packet! Header: " +
@@ -292,9 +248,11 @@ public class RemoteBlockReader2  impleme
 
   protected RemoteBlockReader2(String file, String bpid, long blockId,
       ReadableByteChannel in, DataChecksum checksum, boolean verifyChecksum,
-      long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock) {
+      long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock,
+      IOStreamPair ioStreams) {
     // Path is used only for printing block and file information in debug
     this.dnSock = dnSock;
+    this.ioStreams = ioStreams;
     this.in = in;
     this.checksum = checksum;
     this.verifyChecksum = verifyChecksum;
@@ -313,7 +271,7 @@ public class RemoteBlockReader2  impleme
 
   @Override
   public synchronized void close() throws IOException {
-    returnPacketBufToPool();
+    packetReceiver.close();
     
     startOffset = -1;
     checksum = null;
@@ -324,24 +282,6 @@ public class RemoteBlockReader2  impleme
     // in will be closed when its Socket is closed.
   }
   
-  @Override
-  protected void finalize() throws Throwable {
-    try {
-      // just in case it didn't get closed, we
-      // may as well still try to return the buffer
-      returnPacketBufToPool();
-    } finally {
-      super.finalize();
-    }
-  }
-  
-  private void returnPacketBufToPool() {
-    if (curPacketBuf != null) {
-      bufferPool.returnBuffer(curPacketBuf);
-      curPacketBuf = null;
-    }
-  }
-
   /**
    * Take the socket used to talk to the DN.
    */
@@ -369,24 +309,23 @@ public class RemoteBlockReader2  impleme
    * closing our connection (which we will re-open), but won't affect
    * data correctness.
    */
-  void sendReadResult(Socket sock, Status statusCode) {
-    assert !sentStatusCode : "already sent status code to " + sock;
+  void sendReadResult(Status statusCode) {
+    assert !sentStatusCode : "already sent status code to " + dnSock;
     try {
-      writeReadResult(sock, statusCode);
+      writeReadResult(ioStreams.out, statusCode);
       sentStatusCode = true;
     } catch (IOException e) {
       // It's ok not to be able to send this. But something is probably wrong.
       LOG.info("Could not send read status (" + statusCode + ") to datanode " +
-               sock.getInetAddress() + ": " + e.getMessage());
+               dnSock.getInetAddress() + ": " + e.getMessage());
     }
   }
 
   /**
    * Serialize the actual read result on the wire.
    */
-  static void writeReadResult(Socket sock, Status statusCode)
+  static void writeReadResult(OutputStream out, Status statusCode)
       throws IOException {
-    OutputStream out = NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT);
     
     ClientReadStatusProto.newBuilder()
       .setStatus(statusCode)
@@ -434,25 +373,32 @@ public class RemoteBlockReader2  impleme
    * @param clientName  Client name
    * @return New BlockReader instance, or null on error.
    */
-  public static BlockReader newBlockReader( Socket sock, String file,
+  public static BlockReader newBlockReader(Socket sock, String file,
                                      ExtendedBlock block,
                                      Token<BlockTokenIdentifier> blockToken,
                                      long startOffset, long len,
                                      int bufferSize, boolean verifyChecksum,
-                                     String clientName)
+                                     String clientName,
+                                     DataEncryptionKey encryptionKey,
+                                     IOStreamPair ioStreams)
                                      throws IOException {
+    
+    ReadableByteChannel ch;
+    if (ioStreams.in instanceof SocketInputWrapper) {
+      ch = ((SocketInputWrapper)ioStreams.in).getReadableByteChannel();
+    } else {
+      ch = (ReadableByteChannel) ioStreams.in;
+    }
+    
     // in and out will be closed when sock is closed (by the caller)
     final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
-          NetUtils.getOutputStream(sock,
-              HdfsServerConstants.WRITE_TIMEOUT)));
+          ioStreams.out));
     new Sender(out).readBlock(block, blockToken, clientName, startOffset, len);
 
     //
-    // Get bytes in block, set streams
+    // Get bytes in block
     //
-    SocketInputWrapper sin = NetUtils.getInputStream(sock);
-    ReadableByteChannel ch = sin.getReadableByteChannel();
-    DataInputStream in = new DataInputStream(sin);
+    DataInputStream in = new DataInputStream(ioStreams.in);
 
     BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
         vintPrefixed(in));
@@ -474,7 +420,8 @@ public class RemoteBlockReader2  impleme
     }
 
     return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(),
-        ch, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock);
+        ch, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock,
+        ioStreams);
   }
 
   static void checkSuccess(
@@ -498,4 +445,9 @@ public class RemoteBlockReader2  impleme
       }
     }
   }
+
+  @Override
+  public IOStreamPair getStreams() {
+    return ioStreams;
+  }
 }
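
Most of this refactor moves packet framing and buffering out of RemoteBlockReader2 and into the reusable PacketReceiver, so the reader now only consumes the header and data slice the receiver hands back. A simplified, JDK-only sketch of that read-loop shape (the Receiver and Header interfaces below are illustrative stand-ins for the Hadoop types):

    import java.nio.ByteBuffer;

    public class PacketReadLoopSketch {
      interface Header {
        int getDataLen();
      }
      interface Receiver {
        void receiveNextPacket();        // reads and buffers one framed packet
        Header getHeader();
        ByteBuffer getDataSlice();       // view over just the payload bytes
      }

      private final Receiver receiver;
      private ByteBuffer curDataSlice;
      private long bytesNeededToFinish;

      PacketReadLoopSketch(Receiver receiver, long bytesToRead) {
        this.receiver = receiver;
        this.bytesNeededToFinish = bytesToRead;
      }

      int read(byte[] buf, int off, int len) {
        // Refill only when the current slice is exhausted and more bytes remain.
        if (curDataSlice == null
            || (curDataSlice.remaining() == 0 && bytesNeededToFinish > 0)) {
          receiver.receiveNextPacket();
          curDataSlice = receiver.getDataSlice();
          bytesNeededToFinish -= receiver.getHeader().getDataLen();
        }
        if (curDataSlice.remaining() == 0) {
          return -1;                     // end of block
        }
        int n = Math.min(len, curDataSlice.remaining());
        curDataSlice.get(buf, off, n);
        return n;
      }
    }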

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java Fri Oct 19 02:25:55 2012
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdfs;
 
+import java.io.Closeable;
 import java.net.Socket;
 import java.net.SocketAddress;
 
@@ -25,53 +26,135 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 
+import java.io.IOException;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.LinkedListMultimap;
 import org.apache.commons.logging.Log;
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.StringUtils;
 
 /**
- * A cache of sockets.
+ * A cache of input stream sockets to Data Node.
  */
 class SocketCache {
-  static final Log LOG = LogFactory.getLog(SocketCache.class);
+  private static final Log LOG = LogFactory.getLog(SocketCache.class);
 
-  private final LinkedListMultimap<SocketAddress, Socket> multimap;
-  private final int capacity;
+  @InterfaceAudience.Private
+  static class SocketAndStreams implements Closeable {
+    public final Socket sock;
+    public final IOStreamPair ioStreams;
+    long createTime;
+    
+    public SocketAndStreams(Socket s, IOStreamPair ioStreams) {
+      this.sock = s;
+      this.ioStreams = ioStreams;
+      this.createTime = System.currentTimeMillis();
+    }
+    
+    @Override
+    public void close() {
+      if (ioStreams != null) { 
+        IOUtils.closeStream(ioStreams.in);
+        IOUtils.closeStream(ioStreams.out);
+      }
+      IOUtils.closeSocket(sock);
+    }
 
-  /**
-   * Create a SocketCache with the given capacity.
-   * @param capacity  Max cache size.
-   */
-  public SocketCache(int capacity) {
-    multimap = LinkedListMultimap.create();
-    this.capacity = capacity;
-    if (capacity <= 0) {
-      LOG.debug("SocketCache disabled in configuration.");
+    public long getCreateTime() {
+      return this.createTime;
     }
   }
 
+  private Daemon daemon;
+  /** A map for per user per datanode. */
+  private static LinkedListMultimap<SocketAddress, SocketAndStreams> multimap =
+    LinkedListMultimap.create();
+  private static int capacity;
+  private static long expiryPeriod;
+  private static SocketCache scInstance = new SocketCache();
+  private static boolean isInitedOnce = false;
+ 
+  public static synchronized SocketCache getInstance(int c, long e) {
+    // capacity is only initialized once
+    if (isInitedOnce == false) {
+      capacity = c;
+      expiryPeriod = e;
+
+      if (capacity == 0 ) {
+        LOG.info("SocketCache disabled.");
+      }
+      else if (expiryPeriod == 0) {
+        throw new IllegalStateException("Cannot initialize expiryPeriod to " +
+           expiryPeriod + " when cache is enabled.");
+      }
+      isInitedOnce = true;
+    } else { //already initialized once
+      if (capacity != c || expiryPeriod != e) {
+        LOG.info("capacity and expiry periods already set to " + capacity + 
+          " and " + expiryPeriod + " respectively. Cannot set it to " + c + 
+          " and " + e);
+      }
+    }
+
+    return scInstance;
+  }
+
+  private boolean isDaemonStarted() {
+    return (daemon == null)? false: true;
+  }
+
+  private synchronized void startExpiryDaemon() {
+    // start daemon only if not already started
+    if (isDaemonStarted() == true) {
+      return;
+    }
+    
+    daemon = new Daemon(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          SocketCache.this.run();
+        } catch(InterruptedException e) {
+          //noop
+        } finally {
+          SocketCache.this.clear();
+        }
+      }
+
+      @Override
+      public String toString() {
+        return String.valueOf(SocketCache.this);
+      }
+    });
+    daemon.start();
+  }
+
   /**
    * Get a cached socket to the given address.
    * @param remote  Remote address the socket is connected to.
    * @return  A socket with unknown state, possibly closed underneath. Or null.
    */
-  public synchronized Socket get(SocketAddress remote) {
+  public synchronized SocketAndStreams get(SocketAddress remote) {
+
     if (capacity <= 0) { // disabled
       return null;
     }
-    
-    List<Socket> socklist = multimap.get(remote);
-    if (socklist == null) {
+
+    List<SocketAndStreams> sockStreamList = multimap.get(remote);
+    if (sockStreamList == null) {
       return null;
     }
 
-    Iterator<Socket> iter = socklist.iterator();
+    Iterator<SocketAndStreams> iter = sockStreamList.iterator();
     while (iter.hasNext()) {
-      Socket candidate = iter.next();
+      SocketAndStreams candidate = iter.next();
       iter.remove();
-      if (!candidate.isClosed()) {
+      if (!candidate.sock.isClosed()) {
         return candidate;
       }
     }
@@ -82,14 +165,17 @@ class SocketCache {
    * Give an unused socket to the cache.
    * @param sock socket not used by anyone.
    */
-  public synchronized void put(Socket sock) {
+  public synchronized void put(Socket sock, IOStreamPair ioStreams) {
+
+    Preconditions.checkNotNull(sock);
+    SocketAndStreams s = new SocketAndStreams(sock, ioStreams);
     if (capacity <= 0) {
       // Cache disabled.
-      IOUtils.closeSocket(sock);
+      s.close();
       return;
     }
-    
-    Preconditions.checkNotNull(sock);
+ 
+    startExpiryDaemon();
 
     SocketAddress remoteAddr = sock.getRemoteSocketAddress();
     if (remoteAddr == null) {
@@ -102,7 +188,7 @@ class SocketCache {
     if (capacity == multimap.size()) {
       evictOldest();
     }
-    multimap.put(remoteAddr, sock);
+    multimap.put(remoteAddr, s);
   }
 
   public synchronized int size() {
@@ -110,32 +196,67 @@ class SocketCache {
   }
 
   /**
+   * Evict and close sockets older than expiry period from the cache.
+   */
+  private synchronized void evictExpired(long expiryPeriod) {
+    while (multimap.size() != 0) {
+      Iterator<Entry<SocketAddress, SocketAndStreams>> iter =
+        multimap.entries().iterator();
+      Entry<SocketAddress, SocketAndStreams> entry = iter.next();
+      // if oldest socket expired, remove it
+      if (entry == null || 
+        System.currentTimeMillis() - entry.getValue().getCreateTime() < 
+        expiryPeriod) {
+        break;
+      }
+      iter.remove();
+      SocketAndStreams s = entry.getValue();
+      s.close();
+    }
+  }
+
+  /**
    * Evict the oldest entry in the cache.
    */
   private synchronized void evictOldest() {
-    Iterator<Entry<SocketAddress, Socket>> iter =
+    Iterator<Entry<SocketAddress, SocketAndStreams>> iter =
       multimap.entries().iterator();
     if (!iter.hasNext()) {
-      throw new IllegalStateException("Cannot evict from empty cache!");
+      throw new IllegalStateException("Cannot evict from empty cache! " +
+        "capacity: " + capacity);
     }
-    Entry<SocketAddress, Socket> entry = iter.next();
+    Entry<SocketAddress, SocketAndStreams> entry = iter.next();
     iter.remove();
-    Socket sock = entry.getValue();
-    IOUtils.closeSocket(sock);
+    SocketAndStreams s = entry.getValue();
+    s.close();
   }
 
   /**
-   * Empty the cache, and close all sockets.
+   * Periodically check the cache and expire the entries
+   * older than expiryPeriod (milliseconds).
    */
-  public synchronized void clear() {
-    for (Socket sock : multimap.values()) {
-      IOUtils.closeSocket(sock);
+  private void run() throws InterruptedException {
+    for(long lastExpiryTime = System.currentTimeMillis();
+        !Thread.interrupted();
+        Thread.sleep(expiryPeriod)) {
+      final long elapsed = System.currentTimeMillis() - lastExpiryTime;
+      if (elapsed >= expiryPeriod) {
+        evictExpired(expiryPeriod);
+        lastExpiryTime = System.currentTimeMillis();
+      }
     }
-    multimap.clear();
+    clear();
+    throw new InterruptedException("Daemon Interrupted");
   }
 
-  protected void finalize() {
-    clear();
+  /**
+   * Empty the cache, and close all sockets.
+   */
+  private synchronized void clear() {
+    for (SocketAndStreams sockAndStream : multimap.values()) {
+      sockAndStream.close();
+    }
+    multimap.clear();
   }
 
 }
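
SocketCache is now a process-wide singleton whose capacity and expiry period are fixed the first time getInstance is called, and a Daemon thread periodically closes cached entries older than the expiry period. A compact, JDK-only sketch of that "configure once" pattern (names and the log message are illustrative, not the Hadoop code):

    // Minimal configure-once singleton; settings passed after the first call are ignored.
    public class ExpiringCacheSketch {
      private static final ExpiringCacheSketch INSTANCE = new ExpiringCacheSketch();
      private static int capacity;
      private static long expiryPeriodMs;
      private static boolean initialized = false;

      private ExpiringCacheSketch() {}

      public static synchronized ExpiringCacheSketch getInstance(int c, long expiryMs) {
        if (!initialized) {
          capacity = c;
          expiryPeriodMs = expiryMs;
          initialized = true;
        } else if (capacity != c || expiryPeriodMs != expiryMs) {
          // Mirror the behavior above: keep the original settings, just note the mismatch.
          System.err.println("cache already configured; keeping capacity=" + capacity
              + " and expiry=" + expiryPeriodMs + "ms");
        }
        return INSTANCE;
      }
    }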

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java Fri Oct 19 02:25:55 2012
@@ -40,6 +40,7 @@ public class Block implements Writable, 
     WritableFactories.setFactory
       (Block.class,
        new WritableFactory() {
+         @Override
          public Writable newInstance() { return new Block(); }
        });
   }
@@ -146,6 +147,7 @@ public class Block implements Writable, 
 
   /**
    */
+  @Override
   public String toString() {
     return getBlockName() + "_" + getGenerationStamp();
   }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java Fri Oct 19 02:25:55 2012
@@ -148,10 +148,12 @@ public class BlockListAsLongs implements
       this.currentReplicaState = null;
     }
 
+    @Override
     public boolean hasNext() {
       return currentBlockIndex < getNumberOfBlocks();
     }
 
+    @Override
     public Block next() {
       block.set(blockId(currentBlockIndex),
                 blockLength(currentBlockIndex),
@@ -161,6 +163,7 @@ public class BlockListAsLongs implements
       return block;
     }
 
+    @Override
     public void remove() {
       throw new UnsupportedOperationException("Sorry. can't remove.");
     }
@@ -178,6 +181,7 @@ public class BlockListAsLongs implements
   /**
    * Returns an iterator over blocks in the block report. 
    */
+  @Override
   public Iterator<Block> iterator() {
     return getBlockReportIterator();
   }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java Fri Oct 19 02:25:55 2012
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.protocol;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -106,4 +107,21 @@ public interface ClientDatanodeProtocol 
    */
   BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block,
       Token<BlockTokenIdentifier> token) throws IOException;
+  
+  /**
+   * Retrieves volume location information about a list of blocks on a datanode.
+   * This is in the form of an opaque {@link VolumeId} for each configured
+   * data directory, which is not guaranteed to be the same across DN restarts.
+   * 
+   * @param blocks
+   *          list of blocks on the local datanode
+   * @param tokens
+   *          block access tokens corresponding to the requested blocks
+   * @return an HdfsBlocksMetadata that associates {@link ExtendedBlock}s with
+   *         data directories
+   * @throws IOException
+   *           if datanode is unreachable, or replica is not found on datanode
+   */
+  HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks,
+      List<Token<BlockTokenIdentifier>> tokens) throws IOException; 
 }
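
The new getHdfsBlocksMetadata call takes a list of blocks plus the corresponding access tokens, i.e. one token per requested block. A hedged, standalone sketch of that calling convention (all types below are simple stand-ins for the Hadoop ExtendedBlock, Token and result classes):

    import java.util.List;

    public class BlockMetadataQuerySketch {
      static class Block { final long id; Block(long id) { this.id = id; } }
      static class AccessToken { final String value; AccessToken(String v) { this.value = v; } }
      static class VolumeInfo { /* opaque volume identifier in this sketch */ }

      interface DatanodeClient {
        // blocks.get(i) is authorized by tokens.get(i), mirroring the interface above.
        List<VolumeInfo> getBlocksMetadata(List<Block> blocks, List<AccessToken> tokens);
      }

      static List<VolumeInfo> query(DatanodeClient dn, List<Block> blocks,
          List<AccessToken> tokens) {
        if (blocks.size() != tokens.size()) {
          throw new IllegalArgumentException("one access token per block is required");
        }
        return dn.getBlocksMetadata(blocks, tokens);
      }
    }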

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Fri Oct 19 02:25:55 2012
@@ -33,8 +33,6 @@ import org.apache.hadoop.fs.UnresolvedLi
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction;
-import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.io.EnumSetWritable;
@@ -44,6 +42,7 @@ import org.apache.hadoop.security.Access
 import org.apache.hadoop.security.KerberosInfo;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenInfo;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
 
@@ -668,6 +667,18 @@ public interface ClientProtocol {
    */
   public void saveNamespace() throws AccessControlException, IOException;
 
+  
+  /**
+   * Roll the edit log.
+   * Requires superuser privileges.
+   * 
+   * @throws AccessControlException if the superuser privilege is violated
+   * @throws IOException if log roll fails
+   * @return the txid of the new segment
+   */
+  @Idempotent
+  public long rollEdits() throws AccessControlException, IOException;
+
   /**
    * Enable/Disable restore failed storage.
    * <p>
@@ -694,16 +705,6 @@ public interface ClientProtocol {
   public void finalizeUpgrade() throws IOException;
 
   /**
-   * Report distributed upgrade progress or force current upgrade to proceed.
-   * 
-   * @param action {@link HdfsConstants.UpgradeAction} to perform
-   * @return upgrade status information or null if no upgrades are in progress
-   * @throws IOException
-   */
-  public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action) 
-      throws IOException;
-
-  /**
    * @return CorruptFileBlocks, containing a list of corrupt files (with
    *         duplicates if there is more than one corrupt block in a file)
    *         and a cookie
@@ -941,4 +942,11 @@ public interface ClientProtocol {
    */
   public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
       throws IOException;
+  
+  /**
+   * @return encryption key so a client can encrypt data sent via the
+   *         DataTransferProtocol to/from DataNodes.
+   * @throws IOException
+   */
+  public DataEncryptionKey getDataEncryptionKey() throws IOException;
 }
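
Both additions are used straight off a ClientProtocol proxy. A hedged usage sketch for the new rollEdits() call, assuming a proxy has already been obtained elsewhere (for example through NameNodeProxies) and that the hadoop-hdfs classes are on the classpath:

    import java.io.IOException;
    import org.apache.hadoop.hdfs.protocol.ClientProtocol;

    public class RollEditsSketch {
      // Requires superuser privileges, per the javadoc above.
      static long rollAndReport(ClientProtocol namenode) throws IOException {
        long txid = namenode.rollEdits();
        System.out.println("new edit log segment starts at txid " + txid);
        return txid;
      }
    }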

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DSQuotaExceededException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DSQuotaExceededException.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DSQuotaExceededException.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DSQuotaExceededException.java Fri Oct 19 02:25:55 2012
@@ -37,6 +37,7 @@ public class DSQuotaExceededException ex
     super(quota, count);
   }
 
+  @Override
   public String getMessage() {
     String msg = super.getMessage();
     if (msg == null) {

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java Fri Oct 19 02:25:55 2012
@@ -37,12 +37,12 @@ import org.apache.hadoop.classification.
 public class DatanodeID implements Comparable<DatanodeID> {
   public static final DatanodeID[] EMPTY_ARRAY = {};
 
-  protected String ipAddr;     // IP address
-  protected String hostName;   // hostname
-  protected String storageID;  // unique per cluster storageID
-  protected int xferPort;      // data streaming port
-  protected int infoPort;      // info server port
-  protected int ipcPort;       // IPC server port
+  private String ipAddr;     // IP address
+  private String hostName;   // hostname
+  private String storageID;  // unique per cluster storageID
+  private int xferPort;      // data streaming port
+  private int infoPort;      // info server port
+  private int ipcPort;       // IPC server port
 
   public DatanodeID(DatanodeID from) {
     this(from.getIpAddr(),
@@ -104,7 +104,7 @@ public class DatanodeID implements Compa
   /**
    * @return IP:ipcPort string
    */
-  public String getIpcAddr() {
+  private String getIpcAddr() {
     return ipAddr + ":" + ipcPort;
   }
 
@@ -123,6 +123,29 @@ public class DatanodeID implements Compa
   }
 
   /**
+   * @return hostname:ipcPort
+   */
+  private String getIpcAddrWithHostname() {
+    return hostName + ":" + ipcPort;
+  }
+
+  /**
+   * @param useHostname true to use the DN hostname, use the IP otherwise
+   * @return name:xferPort
+   */
+  public String getXferAddr(boolean useHostname) {
+    return useHostname ? getXferAddrWithHostname() : getXferAddr();
+  }
+
+  /**
+   * @param useHostname true to use the DN hostname, use the IP otherwise
+   * @return name:ipcPort
+   */
+  public String getIpcAddr(boolean useHostname) {
+    return useHostname ? getIpcAddrWithHostname() : getIpcAddr();
+  }
+
+  /**
    * @return data storage ID.
    */
   public String getStorageID() {
@@ -150,6 +173,7 @@ public class DatanodeID implements Compa
     return ipcPort;
   }
 
+  @Override
   public boolean equals(Object to) {
     if (this == to) {
       return true;
@@ -161,10 +185,12 @@ public class DatanodeID implements Compa
             storageID.equals(((DatanodeID)to).getStorageID()));
   }
   
+  @Override
   public int hashCode() {
     return getXferAddr().hashCode()^ storageID.hashCode();
   }
   
+  @Override
   public String toString() {
     return getXferAddr();
   }
@@ -187,6 +213,7 @@ public class DatanodeID implements Compa
    * @param that
    * @return as specified by Comparable
    */
+  @Override
   public int compareTo(DatanodeID that) {
     return getXferAddr().compareTo(that.getXferAddr());
   }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java Fri Oct 19 02:25:55 2012
@@ -27,6 +27,7 @@ import org.apache.hadoop.net.NetworkTopo
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
 
 /** 
  * This class extends the primary identifier of a Datanode with ephemeral
@@ -36,13 +37,13 @@ import org.apache.hadoop.util.StringUtil
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class DatanodeInfo extends DatanodeID implements Node {
-  protected long capacity;
-  protected long dfsUsed;
-  protected long remaining;
-  protected long blockPoolUsed;
-  protected long lastUpdate;
-  protected int xceiverCount;
-  protected String location = NetworkTopology.DEFAULT_RACK;
+  private long capacity;
+  private long dfsUsed;
+  private long remaining;
+  private long blockPoolUsed;
+  private long lastUpdate;
+  private int xceiverCount;
+  private String location = NetworkTopology.DEFAULT_RACK;
   
   // Datanode administrative states
   public enum AdminStates {
@@ -56,6 +57,7 @@ public class DatanodeInfo extends Datano
       this.value = v;
     }
 
+    @Override
     public String toString() {
       return value;
     }
@@ -79,8 +81,7 @@ public class DatanodeInfo extends Datano
     this.lastUpdate = from.getLastUpdate();
     this.xceiverCount = from.getXceiverCount();
     this.location = from.getNetworkLocation();
-    this.adminState = from.adminState;
-    this.hostName = from.hostName;
+    this.adminState = from.getAdminState();
   }
 
   public DatanodeInfo(DatanodeID nodeID) {
@@ -126,6 +127,7 @@ public class DatanodeInfo extends Datano
   }
   
   /** Network location name */
+  @Override
   public String getName() {
     return getXferAddr();
   }
@@ -200,9 +202,11 @@ public class DatanodeInfo extends Datano
   }
 
   /** network location */
+  @Override
   public synchronized String getNetworkLocation() {return location;}
     
   /** Sets the network location */
+  @Override
   public synchronized void setNetworkLocation(String location) {
     this.location = NodeBase.normalize(location);
   }
@@ -317,7 +321,24 @@ public class DatanodeInfo extends Datano
     }
     return adminState;
   }
-
+ 
+  /**
+   * Check if the datanode is in stale state. Here if 
+   * the namenode has not received heartbeat msg from a 
+   * datanode for more than staleInterval (default value is
+   * {@link DFSConfigKeys#DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT}),
+   * the datanode will be treated as stale node.
+   * 
+   * @param staleInterval
+   *          the time interval for marking the node as stale. If the last
+   *          update time is beyond the given time interval, the node will be
+   *          marked as stale.
+   * @return true if the node is stale
+   */
+  public boolean isStale(long staleInterval) {
+    return (Time.now() - lastUpdate) >= staleInterval;
+  }
+  
   /**
    * Sets the admin state of this node.
    */
@@ -334,13 +355,17 @@ public class DatanodeInfo extends Datano
   private transient Node parent; //its parent
 
   /** Return this node's parent */
+  @Override
   public Node getParent() { return parent; }
+  @Override
   public void setParent(Node parent) {this.parent = parent;}
    
   /** Return this node's level in the tree.
    * E.g. the root of a tree returns 0 and its children return 1
    */
+  @Override
   public int getLevel() { return level; }
+  @Override
   public void setLevel(int level) {this.level = level;}
 
   @Override
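
The new isStale(long) check compares the node's last heartbeat time against a caller-supplied interval. A hedged usage sketch, assuming the hadoop-hdfs classes above are on the classpath:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

    public class StaleFilterSketch {
      // Keep only datanodes whose last heartbeat is within staleIntervalMs.
      static List<DatanodeInfo> freshNodes(List<DatanodeInfo> nodes, long staleIntervalMs) {
        List<DatanodeInfo> fresh = new ArrayList<DatanodeInfo>();
        for (DatanodeInfo dn : nodes) {
          if (!dn.isStale(staleIntervalMs)) {
            fresh.add(dn);
          }
        }
        return fresh;
      }
    }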

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java Fri Oct 19 02:25:55 2012
@@ -60,7 +60,7 @@ public class HdfsConstants {
   public static int MAX_PATH_LENGTH = 8000;
   public static int MAX_PATH_DEPTH = 1000;
 
-  // TODO mb@media-style.com: should be conf injected?
+  // TODO should be conf injected?
   public static final int DEFAULT_DATA_SOCKET_SIZE = 128 * 1024;
   public static final int IO_FILE_BUFFER_SIZE = new HdfsConfiguration().getInt(
       DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY,
@@ -85,16 +85,6 @@ public class HdfsConstants {
   public static final long INVALID_TXID = -12345;
 
   /**
-   * Distributed upgrade actions:
-   * 
-   * 1. Get upgrade status. 2. Get detailed upgrade status. 3. Proceed with the
-   * upgrade if it is stuck, no matter what the status is.
-   */
-  public static enum UpgradeAction {
-    GET_STATUS, DETAILED_STATUS, FORCE_PROCEED;
-  }
-
-  /**
    * URI Scheme for hdfs://namenode/ URIs.
    */
   public static final String HDFS_URI_SCHEME = "hdfs";

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java Fri Oct 19 02:25:55 2012
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.util.ExactSizeInputStream;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.security.token.Token;
 
 import com.google.common.collect.Lists;
@@ -155,6 +156,14 @@ public abstract class HdfsProtoUtil {
     return ret;
   }
 
+  public static DataChecksum.Type fromProto(HdfsProtos.ChecksumTypeProto type) {
+    return DataChecksum.Type.valueOf(type.name());
+  }
+
+  public static HdfsProtos.ChecksumTypeProto toProto(DataChecksum.Type type) {
+    return HdfsProtos.ChecksumTypeProto.valueOf(type.name());
+  }
+
   public static InputStream vintPrefixed(final InputStream input)
   throws IOException {
     final int firstByte = input.read();
@@ -167,4 +176,4 @@ public abstract class HdfsProtoUtil {
   
     return new ExactSizeInputStream(input, size);
   }
-}
\ No newline at end of file
+}

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java Fri Oct 19 02:25:55 2012
@@ -113,6 +113,7 @@ public class LocatedBlocks {
     Comparator<LocatedBlock> comp = 
       new Comparator<LocatedBlock>() {
         // Returns 0 iff a is inside b or b is inside a
+        @Override
         public int compare(LocatedBlock a, LocatedBlock b) {
           long aBeg = a.getStartOffset();
           long bBeg = b.getStartOffset();

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/NSQuotaExceededException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/NSQuotaExceededException.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/NSQuotaExceededException.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/NSQuotaExceededException.java Fri Oct 19 02:25:55 2012
@@ -36,6 +36,7 @@ public final class NSQuotaExceededExcept
     super(quota, count);
   }
 
+  @Override
   public String getMessage() {
     String msg = super.getMessage();
     if (msg == null) {

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/QuotaExceededException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/QuotaExceededException.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/QuotaExceededException.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/QuotaExceededException.java Fri Oct 19 02:25:55 2012
@@ -58,6 +58,7 @@ public class QuotaExceededException exte
     this.pathName = path;
   }
   
+  @Override
   public String getMessage() {
     return super.getMessage();
   }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java Fri Oct 19 02:25:55 2012
@@ -24,16 +24,13 @@ import org.apache.hadoop.hdfs.protocol.E
 import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto.ChecksumType;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ChecksumTypeProto;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
-import com.google.common.collect.BiMap;
-import com.google.common.collect.ImmutableBiMap;
-
 
 /**
  * Static utilities for dealing with the protocol buffers used by the
@@ -42,19 +39,6 @@ import com.google.common.collect.Immutab
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public abstract class DataTransferProtoUtil {
-
-  /**
-   * Map between the internal DataChecksum identifiers and the protobuf-
-   * generated identifiers on the wire.
-   */
-  static BiMap<Integer, ChecksumProto.ChecksumType> checksumTypeMap =
-    ImmutableBiMap.<Integer, ChecksumProto.ChecksumType>builder()
-      .put(DataChecksum.CHECKSUM_CRC32, ChecksumProto.ChecksumType.CRC32)
-      .put(DataChecksum.CHECKSUM_CRC32C, ChecksumProto.ChecksumType.CRC32C)
-      .put(DataChecksum.CHECKSUM_NULL, ChecksumProto.ChecksumType.NULL)
-      .build();
-
-  
   static BlockConstructionStage fromProto(
       OpWriteBlockProto.BlockConstructionStage stage) {
     return BlockConstructionStage.valueOf(BlockConstructionStage.class,
@@ -68,7 +52,7 @@ public abstract class DataTransferProtoU
   }
 
   public static ChecksumProto toProto(DataChecksum checksum) {
-    ChecksumType type = checksumTypeMap.get(checksum.getChecksumType());
+    ChecksumTypeProto type = ChecksumTypeProto.valueOf(checksum.getChecksumType().name());
     if (type == null) {
       throw new IllegalArgumentException(
           "Can't convert checksum to protobuf: " + checksum);
@@ -84,7 +68,7 @@ public abstract class DataTransferProtoU
     if (proto == null) return null;
 
     int bytesPerChecksum = proto.getBytesPerChecksum();
-    int type = checksumTypeMap.inverse().get(proto.getType());
+    DataChecksum.Type type = DataChecksum.Type.valueOf(proto.getType().name());
     
     return DataChecksum.newDataChecksum(type, bytesPerChecksum);
   }
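
This hunk drops the hand-maintained Guava BiMap between DataChecksum's old
integer constants and the wire enum in favour of the same name-based
valueOf() conversion, now that DataChecksum exposes a Type enum. A hedged
usage sketch, assuming this branch's hadoop-common is on the classpath, of
what fromProto() ultimately hands back to callers:

    import org.apache.hadoop.util.DataChecksum;

    public class ChecksumUsageSketch {
      public static void main(String[] args) {
        // Build a checksum from the decoded type and bytesPerChecksum, as the
        // last line of fromProto() does above.
        DataChecksum sum =
            DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
        byte[] chunk = new byte[512];
        sum.update(chunk, 0, chunk.length);   // DataChecksum implements java.util.zip.Checksum
        System.out.println("crc=" + sum.getValue()
            + " checksumSize=" + sum.getChecksumSize());
      }
    }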

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java Fri Oct 19 02:25:55 2012
@@ -27,14 +27,31 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PacketHeaderProto;
 import org.apache.hadoop.hdfs.util.ByteBufferOutputStream;
 
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Shorts;
+import com.google.common.primitives.Ints;
+import com.google.protobuf.InvalidProtocolBufferException;
+
 /**
  * Header data for each packet that goes through the read/write pipelines.
+ * Includes all of the information about the packet, excluding checksums and
+ * actual data.
+ * 
+ * This data includes:
+ *  - the offset in bytes into the HDFS block of the data in this packet
+ *  - the sequence number of this packet in the pipeline
+ *  - whether or not this is the last packet in the pipeline
+ *  - the length of the data in this packet
+ *  - whether or not this packet should be synced by the DNs.
+ *  
+ * When serialized, this header is written out as a protocol buffer, preceded
+ * by a 4-byte integer representing the full packet length, and a 2-byte short
+ * representing the header length.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class PacketHeader {
-  /** Header size for a packet */
-  private static final int PROTO_SIZE = 
+  private static final int MAX_PROTO_SIZE = 
     PacketHeaderProto.newBuilder()
       .setOffsetInBlock(0)
       .setSeqno(0)
@@ -42,8 +59,10 @@ public class PacketHeader {
       .setDataLen(0)
       .setSyncBlock(false)
       .build().getSerializedSize();
-  public static final int PKT_HEADER_LEN =
-    6 + PROTO_SIZE;
+  public static final int PKT_LENGTHS_LEN =
+      Ints.BYTES + Shorts.BYTES;
+  public static final int PKT_MAX_HEADER_LEN =
+      PKT_LENGTHS_LEN + MAX_PROTO_SIZE;
 
   private int packetLen;
   private PacketHeaderProto proto;
@@ -54,13 +73,25 @@ public class PacketHeader {
   public PacketHeader(int packetLen, long offsetInBlock, long seqno,
                       boolean lastPacketInBlock, int dataLen, boolean syncBlock) {
     this.packetLen = packetLen;
-    proto = PacketHeaderProto.newBuilder()
+    Preconditions.checkArgument(packetLen >= Ints.BYTES,
+        "packet len %s should always be at least 4 bytes",
+        packetLen);
+    
+    PacketHeaderProto.Builder builder = PacketHeaderProto.newBuilder()
       .setOffsetInBlock(offsetInBlock)
       .setSeqno(seqno)
       .setLastPacketInBlock(lastPacketInBlock)
-      .setDataLen(dataLen)
-      .setSyncBlock(syncBlock)
-      .build();
+      .setDataLen(dataLen);
+      
+    if (syncBlock) {
+      // Only set syncBlock if it is specified.
+      // This is wire-incompatible with Hadoop 2.0.0-alpha due to HDFS-3721
+      // because it changes the length of the packet header, and BlockReceiver
+      // in that version did not support variable-length headers.
+      builder.setSyncBlock(syncBlock);
+    }
+      
+    proto = builder.build();
   }
 
   public int getDataLen() {
@@ -90,10 +121,16 @@ public class PacketHeader {
   @Override
   public String toString() {
     return "PacketHeader with packetLen=" + packetLen +
-      "Header data: " + 
+      " header data: " + 
       proto.toString();
   }
   
+  public void setFieldsFromData(
+      int packetLen, byte[] headerData) throws InvalidProtocolBufferException {
+    this.packetLen = packetLen;
+    proto = PacketHeaderProto.parseFrom(headerData);
+  }
+  
   public void readFields(ByteBuffer buf) throws IOException {
     packetLen = buf.getInt();
     short protoLen = buf.getShort();
@@ -110,14 +147,21 @@ public class PacketHeader {
     proto = PacketHeaderProto.parseFrom(data);
   }
 
+  /**
+   * @return the number of bytes necessary to write out this header,
+   * including the length-prefixing of the payload and header
+   */
+  public int getSerializedSize() {
+    return PKT_LENGTHS_LEN + proto.getSerializedSize();
+  }
 
   /**
    * Write the header into the buffer.
    * This requires that PKT_HEADER_LEN bytes are available.
    */
   public void putInBuffer(final ByteBuffer buf) {
-    assert proto.getSerializedSize() == PROTO_SIZE
-      : "Expected " + (PROTO_SIZE) + " got: " + proto.getSerializedSize();
+    assert proto.getSerializedSize() <= MAX_PROTO_SIZE
+      : "Expected " + (MAX_PROTO_SIZE) + " got: " + proto.getSerializedSize();
     try {
       buf.putInt(packetLen);
       buf.putShort((short) proto.getSerializedSize());
@@ -128,12 +172,18 @@ public class PacketHeader {
   }
   
   public void write(DataOutputStream out) throws IOException {
-    assert proto.getSerializedSize() == PROTO_SIZE
-    : "Expected " + (PROTO_SIZE) + " got: " + proto.getSerializedSize();
+    assert proto.getSerializedSize() <= MAX_PROTO_SIZE
+    : "Expected " + (MAX_PROTO_SIZE) + " got: " + proto.getSerializedSize();
     out.writeInt(packetLen);
     out.writeShort(proto.getSerializedSize());
     proto.writeTo(out);
   }
+  
+  public byte[] getBytes() {
+    ByteBuffer buf = ByteBuffer.allocate(getSerializedSize());
+    putInBuffer(buf);
+    return buf.array();
+  }
 
   /**
    * Perform a sanity check on the packet, returning true if it is sane.
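
With syncBlock written only when it is true, the serialized header is now
variable length, so the fixed PKT_HEADER_LEN gives way to PKT_MAX_HEADER_LEN
plus a per-instance getSerializedSize(). The framing itself is unchanged: a
4-byte packet length, a 2-byte header length, then the protobuf bytes. A
self-contained sketch of that framing, with a plain byte[] standing in for
the serialized PacketHeaderProto:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class PacketFramingSketch {
      static byte[] frame(int packetLen, byte[] headerProtoBytes) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeInt(packetLen);                          // Ints.BYTES
        out.writeShort((short) headerProtoBytes.length);  // Shorts.BYTES
        out.write(headerProtoBytes);                      // variable-length header
        out.flush();
        return bos.toByteArray();
      }

      static void readFrame(byte[] framed) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(framed));
        int packetLen = in.readInt();
        short headerLen = in.readShort();
        byte[] header = new byte[headerLen];
        in.readFully(header);   // the real code parses these bytes with parseFrom()
        System.out.println("packetLen=" + packetLen + " headerLen=" + headerLen);
      }

      public static void main(String[] args) throws IOException {
        byte[] fakeHeaderProto = {1, 2, 3, 4, 5};         // stand-in proto bytes
        readFrame(frame(4 + 1024, fakeHeaderProto));
      }
    }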

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java Fri Oct 19 02:25:55 2012
@@ -30,6 +30,8 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 
+import com.google.protobuf.TextFormat;
+
 /** Pipeline Acknowledgment **/
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
@@ -120,6 +122,6 @@ public class PipelineAck {
   
   @Override //Object
   public String toString() {
-    return proto.toString();
+    return TextFormat.shortDebugString(proto);
   }
 }
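
TextFormat.shortDebugString() renders a message as a single "field: value"
line, so the ack's toString() stays compact in log output instead of spanning
multiple lines the way Message.toString() does. A minimal helper sketch,
assuming protobuf-java on the classpath:

    import com.google.protobuf.Message;
    import com.google.protobuf.TextFormat;

    public class CompactProtoToString {
      // One-line rendering, suitable for embedding in log messages.
      static String describe(Message proto) {
        return TextFormat.shortDebugString(proto);
      }
    }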

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java Fri Oct 19 02:25:55 2012
@@ -38,10 +38,10 @@ import org.apache.hadoop.hdfs.protocol.p
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public abstract class Receiver implements DataTransferProtocol {
-  protected final DataInputStream in;
-
-  /** Create a receiver for DataTransferProtocol with a socket. */
-  protected Receiver(final DataInputStream in) {
+  protected DataInputStream in;
+  
+  /** Initialize a receiver for DataTransferProtocol with a socket. */
+  protected void initialize(final DataInputStream in) {
     this.in = in;
   }
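
Turning the constructor into an initialize() call lets a subclass finish
building, and possibly wrapping, its input stream before handing it to the
base class, rather than having to produce the final stream inside a
super(...) call. A minimal sketch of the pattern with hypothetical class
names (StreamConsumer and ExampleConsumer are illustrations, not HDFS
classes):

    import java.io.BufferedInputStream;
    import java.io.DataInputStream;
    import java.io.IOException;
    import java.net.Socket;

    abstract class StreamConsumer {
      protected DataInputStream in;

      // Deferred injection of the stream, mirroring Receiver.initialize().
      protected void initialize(DataInputStream in) {
        this.in = in;
      }
    }

    class ExampleConsumer extends StreamConsumer {
      void start(Socket s) throws IOException {
        // The subclass can buffer or wrap the raw socket stream first, then
        // hand the finished stream to the base class.
        DataInputStream wrapped =
            new DataInputStream(new BufferedInputStream(s.getInputStream()));
        initialize(wrapped);
      }
    }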
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java Fri Oct 19 02:25:55 2012
@@ -18,19 +18,31 @@
 package org.apache.hadoop.hdfs.protocolPB;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto.Builder;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenIdentifierProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
 
+import com.google.protobuf.ByteString;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 
@@ -106,4 +118,38 @@ public class ClientDatanodeProtocolServe
         .setLocalPath(resp.getBlockPath()).setLocalMetaPath(resp.getMetaPath())
         .build();
   }
+
+  @Override
+  public GetHdfsBlockLocationsResponseProto getHdfsBlockLocations(
+      RpcController controller, GetHdfsBlockLocationsRequestProto request)
+      throws ServiceException {
+    HdfsBlocksMetadata resp;
+    try {
+      // Construct the Lists to make the actual call
+      List<ExtendedBlock> blocks = 
+          new ArrayList<ExtendedBlock>(request.getBlocksCount());
+      for (ExtendedBlockProto b : request.getBlocksList()) {
+        blocks.add(PBHelper.convert(b));
+      }
+      List<Token<BlockTokenIdentifier>> tokens = 
+          new ArrayList<Token<BlockTokenIdentifier>>(request.getTokensCount());
+      for (BlockTokenIdentifierProto b : request.getTokensList()) {
+        tokens.add(PBHelper.convert(b));
+      }
+      // Call the real implementation
+      resp = impl.getHdfsBlocksMetadata(blocks, tokens);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    List<ByteString> volumeIdsByteStrings = 
+        new ArrayList<ByteString>(resp.getVolumeIds().size());
+    for (byte[] b : resp.getVolumeIds()) {
+      volumeIdsByteStrings.add(ByteString.copyFrom(b));
+    }
+    // Build and return the response
+    Builder builder = GetHdfsBlockLocationsResponseProto.newBuilder();
+    builder.addAllVolumeIds(volumeIdsByteStrings);
+    builder.addAllVolumeIndexes(resp.getVolumeIndexes());
+    return builder.build();
+  }
 }
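
The new server-side getHdfsBlockLocations() follows the usual translator
shape: convert each request proto into its domain object, call the real
implementation, and rethrow any IOException as a ServiceException for the
RPC layer. A standalone sketch of that shape with hypothetical stand-in
types (BlockMsg, Block and DatanodeFacade are illustrations; the real code
goes through PBHelper.convert() and impl.getHdfsBlocksMetadata()):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import com.google.protobuf.ServiceException;

    public class ServerTranslatorSketch {
      static class BlockMsg { final long id; BlockMsg(long id) { this.id = id; } }
      static class Block    { final long id; Block(long id)    { this.id = id; } }

      interface DatanodeFacade {
        List<String> getVolumeInfo(List<Block> blocks) throws IOException;
      }

      static List<String> handle(DatanodeFacade impl, List<BlockMsg> request)
          throws ServiceException {
        // Proto messages -> domain objects (PBHelper.convert in the real code).
        List<Block> blocks = new ArrayList<Block>(request.size());
        for (BlockMsg m : request) {
          blocks.add(new Block(m.id));
        }
        // Call the implementation and rewrap the checked IOException.
        try {
          return impl.getVolumeInfo(blocks);
        } catch (IOException e) {
          throw new ServiceException(e);
        }
      }
    }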

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java Fri Oct 19 02:25:55 2012
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.protocolP
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
 
 import javax.net.SocketFactory;
 
@@ -33,12 +35,17 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenIdentifierProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -50,6 +57,7 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 
+import com.google.protobuf.ByteString;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 
@@ -73,10 +81,10 @@ public class ClientDatanodeProtocolTrans
       RefreshNamenodesRequestProto.newBuilder().build();
 
   public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid,
-      Configuration conf, int socketTimeout, LocatedBlock locatedBlock)
-      throws IOException {
+      Configuration conf, int socketTimeout, boolean connectToDnViaHostname,
+      LocatedBlock locatedBlock) throws IOException {
     rpcProxy = createClientDatanodeProtocolProxy( datanodeid, conf, 
-                  socketTimeout, locatedBlock);
+                  socketTimeout, connectToDnViaHostname, locatedBlock);
   }
   
   public ClientDatanodeProtocolTranslatorPB(InetSocketAddress addr,
@@ -90,11 +98,17 @@ public class ClientDatanodeProtocolTrans
    * @param datanodeid Datanode to connect to.
    * @param conf Configuration.
    * @param socketTimeout Socket timeout to use.
+   * @param connectToDnViaHostname connect to the Datanode using its hostname
    * @throws IOException
    */
   public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid,
-      Configuration conf, int socketTimeout) throws IOException {
-    InetSocketAddress addr = NetUtils.createSocketAddr(datanodeid.getIpcAddr());
+      Configuration conf, int socketTimeout, boolean connectToDnViaHostname)
+      throws IOException {
+    final String dnAddr = datanodeid.getIpcAddr(connectToDnViaHostname);
+    InetSocketAddress addr = NetUtils.createSocketAddr(dnAddr);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Connecting to datanode " + dnAddr + " addr=" + addr);
+    }
     rpcProxy = createClientDatanodeProtocolProxy(addr,
         UserGroupInformation.getCurrentUser(), conf,
         NetUtils.getDefaultSocketFactory(conf), socketTimeout);
@@ -102,10 +116,11 @@ public class ClientDatanodeProtocolTrans
 
   static ClientDatanodeProtocolPB createClientDatanodeProtocolProxy(
       DatanodeID datanodeid, Configuration conf, int socketTimeout,
-      LocatedBlock locatedBlock) throws IOException {
-    InetSocketAddress addr = NetUtils.createSocketAddr(datanodeid.getIpcAddr());
+      boolean connectToDnViaHostname, LocatedBlock locatedBlock) throws IOException {
+    final String dnAddr = datanodeid.getIpcAddr(connectToDnViaHostname);
+    InetSocketAddress addr = NetUtils.createSocketAddr(dnAddr);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("ClientDatanodeProtocol addr=" + addr);
+      LOG.debug("Connecting to datanode " + dnAddr + " addr=" + addr);
     }
     
     // Since we're creating a new UserGroupInformation here, we know that no
@@ -200,4 +215,44 @@ public class ClientDatanodeProtocolTrans
   public Object getUnderlyingProxyObject() {
     return rpcProxy;
   }
-}
\ No newline at end of file
+
+  @Override
+  public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks,
+      List<Token<BlockTokenIdentifier>> tokens) throws IOException {
+    // Convert to proto objects
+    List<ExtendedBlockProto> blocksProtos = 
+        new ArrayList<ExtendedBlockProto>(blocks.size());
+    List<BlockTokenIdentifierProto> tokensProtos = 
+        new ArrayList<BlockTokenIdentifierProto>(tokens.size());
+    for (ExtendedBlock b : blocks) {
+      blocksProtos.add(PBHelper.convert(b));
+    }
+    for (Token<BlockTokenIdentifier> t : tokens) {
+      tokensProtos.add(PBHelper.convert(t));
+    }
+    // Build the request
+    GetHdfsBlockLocationsRequestProto request = 
+        GetHdfsBlockLocationsRequestProto.newBuilder()
+        .addAllBlocks(blocksProtos)
+        .addAllTokens(tokensProtos)
+        .build();
+    // Send the RPC
+    GetHdfsBlockLocationsResponseProto response;
+    try {
+      response = rpcProxy.getHdfsBlockLocations(NULL_CONTROLLER, request);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+    // List of volumes in the response
+    List<ByteString> volumeIdsByteStrings = response.getVolumeIdsList();
+    List<byte[]> volumeIds = new ArrayList<byte[]>(volumeIdsByteStrings.size());
+    for (ByteString bs : volumeIdsByteStrings) {
+      volumeIds.add(bs.toByteArray());
+    }
+    // Array of indexes into the list of volumes, one per block
+    List<Integer> volumeIndexes = response.getVolumeIndexesList();
+    // Parsed HdfsVolumeId values, one per block
+    return new HdfsBlocksMetadata(blocks.toArray(new ExtendedBlock[] {}), 
+        volumeIds, volumeIndexes);
+  }
+}
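
On the client side, the volume ids cross the wire as protobuf ByteStrings and
are unpacked back into one byte[] per volume, alongside a parallel list of
per-block indexes into that volume list. A small sketch of just the
ByteString conversions used above, assuming protobuf-java on the classpath:

    import java.util.ArrayList;
    import java.util.List;

    import com.google.protobuf.ByteString;

    public class VolumeIdConversionSketch {
      // byte[] -> ByteString, as done when the datanode builds the response.
      static List<ByteString> toByteStrings(List<byte[]> volumeIds) {
        List<ByteString> out = new ArrayList<ByteString>(volumeIds.size());
        for (byte[] id : volumeIds) {
          out.add(ByteString.copyFrom(id));
        }
        return out;
      }

      // ByteString -> byte[], as done when the client unpacks the response.
      static List<byte[]> fromByteStrings(List<ByteString> wireIds) {
        List<byte[]> out = new ArrayList<byte[]>(wireIds.size());
        for (ByteString bs : wireIds) {
          out.add(bs.toByteArray());
        }
        return out;
      }
    }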

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java Fri Oct 19 02:25:55 2012
@@ -50,14 +50,14 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateSymlinkResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DeleteRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DeleteResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DistributedUpgradeProgressRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DistributedUpgradeProgressResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FinalizeUpgradeRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FinalizeUpgradeResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto.Builder;
@@ -103,6 +103,8 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ReportBadBlocksResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RestoreFailedStorageRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RestoreFailedStorageResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollEditsRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollEditsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SaveNamespaceRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SaveNamespaceResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetBalancerBandwidthRequestProto;
@@ -127,7 +129,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
-import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.io.Text;
 
 import com.google.protobuf.RpcController;
@@ -537,6 +539,20 @@ public class ClientNamenodeProtocolServe
     }
 
   }
+  
+  @Override
+  public RollEditsResponseProto rollEdits(RpcController controller,
+      RollEditsRequestProto request) throws ServiceException {
+    try {
+      long txid = server.rollEdits();
+      return RollEditsResponseProto.newBuilder()
+          .setNewSegmentTxId(txid)
+          .build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
 
   static final RefreshNodesResponseProto VOID_REFRESHNODES_RESPONSE = 
       RefreshNodesResponseProto.newBuilder().build();
@@ -568,24 +584,6 @@ public class ClientNamenodeProtocolServe
   }
 
   @Override
-  public DistributedUpgradeProgressResponseProto distributedUpgradeProgress(
-      RpcController controller, DistributedUpgradeProgressRequestProto req)
-      throws ServiceException {
-    try {
-      UpgradeStatusReport result = server.distributedUpgradeProgress(PBHelper
-          .convert(req.getAction()));
-      DistributedUpgradeProgressResponseProto.Builder builder = 
-          DistributedUpgradeProgressResponseProto.newBuilder();
-      if (result != null) {
-        builder.setReport(PBHelper.convert(result));
-      }
-      return builder.build();
-    } catch (IOException e) {
-      throw new ServiceException(e);
-    }
-  }
-
-  @Override
   public ListCorruptFileBlocksResponseProto listCorruptFileBlocks(
       RpcController controller, ListCorruptFileBlocksRequestProto req)
       throws ServiceException {
@@ -830,4 +828,18 @@ public class ClientNamenodeProtocolServe
       throw new ServiceException(e);
     }
   }
+
+  @Override
+  public GetDataEncryptionKeyResponseProto getDataEncryptionKey(
+      RpcController controller, GetDataEncryptionKeyRequestProto request)
+      throws ServiceException {
+    try {
+      DataEncryptionKey encryptionKey = server.getDataEncryptionKey();
+      return GetDataEncryptionKeyResponseProto.newBuilder()
+          .setDataEncryptionKey(PBHelper.convert(encryptionKey))
+          .build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
 }