Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2012/08/10 00:30:33 UTC

svn commit: r1371518 [3/6] - in /hadoop/common/branches/HDFS-3077/hadoop-hdfs-project: hadoop-hdfs-httpfs/dev-support/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/...

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Thu Aug  9 22:29:36 2012
@@ -24,11 +24,12 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InterruptedIOException;
+import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.nio.BufferOverflowException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumSet;
@@ -56,6 +57,9 @@ import org.apache.hadoop.hdfs.protocol.N
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@@ -121,7 +125,7 @@ public class DFSOutputStream extends FSO
   private long lastQueuedSeqno = -1;
   private long lastAckedSeqno = -1;
   private long bytesCurBlock = 0; // bytes written in current block
-  private int packetSize = 0; // write packet size, including the header.
+  private int packetSize = 0; // write packet size, not including the header.
   private int chunksPerPacket = 0;
   private volatile IOException lastException = null;
   private long artificialSlowdown = 0;
@@ -142,28 +146,31 @@ public class DFSOutputStream extends FSO
     int     numChunks;           // number of chunks currently in packet
     int     maxChunks;           // max chunks in packet
 
-    /** buffer for accumulating packet checksum and data */
-    ByteBuffer buffer; // wraps buf, only one of these two may be non-null
     byte[]  buf;
 
     /**
      * buf is pointed into like follows:
      *  (C is checksum data, D is payload data)
      *
-     * [HHHHHCCCCC________________DDDDDDDDDDDDDDDD___]
-     *       ^    ^               ^               ^
-     *       |    checksumPos     dataStart       dataPos
-     *   checksumStart
+     * [_________CCCCCCCCC________________DDDDDDDDDDDDDDDD___]
+     *           ^        ^               ^               ^
+     *           |        checksumPos     dataStart       dataPos
+     *           checksumStart
+     * 
+     * Right before sending, we move the checksum data to immediately precede
+     * the actual data, and then insert the header into the buffer immediately
+     * preceding the checksum data, so we make sure to keep enough space in
+     * front of the checksum data to support the largest conceivable header. 
      */
     int checksumStart;
+    int checksumPos;
     int dataStart;
     int dataPos;
-    int checksumPos;
 
     private static final long HEART_BEAT_SEQNO = -1L;
 
     /**
-     *  create a heartbeat packet
+     * Create a heartbeat packet.
      */
     Packet() {
       this.lastPacketInBlock = false;
@@ -171,17 +178,19 @@ public class DFSOutputStream extends FSO
       this.offsetInBlock = 0;
       this.seqno = HEART_BEAT_SEQNO;
       
-      buffer = null;
-      int packetSize = PacketHeader.PKT_HEADER_LEN + HdfsConstants.BYTES_IN_INTEGER;
-      buf = new byte[packetSize];
+      buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN];
       
-      checksumStart = dataStart = packetSize;
-      checksumPos = checksumStart;
-      dataPos = dataStart;
+      checksumStart = checksumPos = dataPos = dataStart = PacketHeader.PKT_MAX_HEADER_LEN;
       maxChunks = 0;
     }
     
-    // create a new packet
+    /**
+     * Create a new packet.
+     * 
+     * @param pktSize maximum size of the packet, including checksum data and actual data.
+     * @param chunksPerPkt maximum number of chunks per packet.
+     * @param offsetInBlock offset in bytes into the HDFS block.
+     */
     Packet(int pktSize, int chunksPerPkt, long offsetInBlock) {
       this.lastPacketInBlock = false;
       this.numChunks = 0;
@@ -189,25 +198,24 @@ public class DFSOutputStream extends FSO
       this.seqno = currentSeqno;
       currentSeqno++;
       
-      buffer = null;
-      buf = new byte[pktSize];
+      buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN + pktSize];
       
-      checksumStart = PacketHeader.PKT_HEADER_LEN;
+      checksumStart = PacketHeader.PKT_MAX_HEADER_LEN;
       checksumPos = checksumStart;
-      dataStart = checksumStart + chunksPerPkt * checksum.getChecksumSize();
+      dataStart = checksumStart + (chunksPerPkt * checksum.getChecksumSize());
       dataPos = dataStart;
       maxChunks = chunksPerPkt;
     }
 
     void writeData(byte[] inarray, int off, int len) {
-      if ( dataPos + len > buf.length) {
+      if (dataPos + len > buf.length) {
         throw new BufferOverflowException();
       }
       System.arraycopy(inarray, off, buf, dataPos, len);
       dataPos += len;
     }
 
-    void  writeChecksum(byte[] inarray, int off, int len) {
+    void writeChecksum(byte[] inarray, int off, int len) {
       if (checksumPos + len > dataStart) {
         throw new BufferOverflowException();
       }
@@ -216,45 +224,38 @@ public class DFSOutputStream extends FSO
     }
     
     /**
-     * Returns ByteBuffer that contains one full packet, including header.
+     * Write the full packet, including the header, to the given output stream.
      */
-    ByteBuffer getBuffer() {
-      /* Once this is called, no more data can be added to the packet.
-       * setting 'buf' to null ensures that.
-       * This is called only when the packet is ready to be sent.
-       */
-      if (buffer != null) {
-        return buffer;
-      }
-      
-      //prepare the header and close any gap between checksum and data.
-      
-      int dataLen = dataPos - dataStart;
-      int checksumLen = checksumPos - checksumStart;
+    void writeTo(DataOutputStream stm) throws IOException {
+      final int dataLen = dataPos - dataStart;
+      final int checksumLen = checksumPos - checksumStart;
+      final int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen;
+
+      PacketHeader header = new PacketHeader(
+        pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock);
       
       if (checksumPos != dataStart) {
-        /* move the checksum to cover the gap.
-         * This can happen for the last packet.
-         */
+        // Move the checksum to cover the gap. This can happen for the last
+        // packet or during an hflush/hsync call.
         System.arraycopy(buf, checksumStart, buf, 
                          dataStart - checksumLen , checksumLen); 
+        checksumPos = dataStart;
+        checksumStart = checksumPos - checksumLen;
       }
       
-      int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen;
+      final int headerStart = checksumStart - header.getSerializedSize();
+      assert checksumStart + 1 >= header.getSerializedSize();
+      assert checksumPos == dataStart;
+      assert headerStart >= 0;
+      assert headerStart + header.getSerializedSize() == checksumStart;
       
-      //normally dataStart == checksumPos, i.e., offset is zero.
-      buffer = ByteBuffer.wrap(
-        buf, dataStart - checksumPos,
-        PacketHeader.PKT_HEADER_LEN + pktLen - HdfsConstants.BYTES_IN_INTEGER);
-      buf = null;
-      buffer.mark();
-
-      PacketHeader header = new PacketHeader(
-        pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock);
-      header.putInBuffer(buffer);
+      // Copy the header data into the buffer immediately preceding the checksum
+      // data.
+      System.arraycopy(header.getBytes(), 0, buf, headerStart,
+          header.getSerializedSize());
       
-      buffer.reset();
-      return buffer;
+      // Write the now contiguous full packet to the output stream.
+      stm.write(buf, headerStart, header.getSerializedSize() + checksumLen + dataLen);
     }
     
     // get the packet's last byte's offset in the block
@@ -497,8 +498,6 @@ public class DFSOutputStream extends FSO
           }
           
           // send the packet
-          ByteBuffer buf = one.getBuffer();
-
           synchronized (dataQueue) {
             // move packet from dataQueue to ackQueue
             if (!one.isHeartbeatPacket()) {
@@ -514,8 +513,8 @@ public class DFSOutputStream extends FSO
           }
 
           // write out data to remote datanode
-          try {            
-            blockStream.write(buf.array(), buf.position(), buf.remaining());
+          try {
+            one.writeTo(blockStream);
             blockStream.flush();   
           } catch (IOException e) {
             // HDFS-3398 treat primary DN is down since client is unable to 
@@ -867,16 +866,26 @@ public class DFSOutputStream extends FSO
       try {
         sock = createSocketForPipeline(src, 2, dfsClient);
         final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2);
-        out = new DataOutputStream(new BufferedOutputStream(
-            NetUtils.getOutputStream(sock, writeTimeout),
+        
+        OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
+        InputStream unbufIn = NetUtils.getInputStream(sock);
+        if (dfsClient.shouldEncryptData()) {
+          IOStreamPair encryptedStreams =
+              DataTransferEncryptor.getEncryptedStreams(
+                  unbufOut, unbufIn, dfsClient.getDataEncryptionKey());
+          unbufOut = encryptedStreams.out;
+          unbufIn = encryptedStreams.in;
+        }
+        out = new DataOutputStream(new BufferedOutputStream(unbufOut,
             HdfsConstants.SMALL_BUFFER_SIZE));
+        in = new DataInputStream(unbufIn);
 
         //send the TRANSFER_BLOCK request
         new Sender(out).transferBlock(block, blockToken, dfsClient.clientName,
             targets);
+        out.flush();
 
         //ack
-        in = new DataInputStream(NetUtils.getInputStream(sock));
         BlockOpResponseProto response =
           BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(in));
         if (SUCCESS != response.getStatus()) {
@@ -1034,77 +1043,98 @@ public class DFSOutputStream extends FSO
       // persist blocks on namenode on next flush
       persistBlocks.set(true);
 
-      boolean result = false;
-      DataOutputStream out = null;
-      try {
-        assert null == s : "Previous socket unclosed";
-        s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
-        long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
-
-        //
-        // Xmit header info to datanode
-        //
-        out = new DataOutputStream(new BufferedOutputStream(
-            NetUtils.getOutputStream(s, writeTimeout),
-            HdfsConstants.SMALL_BUFFER_SIZE));
-        
-        assert null == blockReplyStream : "Previous blockReplyStream unclosed";
-        blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
-
-        // send the request
-        new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
-            nodes, null, recoveryFlag? stage.getRecoveryStage() : stage, 
-            nodes.length, block.getNumBytes(), bytesSent, newGS, checksum);
-
-        // receive ack for connect
-        BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
-            HdfsProtoUtil.vintPrefixed(blockReplyStream));
-        pipelineStatus = resp.getStatus();
-        firstBadLink = resp.getFirstBadLink();
-        
-        if (pipelineStatus != SUCCESS) {
-          if (pipelineStatus == Status.ERROR_ACCESS_TOKEN) {
-            throw new InvalidBlockTokenException(
-                "Got access token error for connect ack with firstBadLink as "
-                    + firstBadLink);
-          } else {
-            throw new IOException("Bad connect ack with firstBadLink as "
-                + firstBadLink);
+      int refetchEncryptionKey = 1;
+      while (true) {
+        boolean result = false;
+        DataOutputStream out = null;
+        try {
+          assert null == s : "Previous socket unclosed";
+          assert null == blockReplyStream : "Previous blockReplyStream unclosed";
+          s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
+          long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
+          
+          OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
+          InputStream unbufIn = NetUtils.getInputStream(s);
+          if (dfsClient.shouldEncryptData()) {
+            IOStreamPair encryptedStreams =
+                DataTransferEncryptor.getEncryptedStreams(unbufOut,
+                    unbufIn, dfsClient.getDataEncryptionKey());
+            unbufOut = encryptedStreams.out;
+            unbufIn = encryptedStreams.in;
+          }
+          out = new DataOutputStream(new BufferedOutputStream(unbufOut,
+              HdfsConstants.SMALL_BUFFER_SIZE));
+          blockReplyStream = new DataInputStream(unbufIn);
+  
+          //
+          // Xmit header info to datanode
+          //
+  
+          // send the request
+          new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
+              nodes, null, recoveryFlag? stage.getRecoveryStage() : stage, 
+              nodes.length, block.getNumBytes(), bytesSent, newGS, checksum);
+  
+          // receive ack for connect
+          BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
+              HdfsProtoUtil.vintPrefixed(blockReplyStream));
+          pipelineStatus = resp.getStatus();
+          firstBadLink = resp.getFirstBadLink();
+          
+          if (pipelineStatus != SUCCESS) {
+            if (pipelineStatus == Status.ERROR_ACCESS_TOKEN) {
+              throw new InvalidBlockTokenException(
+                  "Got access token error for connect ack with firstBadLink as "
+                      + firstBadLink);
+            } else {
+              throw new IOException("Bad connect ack with firstBadLink as "
+                  + firstBadLink);
+            }
           }
-        }
-        assert null == blockStream : "Previous blockStream unclosed";
-        blockStream = out;
-        result =  true; // success
-
-      } catch (IOException ie) {
-
-        DFSClient.LOG.info("Exception in createBlockOutputStream", ie);
-
-        // find the datanode that matches
-        if (firstBadLink.length() != 0) {
-          for (int i = 0; i < nodes.length; i++) {
-            if (nodes[i].getXferAddr().equals(firstBadLink)) {
-              errorIndex = i;
-              break;
+          assert null == blockStream : "Previous blockStream unclosed";
+          blockStream = out;
+          result =  true; // success
+  
+        } catch (IOException ie) {
+          DFSClient.LOG.info("Exception in createBlockOutputStream", ie);
+          if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
+            DFSClient.LOG.info("Will fetch a new encryption key and retry, " 
+                + "encryption key was invalid when connecting to "
+                + nodes[0].getXferAddr() + " : " + ie);
+            // The encryption key used is invalid.
+            refetchEncryptionKey--;
+            dfsClient.clearDataEncryptionKey();
+            // Don't close the socket/exclude this node just yet. Try again with
+            // a new encryption key.
+            continue;
+          }
+  
+          // find the datanode that matches
+          if (firstBadLink.length() != 0) {
+            for (int i = 0; i < nodes.length; i++) {
+              if (nodes[i].getXferAddr().equals(firstBadLink)) {
+                errorIndex = i;
+                break;
+              }
             }
+          } else {
+            errorIndex = 0;
+          }
+          hasError = true;
+          setLastException(ie);
+          result =  false;  // error
+        } finally {
+          if (!result) {
+            IOUtils.closeSocket(s);
+            s = null;
+            IOUtils.closeStream(out);
+            out = null;
+            IOUtils.closeStream(blockReplyStream);
+            blockReplyStream = null;
           }
-        } else {
-          errorIndex = 0;
-        }
-        hasError = true;
-        setLastException(ie);
-        result =  false;  // error
-      } finally {
-        if (!result) {
-          IOUtils.closeSocket(s);
-          s = null;
-          IOUtils.closeStream(out);
-          out = null;
-          IOUtils.closeStream(blockReplyStream);
-          blockReplyStream = null;
         }
+        return result;
       }
-      return result;
     }
 
     private LocatedBlock locateFollowingBlock(long start,
@@ -1322,9 +1352,8 @@ public class DFSOutputStream extends FSO
 
   private void computePacketChunkSize(int psize, int csize) {
     int chunkSize = csize + checksum.getChecksumSize();
-    int n = PacketHeader.PKT_HEADER_LEN;
-    chunksPerPacket = Math.max((psize - n + chunkSize-1)/chunkSize, 1);
-    packetSize = n + chunkSize*chunksPerPacket;
+    chunksPerPacket = Math.max(psize/chunkSize, 1);
+    packetSize = chunkSize*chunksPerPacket;
     if (DFSClient.LOG.isDebugEnabled()) {
       DFSClient.LOG.debug("computePacketChunkSize: src=" + src +
                 ", chunkSize=" + chunkSize +
@@ -1438,8 +1467,7 @@ public class DFSOutputStream extends FSO
       // indicate the end of block and reset bytesCurBlock.
       //
       if (bytesCurBlock == blockSize) {
-        currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0, 
-            bytesCurBlock);
+        currentPacket = new Packet(0, 0, bytesCurBlock);
         currentPacket.lastPacketInBlock = true;
         currentPacket.syncBlock = shouldSyncBlock;
         waitAndQueueCurrentPacket();
@@ -1709,8 +1737,7 @@ public class DFSOutputStream extends FSO
 
       if (bytesCurBlock != 0) {
         // send an empty packet to mark the end of the block
-        currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0, 
-            bytesCurBlock);
+        currentPacket = new Packet(0, 0, bytesCurBlock);
         currentPacket.lastPacketInBlock = true;
         currentPacket.syncBlock = shouldSyncBlock;
       }
@@ -1763,8 +1790,7 @@ public class DFSOutputStream extends FSO
   @VisibleForTesting
   public synchronized void setChunksPerPacket(int value) {
     chunksPerPacket = Math.min(chunksPerPacket, value);
-    packetSize = PacketHeader.PKT_HEADER_LEN +
-                 (checksum.getBytesPerChecksum() + 
+    packetSize = (checksum.getBytesPerChecksum() + 
                   checksum.getChecksumSize()) * chunksPerPacket;
   }
 

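A small standalone sketch may help make the new Packet buffer layout concrete: checksums and data are written into a buffer that reserves PKT_MAX_HEADER_LEN bytes up front, and writeTo() first slides the checksums down against the data and then drops the variable-length header into the reserved space so a single contiguous region can be sent. This is not the HDFS Packet class; the class name, the 8-byte fake header, and the 4-byte checksum size are made-up stand-ins for PacketHeader and DataChecksum.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

/**
 * Minimal sketch of the new Packet buffer layout. MAX_HEADER_LEN stands in
 * for PacketHeader.PKT_MAX_HEADER_LEN and CHECKSUM_SIZE for
 * checksum.getChecksumSize(); the 8-byte header passed to writeTo() is a
 * fake stand-in for the serialized PacketHeader.
 */
public class PacketLayoutSketch {
  static final int MAX_HEADER_LEN = 8;
  static final int CHECKSUM_SIZE = 4;

  final byte[] buf;
  int checksumStart, checksumPos, dataStart, dataPos;

  PacketLayoutSketch(int pktSize, int chunksPerPkt) {
    // Reserve room for the largest conceivable header in front of the checksums.
    buf = new byte[MAX_HEADER_LEN + pktSize];
    checksumStart = checksumPos = MAX_HEADER_LEN;
    dataStart = dataPos = checksumStart + chunksPerPkt * CHECKSUM_SIZE;
  }

  void writeChecksum(byte[] csum) {
    System.arraycopy(csum, 0, buf, checksumPos, csum.length);
    checksumPos += csum.length;
  }

  void writeData(byte[] data) {
    System.arraycopy(data, 0, buf, dataPos, data.length);
    dataPos += data.length;
  }

  /** Slide the checksums down against the data, prepend the header, send one region. */
  void writeTo(DataOutputStream stm, byte[] header) throws IOException {
    int dataLen = dataPos - dataStart;
    int checksumLen = checksumPos - checksumStart;
    if (checksumPos != dataStart) {
      // Close the gap left by unused checksum slots (e.g. a short last packet).
      System.arraycopy(buf, checksumStart, buf, dataStart - checksumLen, checksumLen);
      checksumPos = dataStart;
      checksumStart = checksumPos - checksumLen;
    }
    int headerStart = checksumStart - header.length;
    System.arraycopy(header, 0, buf, headerStart, header.length);
    stm.write(buf, headerStart, header.length + checksumLen + dataLen);
  }

  public static void main(String[] args) throws IOException {
    // Space for two chunks, but only one chunk of data is actually written.
    PacketLayoutSketch p = new PacketLayoutSketch(2 * (CHECKSUM_SIZE + 16), 2);
    p.writeChecksum(new byte[] {1, 2, 3, 4});
    p.writeData("sixteen byte blk".getBytes("US-ASCII"));
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    p.writeTo(new DataOutputStream(bos), new byte[] {9, 9, 9, 9, 9, 9, 9, 9});
    System.out.println("wire bytes: " + Arrays.toString(bos.toByteArray()));
  }
}

Reserving the maximum header size up front is what lets the header length vary without reallocating or copying the payload.
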
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Thu Aug  9 22:29:36 2012
@@ -247,7 +247,7 @@ public class DistributedFileSystem exten
     FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
     short replication, long blockSize, Progressable progress,
     int bytesPerChecksum) throws IOException {
-    statistics.incrementReadOps(1);
+    statistics.incrementWriteOps(1);
     return new HdfsDataOutputStream(dfs.primitiveCreate(getPathName(f),
         absolutePermission, flag, true, replication, blockSize,
         progress, bufferSize, bytesPerChecksum),statistics);

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java Thu Aug  9 22:29:36 2012
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FSInputCheck
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
@@ -458,7 +459,9 @@ public class RemoteBlockReader extends F
   void sendReadResult(Socket sock, Status statusCode) {
     assert !sentStatusCode : "already sent status code to " + sock;
     try {
-      RemoteBlockReader2.writeReadResult(sock, statusCode);
+      RemoteBlockReader2.writeReadResult(
+          NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT),
+          statusCode);
       sentStatusCode = true;
     } catch (IOException e) {
       // It's ok not to be able to send this. But something is probably wrong.
@@ -484,4 +487,11 @@ public class RemoteBlockReader extends F
     throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader");
   }
 
+  @Override
+  public IOStreamPair getStreams() {
+    // This class doesn't support encryption, which is the only thing this
+    // method is used for. See HDFS-3637.
+    return null;
+  }
+
 }

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java Thu Aug  9 22:29:36 2012
@@ -23,6 +23,7 @@ import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -32,26 +33,26 @@ import java.nio.channels.ReadableByteCha
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.hdfs.util.DirectBufferPool;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.SocketInputWrapper;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
-import com.google.common.base.Preconditions;
-
 /**
  * This is a wrapper around connection to datanode
  * and understands checksum, offset etc.
@@ -83,15 +84,15 @@ public class RemoteBlockReader2  impleme
 
   static final Log LOG = LogFactory.getLog(RemoteBlockReader2.class);
   
-  Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
+  Socket dnSock;
+  // for now just sending the status code (e.g. checksumOk) after the read.
+  private IOStreamPair ioStreams;
   private final ReadableByteChannel in;
   private DataChecksum checksum;
   
-  private PacketHeader curHeader;
-  private ByteBuffer curPacketBuf = null;
+  private PacketReceiver packetReceiver = new PacketReceiver(true);
   private ByteBuffer curDataSlice = null;
 
-
   /** offset in block of the last chunk received */
   private long lastSeqNo = -1;
 
@@ -99,10 +100,6 @@ public class RemoteBlockReader2  impleme
   private long startOffset;
   private final String filename;
 
-  private static DirectBufferPool bufferPool = new DirectBufferPool();
-  private final ByteBuffer headerBuf = ByteBuffer.allocate(
-      PacketHeader.PKT_HEADER_LEN);
-
   private final int bytesPerChecksum;
   private final int checksumSize;
 
@@ -126,7 +123,7 @@ public class RemoteBlockReader2  impleme
   public synchronized int read(byte[] buf, int off, int len) 
                                throws IOException {
 
-    if (curPacketBuf == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
+    if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
       readNextPacket();
     }
     if (curDataSlice.remaining() == 0) {
@@ -143,7 +140,7 @@ public class RemoteBlockReader2  impleme
 
   @Override
   public int read(ByteBuffer buf) throws IOException {
-    if (curPacketBuf == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
+    if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
       readNextPacket();
     }
     if (curDataSlice.remaining() == 0) {
@@ -161,11 +158,13 @@ public class RemoteBlockReader2  impleme
   }
 
   private void readNextPacket() throws IOException {
-    Preconditions.checkState(curHeader == null || !curHeader.isLastPacketInBlock());
-    
     //Read packet headers.
-    readPacketHeader();
+    packetReceiver.receiveNextPacket(in);
 
+    PacketHeader curHeader = packetReceiver.getHeader();
+    curDataSlice = packetReceiver.getDataSlice();
+    assert curDataSlice.capacity() == curHeader.getDataLen();
+    
     if (LOG.isTraceEnabled()) {
       LOG.trace("DFSClient readNextPacket got header " + curHeader);
     }
@@ -179,17 +178,20 @@ public class RemoteBlockReader2  impleme
     if (curHeader.getDataLen() > 0) {
       int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum;
       int checksumsLen = chunks * checksumSize;
-      int bufsize = checksumsLen + curHeader.getDataLen();
+
+      assert packetReceiver.getChecksumSlice().capacity() == checksumsLen :
+        "checksum slice capacity=" + packetReceiver.getChecksumSlice().capacity() + 
+          " checksumsLen=" + checksumsLen;
       
-      resetPacketBuffer(checksumsLen, curHeader.getDataLen());
-  
       lastSeqNo = curHeader.getSeqno();
-      if (bufsize > 0) {
-        readChannelFully(in, curPacketBuf);
-        curPacketBuf.flip();
-        if (verifyChecksum) {
-          verifyPacketChecksums();
-        }
+      if (verifyChecksum && curDataSlice.remaining() > 0) {
+        // N.B.: the checksum error offset reported here is actually
+        // relative to the start of the block, not the start of the file.
+        // This is slightly misleading, but preserves the behavior from
+        // the older BlockReader.
+        checksum.verifyChunkedSums(curDataSlice,
+            packetReceiver.getChecksumSlice(),
+            filename, curHeader.getOffsetInBlock());
       }
       bytesNeededToFinish -= curHeader.getDataLen();
     }    
@@ -206,46 +208,13 @@ public class RemoteBlockReader2  impleme
     if (bytesNeededToFinish <= 0) {
       readTrailingEmptyPacket();
       if (verifyChecksum) {
-        sendReadResult(dnSock, Status.CHECKSUM_OK);
+        sendReadResult(Status.CHECKSUM_OK);
       } else {
-        sendReadResult(dnSock, Status.SUCCESS);
-      }
-    }
-  }
-
-  private void verifyPacketChecksums() throws ChecksumException {
-    // N.B.: the checksum error offset reported here is actually
-    // relative to the start of the block, not the start of the file.
-    // This is slightly misleading, but preserves the behavior from
-    // the older BlockReader.
-    checksum.verifyChunkedSums(curDataSlice, curPacketBuf,
-        filename, curHeader.getOffsetInBlock());
-  }
-
-  private static void readChannelFully(ReadableByteChannel ch, ByteBuffer buf)
-  throws IOException {
-    while (buf.remaining() > 0) {
-      int n = ch.read(buf);
-      if (n < 0) {
-        throw new IOException("Premature EOF reading from " + ch);
+        sendReadResult(Status.SUCCESS);
       }
     }
   }
-
-  private void resetPacketBuffer(int checksumsLen, int dataLen) {
-    int packetLen = checksumsLen + dataLen;
-    if (curPacketBuf == null ||
-        curPacketBuf.capacity() < packetLen) {
-      returnPacketBufToPool();
-      curPacketBuf = bufferPool.getBuffer(packetLen);
-    }
-    curPacketBuf.position(checksumsLen);
-    curDataSlice = curPacketBuf.slice();
-    curDataSlice.limit(dataLen);
-    curPacketBuf.clear();
-    curPacketBuf.limit(checksumsLen + dataLen);
-  }
-
+  
   @Override
   public synchronized long skip(long n) throws IOException {
     /* How can we make sure we don't throw a ChecksumException, at least
@@ -266,23 +235,14 @@ public class RemoteBlockReader2  impleme
     return nSkipped;
   }
 
-  private void readPacketHeader() throws IOException {
-    headerBuf.clear();
-    readChannelFully(in, headerBuf);
-    headerBuf.flip();
-    if (curHeader == null) curHeader = new PacketHeader();
-    curHeader.readFields(headerBuf);
-  }
-
   private void readTrailingEmptyPacket() throws IOException {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Reading empty packet at end of read");
     }
-    headerBuf.clear();
-    readChannelFully(in, headerBuf);
-    headerBuf.flip();
-    PacketHeader trailer = new PacketHeader();
-    trailer.readFields(headerBuf);
+    
+    packetReceiver.receiveNextPacket(in);
+
+    PacketHeader trailer = packetReceiver.getHeader();
     if (!trailer.isLastPacketInBlock() ||
        trailer.getDataLen() != 0) {
       throw new IOException("Expected empty end-of-read packet! Header: " +
@@ -292,9 +252,11 @@ public class RemoteBlockReader2  impleme
 
   protected RemoteBlockReader2(String file, String bpid, long blockId,
       ReadableByteChannel in, DataChecksum checksum, boolean verifyChecksum,
-      long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock) {
+      long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock,
+      IOStreamPair ioStreams) {
     // Path is used only for printing block and file information in debug
     this.dnSock = dnSock;
+    this.ioStreams = ioStreams;
     this.in = in;
     this.checksum = checksum;
     this.verifyChecksum = verifyChecksum;
@@ -313,7 +275,7 @@ public class RemoteBlockReader2  impleme
 
   @Override
   public synchronized void close() throws IOException {
-    returnPacketBufToPool();
+    packetReceiver.close();
     
     startOffset = -1;
     checksum = null;
@@ -324,24 +286,6 @@ public class RemoteBlockReader2  impleme
     // in will be closed when its Socket is closed.
   }
   
-  @Override
-  protected void finalize() throws Throwable {
-    try {
-      // just in case it didn't get closed, we
-      // may as well still try to return the buffer
-      returnPacketBufToPool();
-    } finally {
-      super.finalize();
-    }
-  }
-  
-  private void returnPacketBufToPool() {
-    if (curPacketBuf != null) {
-      bufferPool.returnBuffer(curPacketBuf);
-      curPacketBuf = null;
-    }
-  }
-
   /**
    * Take the socket used to talk to the DN.
    */
@@ -369,24 +313,23 @@ public class RemoteBlockReader2  impleme
    * closing our connection (which we will re-open), but won't affect
    * data correctness.
    */
-  void sendReadResult(Socket sock, Status statusCode) {
-    assert !sentStatusCode : "already sent status code to " + sock;
+  void sendReadResult(Status statusCode) {
+    assert !sentStatusCode : "already sent status code to " + dnSock;
     try {
-      writeReadResult(sock, statusCode);
+      writeReadResult(ioStreams.out, statusCode);
       sentStatusCode = true;
     } catch (IOException e) {
       // It's ok not to be able to send this. But something is probably wrong.
       LOG.info("Could not send read status (" + statusCode + ") to datanode " +
-               sock.getInetAddress() + ": " + e.getMessage());
+               dnSock.getInetAddress() + ": " + e.getMessage());
     }
   }
 
   /**
    * Serialize the actual read result on the wire.
    */
-  static void writeReadResult(Socket sock, Status statusCode)
+  static void writeReadResult(OutputStream out, Status statusCode)
       throws IOException {
-    OutputStream out = NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT);
     
     ClientReadStatusProto.newBuilder()
       .setStatus(statusCode)
@@ -434,25 +377,32 @@ public class RemoteBlockReader2  impleme
    * @param clientName  Client name
    * @return New BlockReader instance, or null on error.
    */
-  public static BlockReader newBlockReader( Socket sock, String file,
+  public static BlockReader newBlockReader(Socket sock, String file,
                                      ExtendedBlock block,
                                      Token<BlockTokenIdentifier> blockToken,
                                      long startOffset, long len,
                                      int bufferSize, boolean verifyChecksum,
-                                     String clientName)
+                                     String clientName,
+                                     DataEncryptionKey encryptionKey,
+                                     IOStreamPair ioStreams)
                                      throws IOException {
+    
+    ReadableByteChannel ch;
+    if (ioStreams.in instanceof SocketInputWrapper) {
+      ch = ((SocketInputWrapper)ioStreams.in).getReadableByteChannel();
+    } else {
+      ch = (ReadableByteChannel) ioStreams.in;
+    }
+    
     // in and out will be closed when sock is closed (by the caller)
     final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
-          NetUtils.getOutputStream(sock,
-              HdfsServerConstants.WRITE_TIMEOUT)));
+          ioStreams.out));
     new Sender(out).readBlock(block, blockToken, clientName, startOffset, len);
 
     //
-    // Get bytes in block, set streams
+    // Get bytes in block
     //
-    SocketInputWrapper sin = NetUtils.getInputStream(sock);
-    ReadableByteChannel ch = sin.getReadableByteChannel();
-    DataInputStream in = new DataInputStream(sin);
+    DataInputStream in = new DataInputStream(ioStreams.in);
 
     BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
         vintPrefixed(in));
@@ -474,7 +424,8 @@ public class RemoteBlockReader2  impleme
     }
 
     return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(),
-        ch, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock);
+        ch, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock,
+        ioStreams);
   }
 
   static void checkSuccess(
@@ -498,4 +449,9 @@ public class RemoteBlockReader2  impleme
       }
     }
   }
+
+  @Override
+  public IOStreamPair getStreams() {
+    return ioStreams;
+  }
 }

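The readNextPacket() changes above hand checksum verification to DataChecksum.verifyChunkedSums(dataSlice, checksumSlice, filename, offsetInBlock). The sketch below shows the per-chunk shape of that call under simplifying assumptions: plain java.util.zip.CRC32 with 4-byte sums and 512-byte chunks, and an unchecked exception in place of the ChecksumException (carrying a block-relative offset, as the comment in the hunk notes) that the real DataChecksum throws.

import java.nio.ByteBuffer;
import java.util.zip.CRC32;

/**
 * Rough sketch of per-chunk checksum verification in the spirit of
 * DataChecksum.verifyChunkedSums(). Assumes plain CRC32, 4-byte sums and
 * 512-byte chunks; the real DataChecksum is configurable and reports the
 * failing block-relative offset via ChecksumException.
 */
public class ChunkedChecksumSketch {
  static final int BYTES_PER_CHECKSUM = 512;

  static void verifyChunkedSums(ByteBuffer data, ByteBuffer checksums,
      String filename, long basePosInBlock) {
    ByteBuffer d = data.duplicate();
    ByteBuffer c = checksums.duplicate();
    CRC32 crc = new CRC32();
    long pos = basePosInBlock;
    while (d.hasRemaining()) {
      int chunkLen = Math.min(BYTES_PER_CHECKSUM, d.remaining());
      byte[] chunk = new byte[chunkLen];
      d.get(chunk);
      crc.reset();
      crc.update(chunk, 0, chunkLen);
      if ((int) crc.getValue() != c.getInt()) {
        // The offset is relative to the start of the block, not the file.
        throw new IllegalStateException("Checksum error in " + filename
            + " at block offset " + pos);
      }
      pos += chunkLen;
    }
  }

  public static void main(String[] args) {
    byte[] payload = new byte[1000];
    for (int i = 0; i < payload.length; i++) {
      payload[i] = (byte) i;
    }
    // Build the checksum slice the way a writer would: one CRC per chunk.
    int chunks = (payload.length + BYTES_PER_CHECKSUM - 1) / BYTES_PER_CHECKSUM;
    ByteBuffer sums = ByteBuffer.allocate(chunks * 4);
    CRC32 crc = new CRC32();
    for (int off = 0; off < payload.length; off += BYTES_PER_CHECKSUM) {
      int len = Math.min(BYTES_PER_CHECKSUM, payload.length - off);
      crc.reset();
      crc.update(payload, off, len);
      sums.putInt((int) crc.getValue());
    }
    sums.flip();
    verifyChunkedSums(ByteBuffer.wrap(payload), sums, "sketch.dat", 0L);
    System.out.println("verified " + chunks + " chunks");
  }
}
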
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java Thu Aug  9 22:29:36 2012
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdfs;
 
+import java.io.Closeable;
 import java.net.Socket;
 import java.net.SocketAddress;
 
@@ -29,6 +30,8 @@ import com.google.common.base.Preconditi
 import com.google.common.collect.LinkedListMultimap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.io.IOUtils;
 
 /**
@@ -37,7 +40,7 @@ import org.apache.hadoop.io.IOUtils;
 class SocketCache {
   static final Log LOG = LogFactory.getLog(SocketCache.class);
 
-  private final LinkedListMultimap<SocketAddress, Socket> multimap;
+  private final LinkedListMultimap<SocketAddress, SocketAndStreams> multimap;
   private final int capacity;
 
   /**
@@ -57,21 +60,21 @@ class SocketCache {
    * @param remote  Remote address the socket is connected to.
    * @return  A socket with unknown state, possibly closed underneath. Or null.
    */
-  public synchronized Socket get(SocketAddress remote) {
+  public synchronized SocketAndStreams get(SocketAddress remote) {
     if (capacity <= 0) { // disabled
       return null;
     }
     
-    List<Socket> socklist = multimap.get(remote);
+    List<SocketAndStreams> socklist = multimap.get(remote);
     if (socklist == null) {
       return null;
     }
 
-    Iterator<Socket> iter = socklist.iterator();
+    Iterator<SocketAndStreams> iter = socklist.iterator();
     while (iter.hasNext()) {
-      Socket candidate = iter.next();
+      SocketAndStreams candidate = iter.next();
       iter.remove();
-      if (!candidate.isClosed()) {
+      if (!candidate.sock.isClosed()) {
         return candidate;
       }
     }
@@ -82,10 +85,11 @@ class SocketCache {
    * Give an unused socket to the cache.
    * @param sock socket not used by anyone.
    */
-  public synchronized void put(Socket sock) {
+  public synchronized void put(Socket sock, IOStreamPair ioStreams) {
+    SocketAndStreams s = new SocketAndStreams(sock, ioStreams);
     if (capacity <= 0) {
       // Cache disabled.
-      IOUtils.closeSocket(sock);
+      s.close();
       return;
     }
     
@@ -102,7 +106,7 @@ class SocketCache {
     if (capacity == multimap.size()) {
       evictOldest();
     }
-    multimap.put(remoteAddr, sock);
+    multimap.put(remoteAddr, new SocketAndStreams(sock, ioStreams));
   }
 
   public synchronized int size() {
@@ -113,23 +117,23 @@ class SocketCache {
    * Evict the oldest entry in the cache.
    */
   private synchronized void evictOldest() {
-    Iterator<Entry<SocketAddress, Socket>> iter =
+    Iterator<Entry<SocketAddress, SocketAndStreams>> iter =
       multimap.entries().iterator();
     if (!iter.hasNext()) {
       throw new IllegalStateException("Cannot evict from empty cache!");
     }
-    Entry<SocketAddress, Socket> entry = iter.next();
+    Entry<SocketAddress, SocketAndStreams> entry = iter.next();
     iter.remove();
-    Socket sock = entry.getValue();
-    IOUtils.closeSocket(sock);
+    SocketAndStreams s = entry.getValue();
+    s.close();
   }
 
   /**
    * Empty the cache, and close all sockets.
    */
   public synchronized void clear() {
-    for (Socket sock : multimap.values()) {
-      IOUtils.closeSocket(sock);
+    for (SocketAndStreams s : multimap.values()) {
+      s.close();
     }
     multimap.clear();
   }
@@ -138,5 +142,25 @@ class SocketCache {
   protected void finalize() {
     clear();
   }
+  
+  @InterfaceAudience.Private
+  static class SocketAndStreams implements Closeable {
+    public final Socket sock;
+    public final IOStreamPair ioStreams;
+    
+    public SocketAndStreams(Socket s, IOStreamPair ioStreams) {
+      this.sock = s;
+      this.ioStreams = ioStreams;
+    }
+    
+    @Override
+    public void close() {
+      if (ioStreams != null) { 
+        IOUtils.closeStream(ioStreams.in);
+        IOUtils.closeStream(ioStreams.out);
+      }
+      IOUtils.closeSocket(sock);
+    }
+  }
 
 }

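The point of the new SocketAndStreams holder is that the streams wrapping a cached socket (for example the IOStreamPair produced when data transfer encryption is on) belong to that socket and must be reused and closed together with it. A toy version of the idea, using a plain list instead of the LinkedListMultimap and made-up names (Entry, SocketAndStreamsCacheSketch), looks like this:

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.Iterator;
import java.util.LinkedList;

/**
 * Toy version of the SocketCache change: cache the Socket together with the
 * stream pair that wraps it, and always close the streams and the socket as
 * a unit. The multimap keyed by SocketAddress and the finalize()/clear()
 * paths are simplified away here.
 */
public class SocketAndStreamsCacheSketch {

  static class Entry implements Closeable {
    final SocketAddress remote;
    final Socket sock;
    final InputStream in;
    final OutputStream out;

    Entry(SocketAddress remote, Socket sock, InputStream in, OutputStream out) {
      this.remote = remote;
      this.sock = sock;
      this.in = in;
      this.out = out;
    }

    @Override
    public void close() {
      // Close the wrapping streams first (they may carry their own state),
      // then the underlying socket.
      try { in.close(); } catch (IOException ignored) {}
      try { out.close(); } catch (IOException ignored) {}
      try { sock.close(); } catch (IOException ignored) {}
    }
  }

  private final LinkedList<Entry> entries = new LinkedList<Entry>();
  private final int capacity;

  SocketAndStreamsCacheSketch(int capacity) {
    this.capacity = capacity;
  }

  synchronized Entry get(SocketAddress remote) {
    Iterator<Entry> it = entries.iterator();
    while (it.hasNext()) {
      Entry e = it.next();
      if (e.remote.equals(remote)) {
        it.remove();
        if (!e.sock.isClosed()) {
          return e;                    // caller still revalidates the connection
        }
      }
    }
    return null;
  }

  synchronized void put(Entry e) {
    if (capacity <= 0) {
      e.close();                       // cache disabled: close both as a unit
      return;
    }
    if (entries.size() >= capacity) {
      entries.removeFirst().close();   // evict the oldest entry
    }
    entries.addLast(e);
  }

  public static void main(String[] args) {
    SocketAndStreamsCacheSketch cache = new SocketAndStreamsCacheSketch(2);
    SocketAddress addr = new InetSocketAddress("127.0.0.1", 50010);
    cache.put(new Entry(addr, new Socket(),
        new java.io.ByteArrayInputStream(new byte[0]),
        new java.io.ByteArrayOutputStream()));
    System.out.println("cache hit: " + (cache.get(addr) != null));
  }
}
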
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Thu Aug  9 22:29:36 2012
@@ -44,6 +44,7 @@ import org.apache.hadoop.security.Access
 import org.apache.hadoop.security.KerberosInfo;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenInfo;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
 
@@ -941,4 +942,11 @@ public interface ClientProtocol {
    */
   public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
       throws IOException;
+  
+  /**
+   * @return encryption key so a client can encrypt data sent via the
+   *         DataTransferProtocol to/from DataNodes.
+   * @throws IOException
+   */
+  public DataEncryptionKey getDataEncryptionKey() throws IOException;
 }

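The new getDataEncryptionKey() call supplies the key that DataTransferEncryptor.getEncryptedStreams() uses in the DFSOutputStream hunks above to wrap both unbuffered socket streams before any buffering is layered on top. The sketch below shows only that wrapping pattern; the XOR "cipher", the StreamPair class, and the int-valued key are placeholders for the real DataTransferEncryptor, IOStreamPair, and DataEncryptionKey.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FilterInputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
 * Sketch of the "conditionally wrap both socket streams" pattern from the
 * DFSOutputStream hunks. StreamPair stands in for IOStreamPair, the int key
 * for DataEncryptionKey, and the XOR wrapping for what
 * DataTransferEncryptor.getEncryptedStreams() actually negotiates.
 */
public class EncryptedStreamWrappingSketch {

  static class StreamPair {
    final InputStream in;
    final OutputStream out;
    StreamPair(InputStream in, OutputStream out) {
      this.in = in;
      this.out = out;
    }
  }

  /** Placeholder for DataTransferEncryptor.getEncryptedStreams(out, in, key). */
  static StreamPair getEncryptedStreams(OutputStream out, InputStream in, final int key) {
    OutputStream encryptedOut = new FilterOutputStream(out) {
      @Override
      public void write(int b) throws IOException {
        super.write(b ^ key);
      }
    };
    InputStream encryptedIn = new FilterInputStream(in) {
      @Override
      public int read() throws IOException {
        int b = super.read();
        return b < 0 ? b : (b ^ key) & 0xff;
      }
    };
    return new StreamPair(encryptedIn, encryptedOut);
  }

  static StreamPair maybeEncrypt(InputStream unbufIn, OutputStream unbufOut,
      boolean shouldEncryptData, int key) {
    if (shouldEncryptData) {
      // Both directions are swapped for wrapped versions; buffering
      // (BufferedOutputStream/DataInputStream) is layered on top afterwards.
      return getEncryptedStreams(unbufOut, unbufIn, key);
    }
    return new StreamPair(unbufIn, unbufOut);
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    StreamPair p = maybeEncrypt(
        new ByteArrayInputStream(new byte[0]), sink, true, 0x5A);
    p.out.write('h');
    p.out.write('i');
    p.out.flush();
    byte[] wire = sink.toByteArray();
    System.out.printf("wire bytes: %02x %02x%n", wire[0], wire[1]);
  }
}
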
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java Thu Aug  9 22:29:36 2012
@@ -27,14 +27,31 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PacketHeaderProto;
 import org.apache.hadoop.hdfs.util.ByteBufferOutputStream;
 
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Shorts;
+import com.google.common.primitives.Ints;
+import com.google.protobuf.InvalidProtocolBufferException;
+
 /**
  * Header data for each packet that goes through the read/write pipelines.
+ * Includes all of the information about the packet, excluding checksums and
+ * actual data.
+ * 
+ * This data includes:
+ *  - the offset in bytes into the HDFS block of the data in this packet
+ *  - the sequence number of this packet in the pipeline
+ *  - whether or not this is the last packet in the pipeline
+ *  - the length of the data in this packet
+ *  - whether or not this packet should be synced by the DNs.
+ *  
+ * When serialized, this header is written out as a protocol buffer, preceded
+ * by a 4-byte integer representing the full packet length, and a 2-byte short
+ * representing the header length.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class PacketHeader {
-  /** Header size for a packet */
-  private static final int PROTO_SIZE = 
+  private static final int MAX_PROTO_SIZE = 
     PacketHeaderProto.newBuilder()
       .setOffsetInBlock(0)
       .setSeqno(0)
@@ -42,8 +59,10 @@ public class PacketHeader {
       .setDataLen(0)
       .setSyncBlock(false)
       .build().getSerializedSize();
-  public static final int PKT_HEADER_LEN =
-    6 + PROTO_SIZE;
+  public static final int PKT_LENGTHS_LEN =
+      Ints.BYTES + Shorts.BYTES;
+  public static final int PKT_MAX_HEADER_LEN =
+      PKT_LENGTHS_LEN + MAX_PROTO_SIZE;
 
   private int packetLen;
   private PacketHeaderProto proto;
@@ -54,13 +73,25 @@ public class PacketHeader {
   public PacketHeader(int packetLen, long offsetInBlock, long seqno,
                       boolean lastPacketInBlock, int dataLen, boolean syncBlock) {
     this.packetLen = packetLen;
-    proto = PacketHeaderProto.newBuilder()
+    Preconditions.checkArgument(packetLen >= Ints.BYTES,
+        "packet len %s should always be at least 4 bytes",
+        packetLen);
+    
+    PacketHeaderProto.Builder builder = PacketHeaderProto.newBuilder()
       .setOffsetInBlock(offsetInBlock)
       .setSeqno(seqno)
       .setLastPacketInBlock(lastPacketInBlock)
-      .setDataLen(dataLen)
-      .setSyncBlock(syncBlock)
-      .build();
+      .setDataLen(dataLen);
+      
+    if (syncBlock) {
+      // Only set syncBlock if it is specified.
+      // This is wire-incompatible with Hadoop 2.0.0-alpha due to HDFS-3721
+      // because it changes the length of the packet header, and BlockReceiver
+      // in that version did not support variable-length headers.
+      builder.setSyncBlock(syncBlock);
+    }
+      
+    proto = builder.build();
   }
 
   public int getDataLen() {
@@ -90,10 +121,16 @@ public class PacketHeader {
   @Override
   public String toString() {
     return "PacketHeader with packetLen=" + packetLen +
-      "Header data: " + 
+      " header data: " + 
       proto.toString();
   }
   
+  public void setFieldsFromData(
+      int packetLen, byte[] headerData) throws InvalidProtocolBufferException {
+    this.packetLen = packetLen;
+    proto = PacketHeaderProto.parseFrom(headerData);
+  }
+  
   public void readFields(ByteBuffer buf) throws IOException {
     packetLen = buf.getInt();
     short protoLen = buf.getShort();
@@ -110,14 +147,21 @@ public class PacketHeader {
     proto = PacketHeaderProto.parseFrom(data);
   }
 
+  /**
+   * @return the number of bytes necessary to write out this header,
+   * including the length-prefixing of the payload and header
+   */
+  public int getSerializedSize() {
+    return PKT_LENGTHS_LEN + proto.getSerializedSize();
+  }
 
   /**
    * Write the header into the buffer.
    * This requires that PKT_HEADER_LEN bytes are available.
    */
   public void putInBuffer(final ByteBuffer buf) {
-    assert proto.getSerializedSize() == PROTO_SIZE
-      : "Expected " + (PROTO_SIZE) + " got: " + proto.getSerializedSize();
+    assert proto.getSerializedSize() <= MAX_PROTO_SIZE
+      : "Expected " + (MAX_PROTO_SIZE) + " got: " + proto.getSerializedSize();
     try {
       buf.putInt(packetLen);
       buf.putShort((short) proto.getSerializedSize());
@@ -128,12 +172,18 @@ public class PacketHeader {
   }
   
   public void write(DataOutputStream out) throws IOException {
-    assert proto.getSerializedSize() == PROTO_SIZE
-    : "Expected " + (PROTO_SIZE) + " got: " + proto.getSerializedSize();
+    assert proto.getSerializedSize() <= MAX_PROTO_SIZE
+    : "Expected " + (MAX_PROTO_SIZE) + " got: " + proto.getSerializedSize();
     out.writeInt(packetLen);
     out.writeShort(proto.getSerializedSize());
     proto.writeTo(out);
   }
+  
+  public byte[] getBytes() {
+    ByteBuffer buf = ByteBuffer.allocate(getSerializedSize());
+    putInBuffer(buf);
+    return buf.array();
+  }
 
   /**
    * Perform a sanity check on the packet, returning true if it is sane.

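The wire framing described in the class comment above (a 4-byte packet length, then a 2-byte header length, then the PacketHeaderProto bytes) can be exercised without protocol buffers. In the sketch below a fixed byte array stands in for the proto; everything else mirrors putInBuffer() and readFields().

import java.nio.ByteBuffer;

/**
 * Standalone sketch of the PacketHeader wire framing: a 4-byte packet length,
 * a 2-byte header length, then the header body. A fixed byte array stands in
 * for the serialized PacketHeaderProto, whose size varies (for example, it
 * grows when syncBlock is set).
 */
public class PacketFramingSketch {
  static final int PKT_LENGTHS_LEN = 4 + 2;    // int + short, as in PacketHeader

  static byte[] frame(int packetLen, byte[] protoBytes) {
    ByteBuffer buf = ByteBuffer.allocate(PKT_LENGTHS_LEN + protoBytes.length);
    buf.putInt(packetLen);                     // checksums + data + 4, per the diff
    buf.putShort((short) protoBytes.length);
    buf.put(protoBytes);
    return buf.array();
  }

  static void unframe(byte[] wire) {
    ByteBuffer buf = ByteBuffer.wrap(wire);
    int packetLen = buf.getInt();
    short protoLen = buf.getShort();
    byte[] protoBytes = new byte[protoLen];
    buf.get(protoBytes);
    System.out.println("packetLen=" + packetLen + " headerBytes=" + protoLen);
  }

  public static void main(String[] args) {
    byte[] fakeProto = new byte[] {10, 20, 30, 40, 50};   // placeholder header body
    unframe(frame(4 + 512 + 4, fakeProto));   // one 512-byte chunk plus one 4-byte CRC
  }
}
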
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java Thu Aug  9 22:29:36 2012
@@ -38,10 +38,10 @@ import org.apache.hadoop.hdfs.protocol.p
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public abstract class Receiver implements DataTransferProtocol {
-  protected final DataInputStream in;
-
-  /** Create a receiver for DataTransferProtocol with a socket. */
-  protected Receiver(final DataInputStream in) {
+  protected DataInputStream in;
+  
+  /** Initialize a receiver for DataTransferProtocol with a socket. */
+  protected void initialize(final DataInputStream in) {
     this.in = in;
   }
 

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java Thu Aug  9 22:29:36 2012
@@ -58,6 +58,8 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto.Builder;
@@ -127,6 +129,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.io.Text;
 
@@ -830,4 +833,18 @@ public class ClientNamenodeProtocolServe
       throw new ServiceException(e);
     }
   }
+
+  @Override
+  public GetDataEncryptionKeyResponseProto getDataEncryptionKey(
+      RpcController controller, GetDataEncryptionKeyRequestProto request)
+      throws ServiceException {
+    try {
+      DataEncryptionKey encryptionKey = server.getDataEncryptionKey();
+      return GetDataEncryptionKeyResponseProto.newBuilder()
+          .setDataEncryptionKey(PBHelper.convert(encryptionKey))
+          .build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
 }

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java Thu Aug  9 22:29:36 2012
@@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FinalizeUpgradeRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryRequestProto;
@@ -99,6 +100,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
@@ -815,9 +817,22 @@ public class ClientNamenodeProtocolTrans
         ClientNamenodeProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
         RPC.getProtocolVersion(ClientNamenodeProtocolPB.class), methodName);
   }
+  
+  @Override
+  public DataEncryptionKey getDataEncryptionKey() throws IOException {
+    GetDataEncryptionKeyRequestProto req = GetDataEncryptionKeyRequestProto
+        .newBuilder().build();
+    try {
+      return PBHelper.convert(rpcProxy.getDataEncryptionKey(null, req)
+          .getDataEncryptionKey());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
 
   @Override
   public Object getUnderlyingProxyObject() {
     return rpcProxy;
   }
+  
 }

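The client-side translator runs the same translation in reverse: it issues the protobuf call and, on failure, unwraps the ServiceException back into the remote IOException, which is what ProtobufHelper.getRemoteException does above. A stand-alone sketch of that unwrapping, again with stand-in types:

    import java.io.IOException;

    public class ClientSideTranslatorSketch {
      // Stand-in for com.google.protobuf.ServiceException.
      static class ServiceException extends Exception {
        ServiceException(Throwable cause) { super(cause); }
      }
      // Stand-in for the generated ClientNamenodeProtocolPB proxy.
      interface RpcProxy {
        byte[] getDataEncryptionKey() throws ServiceException;
      }

      private final RpcProxy rpcProxy;

      public ClientSideTranslatorSketch(RpcProxy rpcProxy) {
        this.rpcProxy = rpcProxy;
      }

      public byte[] getDataEncryptionKey() throws IOException {
        try {
          return rpcProxy.getDataEncryptionKey();
        } catch (ServiceException e) {
          // Surface the server's original IOException if that is what
          // traveled over the wire; otherwise wrap the ServiceException.
          Throwable cause = e.getCause();
          if (cause instanceof IOException) {
            throw (IOException) cause;
          }
          throw new IOException(e);
        }
      }
    }
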
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Thu Aug  9 22:29:36 2012
@@ -62,6 +62,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReportProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.UpgradeCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DataEncryptionKeyProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenIdentifierProto;
@@ -96,6 +97,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.UpgradeStatusReportProto;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.BlockKey;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@@ -973,12 +975,37 @@ public class PBHelper {
         .setIsLastBlockComplete(lb.isLastBlockComplete()).build();
   }
   
+  // DataEncryptionKey
+  public static DataEncryptionKey convert(DataEncryptionKeyProto bet) {
+    String encryptionAlgorithm = bet.getEncryptionAlgorithm();
+    return new DataEncryptionKey(bet.getKeyId(),
+        bet.getBlockPoolId(),
+        bet.getNonce().toByteArray(),
+        bet.getEncryptionKey().toByteArray(),
+        bet.getExpiryDate(),
+        encryptionAlgorithm.isEmpty() ? null : encryptionAlgorithm);
+  }
+  
+  public static DataEncryptionKeyProto convert(DataEncryptionKey bet) {
+    DataEncryptionKeyProto.Builder b = DataEncryptionKeyProto.newBuilder()
+        .setKeyId(bet.keyId)
+        .setBlockPoolId(bet.blockPoolId)
+        .setNonce(ByteString.copyFrom(bet.nonce))
+        .setEncryptionKey(ByteString.copyFrom(bet.encryptionKey))
+        .setExpiryDate(bet.expiryDate);
+    if (bet.encryptionAlgorithm != null) {
+      b.setEncryptionAlgorithm(bet.encryptionAlgorithm);
+    }
+    return b.build();
+  }
+  
   public static FsServerDefaults convert(FsServerDefaultsProto fs) {
     if (fs == null) return null;
     return new FsServerDefaults(
         fs.getBlockSize(), fs.getBytesPerChecksum(), 
         fs.getWritePacketSize(), (short) fs.getReplication(),
-        fs.getFileBufferSize());
+        fs.getFileBufferSize(),
+        fs.getEncryptDataTransfer());
   }
   
   public static FsServerDefaultsProto convert(FsServerDefaults fs) {
@@ -986,7 +1013,10 @@ public class PBHelper {
     return FsServerDefaultsProto.newBuilder().
       setBlockSize(fs.getBlockSize()).
       setBytesPerChecksum(fs.getBytesPerChecksum()).
-      setWritePacketSize(fs.getWritePacketSize()).setReplication(fs.getReplication()).setFileBufferSize(fs.getFileBufferSize()).build();
+      setWritePacketSize(fs.getWritePacketSize())
+      .setReplication(fs.getReplication())
+      .setFileBufferSize(fs.getFileBufferSize())
+      .setEncryptDataTransfer(fs.getEncryptDataTransfer()).build();
   }
   
   public static FsPermissionProto convert(FsPermission p) {

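One subtlety in the converters above: protobuf optional strings default to the empty string, so the proto-to-Java direction maps an empty encryptionAlgorithm back to null, while the Java-to-proto direction only sets the field when it is non-null. A round-trip check along the following lines would exercise the converters; this is only a sketch, assuming JUnit 4 plus the public fields and constructor order visible in PBHelper.convert:

    import static org.junit.Assert.assertArrayEquals;
    import static org.junit.Assert.assertEquals;

    import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DataEncryptionKeyProto;
    import org.apache.hadoop.hdfs.protocolPB.PBHelper;
    import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
    import org.junit.Test;

    public class TestDataEncryptionKeyConversion {
      @Test
      public void testRoundTripPreservesAllFields() {
        DataEncryptionKey key = new DataEncryptionKey(
            7,                                     // keyId
            "BP-1234-127.0.0.1-0",                 // blockPoolId
            new byte[] { 1, 2, 3, 4, 5, 6, 7, 8 }, // nonce
            new byte[] { 9, 10, 11, 12 },          // encryptionKey
            System.currentTimeMillis() + 600000L,  // expiryDate
            "3des");                               // encryptionAlgorithm
        DataEncryptionKeyProto proto = PBHelper.convert(key);
        DataEncryptionKey back = PBHelper.convert(proto);
        assertEquals(key.keyId, back.keyId);
        assertEquals(key.blockPoolId, back.blockPoolId);
        assertArrayEquals(key.nonce, back.nonce);
        assertArrayEquals(key.encryptionKey, back.encryptionKey);
        assertEquals(key.expiryDate, back.expiryDate);
        assertEquals(key.encryptionAlgorithm, back.encryptionAlgorithm);
      }
    }
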
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java Thu Aug  9 22:29:36 2012
@@ -119,4 +119,13 @@ public class BlockPoolTokenSecretManager
       btsm.clearAllKeysForTesting();
     }
   }
+
+  public DataEncryptionKey generateDataEncryptionKey(String blockPoolId) {
+    return get(blockPoolId).generateDataEncryptionKey();
+  }
+  
+  public byte[] retrieveDataEncryptionKey(int keyId, String blockPoolId,
+      byte[] nonce) throws IOException {
+    return get(blockPoolId).retrieveDataEncryptionKey(keyId, nonce);
+  }
 }

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java Thu Aug  9 22:29:36 2012
@@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
@@ -74,6 +75,10 @@ public class BlockTokenSecretManager ext
   private BlockKey currentKey;
   private BlockKey nextKey;
   private Map<Integer, BlockKey> allKeys;
+  private String blockPoolId;
+  private String encryptionAlgorithm;
+  
+  private SecureRandom nonceGenerator = new SecureRandom();
 
   public static enum AccessMode {
     READ, WRITE, COPY, REPLACE
@@ -86,8 +91,9 @@ public class BlockTokenSecretManager ext
    * @param tokenLifetime how long an individual token is valid
    */
   public BlockTokenSecretManager(long keyUpdateInterval,
-      long tokenLifetime) {
-    this(false, keyUpdateInterval, tokenLifetime);
+      long tokenLifetime, String blockPoolId, String encryptionAlgorithm) {
+    this(false, keyUpdateInterval, tokenLifetime, blockPoolId,
+        encryptionAlgorithm);
   }
   
   /**
@@ -100,8 +106,10 @@ public class BlockTokenSecretManager ext
   * @param nnIndex the index (0 or 1) of this NameNode within the HA pair
    */
   public BlockTokenSecretManager(long keyUpdateInterval,
-      long tokenLifetime, int nnIndex) {
-    this(true, keyUpdateInterval, tokenLifetime);
+      long tokenLifetime, int nnIndex, String blockPoolId,
+      String encryptionAlgorithm) {
+    this(true, keyUpdateInterval, tokenLifetime, blockPoolId,
+        encryptionAlgorithm);
     Preconditions.checkArgument(nnIndex == 0 || nnIndex == 1);
     this.nnIndex = nnIndex;
     setSerialNo(new SecureRandom().nextInt());
@@ -109,17 +117,24 @@ public class BlockTokenSecretManager ext
   }
   
   private BlockTokenSecretManager(boolean isMaster, long keyUpdateInterval,
-      long tokenLifetime) {
+      long tokenLifetime, String blockPoolId, String encryptionAlgorithm) {
     this.isMaster = isMaster;
     this.keyUpdateInterval = keyUpdateInterval;
     this.tokenLifetime = tokenLifetime;
     this.allKeys = new HashMap<Integer, BlockKey>();
+    this.blockPoolId = blockPoolId;
+    this.encryptionAlgorithm = encryptionAlgorithm;
+    generateKeys();
   }
   
   @VisibleForTesting
   public synchronized void setSerialNo(int serialNo) {
     this.serialNo = (serialNo & LOW_MASK) | (nnIndex << 31);
   }
+  
+  public void setBlockPoolId(String blockPoolId) {
+    this.blockPoolId = blockPoolId;
+  }
 
   /** Initialize block keys */
   private synchronized void generateKeys() {
@@ -371,6 +386,49 @@ public class BlockTokenSecretManager ext
     return createPassword(identifier.getBytes(), key.getKey());
   }
   
+  /**
+   * Generate a data encryption key for this block pool, using the current
+   * BlockKey.
+   * 
+   * @return a data encryption key which may be used to encrypt traffic
+   *         over the DataTransferProtocol
+   */
+  public DataEncryptionKey generateDataEncryptionKey() {
+    byte[] nonce = new byte[8];
+    nonceGenerator.nextBytes(nonce);
+    BlockKey key = null;
+    synchronized (this) {
+      key = currentKey;
+    }
+    byte[] encryptionKey = createPassword(nonce, key.getKey());
+    return new DataEncryptionKey(key.getKeyId(), blockPoolId, nonce,
+        encryptionKey, Time.now() + tokenLifetime,
+        encryptionAlgorithm);
+  }
+  
+  /**
+   * Recreate an encryption key based on the given key id and nonce.
+   * 
+   * @param keyId identifier of the secret key used to generate the encryption key.
+   * @param nonce random value used to create the encryption key
+   * @return the encryption key which corresponds to this (keyId, blockPoolId, nonce)
+   * @throws InvalidEncryptionKeyException if the block key identified by keyId
+   *         is no longer cached by this secret manager
+   */
+  public byte[] retrieveDataEncryptionKey(int keyId, byte[] nonce)
+      throws InvalidEncryptionKeyException {
+    BlockKey key = null;
+    synchronized (this) {
+      key = allKeys.get(keyId);
+      if (key == null) {
+        throw new InvalidEncryptionKeyException("Can't re-compute encryption key"
+            + " for nonce, since the required block key (keyID=" + keyId
+            + ") doesn't exist. Current key: " + currentKey.getKeyId());
+      }
+    }
+    return createPassword(nonce, key.getKey());
+  }
+  
   @VisibleForTesting
   public synchronized void setKeyUpdateIntervalForTesting(long millis) {
     this.keyUpdateInterval = millis;

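The point of generateDataEncryptionKey/retrieveDataEncryptionKey above is that the NameNode and a DataNode holding the same BlockKey can independently derive the identical encryption key from (keyId, nonce): the key is just a MAC of the random nonce under the shared block key, so only the small (keyId, blockPoolId, nonce) tuple ever travels in the clear. A self-contained sketch of that symmetry, assuming the HMAC-SHA1 construction that Hadoop's SecretManager.createPassword is built on:

    import java.security.SecureRandom;
    import java.util.Arrays;

    import javax.crypto.Mac;
    import javax.crypto.spec.SecretKeySpec;

    public class DataEncryptionKeyDerivationSketch {
      public static void main(String[] args) throws Exception {
        byte[] sharedBlockKey = new byte[20];          // rolled periodically by the NN,
        new SecureRandom().nextBytes(sharedBlockKey);  // pushed to DNs as BlockKeys

        byte[] nonce = new byte[8];                    // fresh per generated key
        new SecureRandom().nextBytes(nonce);

        byte[] nnSide = hmac(sharedBlockKey, nonce);   // generateDataEncryptionKey()
        byte[] dnSide = hmac(sharedBlockKey, nonce);   // retrieveDataEncryptionKey(keyId, nonce)

        System.out.println(Arrays.equals(nnSide, dnSide));  // true
      }

      private static byte[] hmac(byte[] key, byte[] data) throws Exception {
        Mac mac = Mac.getInstance("HmacSHA1");
        mac.init(new SecretKeySpec(key, "HmacSHA1"));
        return mac.doFinal(data);
      }
    }
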
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Thu Aug  9 22:29:36 2012
@@ -24,6 +24,8 @@ import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.Socket;
 import java.net.URI;
 import java.text.DateFormat;
@@ -57,6 +59,8 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
@@ -312,11 +316,22 @@ public class Balancer {
             NetUtils.createSocketAddr(target.datanode.getXferAddr()),
             HdfsServerConstants.READ_TIMEOUT);
         sock.setKeepAlive(true);
-        out = new DataOutputStream( new BufferedOutputStream(
-            sock.getOutputStream(), HdfsConstants.IO_FILE_BUFFER_SIZE));
+        
+        OutputStream unbufOut = sock.getOutputStream();
+        InputStream unbufIn = sock.getInputStream();
+        if (nnc.getDataEncryptionKey() != null) {
+          IOStreamPair encryptedStreams =
+              DataTransferEncryptor.getEncryptedStreams(
+                  unbufOut, unbufIn, nnc.getDataEncryptionKey());
+          unbufOut = encryptedStreams.out;
+          unbufIn = encryptedStreams.in;
+        }
+        out = new DataOutputStream(new BufferedOutputStream(unbufOut,
+            HdfsConstants.IO_FILE_BUFFER_SIZE));
+        in = new DataInputStream(new BufferedInputStream(unbufIn,
+            HdfsConstants.IO_FILE_BUFFER_SIZE));
+        
         sendRequest(out);
-        in = new DataInputStream( new BufferedInputStream(
-            sock.getInputStream(), HdfsConstants.IO_FILE_BUFFER_SIZE));
         receiveResponse(in);
         bytesMoved.inc(block.getNumBytes());
         LOG.info( "Moving block " + block.getBlock().getBlockId() +

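The Balancer change above shows the stream layering used throughout this patch: the raw socket streams are wrapped for encryption first, and buffering is applied on the outside, so the bytes that reach the wire are the encrypted ones. The following self-contained sketch demonstrates the same layering with plain javax.crypto streams standing in for DataTransferEncryptor, which in HDFS negotiates the cipher via a SASL handshake rather than using a fixed key and IV as here:

    import java.io.BufferedInputStream;
    import java.io.BufferedOutputStream;
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.InputStream;
    import java.io.OutputStream;

    import javax.crypto.Cipher;
    import javax.crypto.CipherInputStream;
    import javax.crypto.CipherOutputStream;
    import javax.crypto.spec.IvParameterSpec;
    import javax.crypto.spec.SecretKeySpec;

    public class StreamLayeringSketch {
      public static void main(String[] args) throws Exception {
        SecretKeySpec key = new SecretKeySpec(new byte[16], "AES");
        IvParameterSpec iv = new IvParameterSpec(new byte[16]);
        ByteArrayOutputStream wire = new ByteArrayOutputStream(); // stands in for the socket

        // Writer side: encryption layer first, buffering on the outside.
        Cipher enc = Cipher.getInstance("AES/CTR/NoPadding");
        enc.init(Cipher.ENCRYPT_MODE, key, iv);
        OutputStream unbufOut = new CipherOutputStream(wire, enc);
        DataOutputStream out = new DataOutputStream(
            new BufferedOutputStream(unbufOut, 4096));
        out.writeUTF("replaceBlock request");
        out.close();

        // Reader side mirrors the layering.
        Cipher dec = Cipher.getInstance("AES/CTR/NoPadding");
        dec.init(Cipher.DECRYPT_MODE, key, iv);
        InputStream unbufIn = new CipherInputStream(
            new ByteArrayInputStream(wire.toByteArray()), dec);
        DataInputStream in = new DataInputStream(
            new BufferedInputStream(unbufIn, 4096));
        System.out.println(in.readUTF());  // prints "replaceBlock request"
      }
    }
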
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java Thu Aug  9 22:29:36 2012
@@ -29,10 +29,12 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@@ -60,10 +62,12 @@ class NameNodeConnector {
   final OutputStream out;
 
   private final boolean isBlockTokenEnabled;
+  private final boolean encryptDataTransfer;
   private boolean shouldRun;
   private long keyUpdaterInterval;
   private BlockTokenSecretManager blockTokenSecretManager;
   private Daemon keyupdaterthread; // AccessKeyUpdater thread
+  private DataEncryptionKey encryptionKey;
 
   NameNodeConnector(URI nameNodeUri,
       Configuration conf) throws IOException {
@@ -88,8 +92,11 @@ class NameNodeConnector {
       LOG.info("Block token params received from NN: keyUpdateInterval="
           + blockKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
           + blockTokenLifetime / (60 * 1000) + " min(s)");
+      String encryptionAlgorithm = conf.get(
+          DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
       this.blockTokenSecretManager = new BlockTokenSecretManager(
-          blockKeyUpdateInterval, blockTokenLifetime);
+          blockKeyUpdateInterval, blockTokenLifetime, blockpoolID,
+          encryptionAlgorithm);
       this.blockTokenSecretManager.addKeys(keys);
       /*
        * Balancer should sync its block keys with NN more frequently than NN
@@ -102,7 +109,8 @@ class NameNodeConnector {
       this.shouldRun = true;
       this.keyupdaterthread.start();
     }
-
+    this.encryptDataTransfer = fs.getServerDefaults(new Path("/"))
+        .getEncryptDataTransfer();
     // Check if there is another balancer running.
     // Exit if there is another one running.
     out = checkAndMarkRunningBalancer(); 
@@ -126,6 +134,20 @@ class NameNodeConnector {
           BlockTokenSecretManager.AccessMode.COPY));
     }
   }
+  
+  DataEncryptionKey getDataEncryptionKey()
+      throws IOException {
+    if (encryptDataTransfer) {
+      synchronized (this) {
+        if (encryptionKey == null) {
+          encryptionKey = blockTokenSecretManager.generateDataEncryptionKey();
+        }
+        return encryptionKey;
+      }
+    } else {
+      return null;
+    }
+  }
 
   /* The idea for making sure that there is no more than one balancer
   * running in an HDFS is to create a file in the HDFS, write the IP address
@@ -208,4 +230,4 @@ class NameNodeConnector {
       }
     }
   }
-}
\ No newline at end of file
+}

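getDataEncryptionKey() above creates the key lazily and caches it, so a Balancer run mints at most one key, and only when encryption is enabled in the server defaults. A stand-alone sketch of that pattern with stand-in types; note that the real DataEncryptionKey also carries an expiryDate, which this sketch ignores:

    import java.security.SecureRandom;

    public class LazyEncryptionKeyHolder {
      // Stand-in for BlockTokenSecretManager.generateDataEncryptionKey().
      interface KeyFactory {
        byte[] newKey();
      }

      private final boolean encryptDataTransfer;
      private final KeyFactory factory;
      private byte[] cachedKey;

      public LazyEncryptionKeyHolder(boolean encryptDataTransfer, KeyFactory factory) {
        this.encryptDataTransfer = encryptDataTransfer;
        this.factory = factory;
      }

      public byte[] getDataEncryptionKey() {
        if (!encryptDataTransfer) {
          return null;             // feature off: callers skip the wrapping step
        }
        synchronized (this) {
          if (cachedKey == null) { // generate at most one key per holder
            cachedKey = factory.newKey();
          }
          return cachedKey;
        }
      }

      public static void main(String[] args) {
        LazyEncryptionKeyHolder holder = new LazyEncryptionKeyHolder(true,
            new KeyFactory() {
              public byte[] newKey() {
                byte[] k = new byte[16];
                new SecureRandom().nextBytes(k);
                return k;
              }
            });
        // The second call returns the cached key, not a new one.
        System.out.println(holder.getDataEncryptionKey() == holder.getDataEncryptionKey());
      }
    }
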
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Thu Aug  9 22:29:36 2012
@@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.protocol.E
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@@ -205,6 +206,9 @@ public class BlockManager {
 
   /** variable to enable check for enough racks */
   final boolean shouldCheckForEnoughRacks;
+  
+  // whether or not to issue block encryption keys.
+  final boolean encryptDataTransfer;
 
   /**
    * When running inside a Standby node, the node may receive block reports
@@ -285,12 +289,18 @@ public class BlockManager {
     this.replicationRecheckInterval = 
       conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 
                   DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L;
+    
+    this.encryptDataTransfer =
+        conf.getBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY,
+            DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
+    
     LOG.info("defaultReplication         = " + defaultReplication);
     LOG.info("maxReplication             = " + maxReplication);
     LOG.info("minReplication             = " + minReplication);
     LOG.info("maxReplicationStreams      = " + maxReplicationStreams);
     LOG.info("shouldCheckForEnoughRacks  = " + shouldCheckForEnoughRacks);
     LOG.info("replicationRecheckInterval = " + replicationRecheckInterval);
+    LOG.info("encryptDataTransfer        = " + encryptDataTransfer);
   }
 
   private static BlockTokenSecretManager createBlockTokenSecretManager(
@@ -310,10 +320,14 @@ public class BlockManager {
     final long lifetimeMin = conf.getLong(
         DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY, 
         DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_DEFAULT);
+    final String encryptionAlgorithm = conf.get(
+        DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
     LOG.info(DFSConfigKeys.DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_KEY
         + "=" + updateMin + " min(s), "
         + DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY
-        + "=" + lifetimeMin + " min(s)");
+        + "=" + lifetimeMin + " min(s), "
+        + DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY
+        + "=" + encryptionAlgorithm);
     
     String nsId = DFSUtil.getNamenodeNameServiceId(conf);
     boolean isHaEnabled = HAUtil.isHAEnabled(conf, nsId);
@@ -322,10 +336,17 @@ public class BlockManager {
       String thisNnId = HAUtil.getNameNodeId(conf, nsId);
       String otherNnId = HAUtil.getNameNodeIdOfOtherNode(conf, nsId);
       return new BlockTokenSecretManager(updateMin*60*1000L,
-          lifetimeMin*60*1000L, thisNnId.compareTo(otherNnId) < 0 ? 0 : 1);
+          lifetimeMin*60*1000L, thisNnId.compareTo(otherNnId) < 0 ? 0 : 1, null,
+          encryptionAlgorithm);
     } else {
       return new BlockTokenSecretManager(updateMin*60*1000L,
-          lifetimeMin*60*1000L, 0);
+          lifetimeMin*60*1000L, 0, null, encryptionAlgorithm);
+    }
+  }
+  
+  public void setBlockPoolId(String blockPoolId) {
+    if (isBlockTokenEnabled()) {
+      blockTokenSecretManager.setBlockPoolId(blockPoolId);
     }
   }
 
@@ -792,6 +813,14 @@ public class BlockManager {
       nodeinfo.needKeyUpdate = false;
     }
   }
+  
+  public DataEncryptionKey generateDataEncryptionKey() {
+    if (isBlockTokenEnabled() && encryptDataTransfer) {
+      return blockTokenSecretManager.generateDataEncryptionKey();
+    } else {
+      return null;
+    }
+  }
 
   /**
    * Clamp the specified replication between the minimum and the maximum

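Taken together with createBlockTokenSecretManager above, the BlockManager only hands out data encryption keys when block access tokens are enabled and the encrypt-data-transfer flag is set; otherwise generateDataEncryptionKey() returns null and clients leave the streams unwrapped. A small sketch of that gating using Configuration directly; the property-name strings are my reading of the DFSConfigKeys constants referenced in the hunk and should be treated as assumptions rather than taken from this patch:

    import org.apache.hadoop.conf.Configuration;

    public class EncryptionGateSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setBoolean("dfs.block.access.token.enable", true);   // assumed key name
        conf.setBoolean("dfs.encrypt.data.transfer", true);       // assumed key name
        conf.set("dfs.encrypt.data.transfer.algorithm", "3des");  // assumed key name and value

        boolean tokensEnabled = conf.getBoolean("dfs.block.access.token.enable", false);
        boolean encrypt = conf.getBoolean("dfs.encrypt.data.transfer", false);

        // Mirrors the gate in BlockManager.generateDataEncryptionKey():
        // without both settings, no key is issued (the method returns null).
        System.out.println(tokensEnabled && encrypt
            ? "NameNode will issue data encryption keys"
            : "data transfer stays unencrypted");
      }
    }
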
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Thu Aug  9 22:29:36 2012
@@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
@@ -195,7 +196,8 @@ public class JspHelper {
   public static void streamBlockInAscii(InetSocketAddress addr, String poolId,
       long blockId, Token<BlockTokenIdentifier> blockToken, long genStamp,
       long blockSize, long offsetIntoBlock, long chunkSizeToView,
-      JspWriter out, Configuration conf) throws IOException {
+      JspWriter out, Configuration conf, DataEncryptionKey encryptionKey)
+          throws IOException {
     if (chunkSizeToView == 0) return;
     Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket();
     s.connect(addr, HdfsServerConstants.READ_TIMEOUT);
@@ -208,7 +210,7 @@ public class JspHelper {
     BlockReader blockReader = BlockReaderFactory.newBlockReader(
         conf, s, file,
         new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken,
-        offsetIntoBlock, amtToRead);
+        offsetIntoBlock, amtToRead, encryptionKey);
         
     byte[] buf = new byte[(int)amtToRead];
     int readOffset = 0;

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java Thu Aug  9 22:29:36 2012
@@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.util.VersionInfo;
 
 import com.google.common.base.Preconditions;
@@ -221,7 +222,7 @@ public abstract class Storage extends St
    * One of the storage directories.
    */
   @InterfaceAudience.Private
-  public static class StorageDirectory {
+  public static class StorageDirectory implements FormatConfirmable {
     final File root;              // root directory
     final boolean useLock;        // flag to enable storage lock
     final StorageDirType dirType; // storage dir type
@@ -576,6 +577,32 @@ public abstract class Storage extends St
         throw new IOException("Unexpected FS state: " + curState);
       }
     }
+    
+    /**
+     * @return true if the storage directory should prompt the user prior
+     * to formatting (i.e. if the directory appears to contain some data)
+     * @throws IOException if the SD cannot be accessed due to an IO error
+     */
+    @Override
+    public boolean hasSomeData() throws IOException {
+      // It's alright for a dir not to exist, or to exist (properly accessible)
+      // and be completely empty.
+      if (!root.exists()) return false;
+      
+      if (!root.isDirectory()) {
+        // a file where you expect a directory should not cause silent
+        // formatting
+        return true;
+      }
+      
+      if (FileUtil.listFiles(root).length == 0) {
+        // Empty dir can format without prompt.
+        return false;
+      }
+      
+      return true;
+    }
+
 
     /**
      * Lock storage to provide exclusive access.
@@ -779,6 +806,68 @@ public abstract class Storage extends St
   }
   
   /**
+   * Iterate over each of the {@link FormatConfirmable} objects,
+   * potentially checking with the user whether it should be formatted.
+   * 
+   * If running in interactive mode, will prompt the user for each
+   * directory to allow them to format anyway. Otherwise, returns
+   * false, unless 'force' is specified.
+   * 
+   * @param force format regardless of whether dirs exist
+   * @param interactive prompt the user when a dir exists
+   * @return true if formatting should proceed
+   * @throws IOException if some storage cannot be accessed
+   */
+  public static boolean confirmFormat(
+      Iterable<? extends FormatConfirmable> items,
+      boolean force, boolean interactive) throws IOException {
+    for (FormatConfirmable item : items) {
+      if (!item.hasSomeData())
+        continue;
+      if (force) { // Don't confirm, always format.
+        System.err.println(
+            "Data exists in " + item + ". Formatting anyway.");
+        continue;
+      }
+      if (!interactive) { // Don't ask - always don't format
+        System.err.println(
+            "Running in non-interactive mode, and data appears to exist in " +
+            item + ". Not formatting.");
+        return false;
+      }
+      if (!ToolRunner.confirmPrompt("Re-format filesystem in " + item + " ?")) {
+        System.err.println("Format aborted in " + item);
+        return false;
+      }
+    }
+    
+    return true;
+  }
+  
+  /**
+   * Interface for classes which need to have the user confirm their
+   * formatting during NameNode -format and other similar operations.
+   * 
+   * This is currently a storage directory or journal manager.
+   */
+  @InterfaceAudience.Private
+  public interface FormatConfirmable {
+    /**
+     * @return true if the storage seems to have some valid data in it,
+     * and the user should be required to confirm the format. Otherwise,
+     * false.
+     * @throws IOException if the storage cannot be accessed at all.
+     */
+    public boolean hasSomeData() throws IOException;
+    
+    /**
+     * @return a string representation of the formattable item, suitable
+     * for display to the user inside a prompt
+     */
+    public String toString();
+  }
+  
+  /**
    * Get common storage fields.
    * Should be overloaded if additional fields need to be get.
    *