Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2012/08/10 00:30:33 UTC
svn commit: r1371518 [3/6] - in
/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project:
hadoop-hdfs-httpfs/dev-support/
hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/
hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/...
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Thu Aug 9 22:29:36 2012
@@ -24,11 +24,12 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStream;
import java.io.InterruptedIOException;
+import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.BufferOverflowException;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
@@ -56,6 +57,9 @@ import org.apache.hadoop.hdfs.protocol.N
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@@ -121,7 +125,7 @@ public class DFSOutputStream extends FSO
private long lastQueuedSeqno = -1;
private long lastAckedSeqno = -1;
private long bytesCurBlock = 0; // bytes written in current block
- private int packetSize = 0; // write packet size, including the header.
+ private int packetSize = 0; // write packet size, not including the header.
private int chunksPerPacket = 0;
private volatile IOException lastException = null;
private long artificialSlowdown = 0;
@@ -142,28 +146,31 @@ public class DFSOutputStream extends FSO
int numChunks; // number of chunks currently in packet
int maxChunks; // max chunks in packet
- /** buffer for accumulating packet checksum and data */
- ByteBuffer buffer; // wraps buf, only one of these two may be non-null
byte[] buf;
/**
* buf is pointed into like follows:
* (C is checksum data, D is payload data)
*
- * [HHHHHCCCCC________________DDDDDDDDDDDDDDDD___]
- *       ^    ^               ^               ^
- *       |    checksumPos     dataStart       dataPos
- *   checksumStart
+ * [_________CCCCCCCCC________________DDDDDDDDDDDDDDDD___]
+ *           ^        ^               ^               ^
+ *           |        checksumPos     dataStart       dataPos
+ *      checksumStart
+ *
+ * Right before sending, we move the checksum data to immediately precede
+ * the actual data, and then insert the header into the buffer immediately
+ * preceding the checksum data, so we make sure to keep enough space in
+ * front of the checksum data to support the largest conceivable header.
*/
int checksumStart;
+ int checksumPos;
int dataStart;
int dataPos;
- int checksumPos;
private static final long HEART_BEAT_SEQNO = -1L;
/**
- * create a heartbeat packet
+ * Create a heartbeat packet.
*/
Packet() {
this.lastPacketInBlock = false;
@@ -171,17 +178,19 @@ public class DFSOutputStream extends FSO
this.offsetInBlock = 0;
this.seqno = HEART_BEAT_SEQNO;
- buffer = null;
- int packetSize = PacketHeader.PKT_HEADER_LEN + HdfsConstants.BYTES_IN_INTEGER;
- buf = new byte[packetSize];
+ buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN];
- checksumStart = dataStart = packetSize;
- checksumPos = checksumStart;
- dataPos = dataStart;
+ checksumStart = checksumPos = dataPos = dataStart = PacketHeader.PKT_MAX_HEADER_LEN;
maxChunks = 0;
}
- // create a new packet
+ /**
+ * Create a new packet.
+ *
+ * @param pktSize maximum size of the packet, including checksum data and actual data.
+ * @param chunksPerPkt maximum number of chunks per packet.
+ * @param offsetInBlock offset in bytes into the HDFS block.
+ */
Packet(int pktSize, int chunksPerPkt, long offsetInBlock) {
this.lastPacketInBlock = false;
this.numChunks = 0;
@@ -189,25 +198,24 @@ public class DFSOutputStream extends FSO
this.seqno = currentSeqno;
currentSeqno++;
- buffer = null;
- buf = new byte[pktSize];
+ buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN + pktSize];
- checksumStart = PacketHeader.PKT_HEADER_LEN;
+ checksumStart = PacketHeader.PKT_MAX_HEADER_LEN;
checksumPos = checksumStart;
- dataStart = checksumStart + chunksPerPkt * checksum.getChecksumSize();
+ dataStart = checksumStart + (chunksPerPkt * checksum.getChecksumSize());
dataPos = dataStart;
maxChunks = chunksPerPkt;
}
void writeData(byte[] inarray, int off, int len) {
- if ( dataPos + len > buf.length) {
+ if (dataPos + len > buf.length) {
throw new BufferOverflowException();
}
System.arraycopy(inarray, off, buf, dataPos, len);
dataPos += len;
}
- void writeChecksum(byte[] inarray, int off, int len) {
+ void writeChecksum(byte[] inarray, int off, int len) {
if (checksumPos + len > dataStart) {
throw new BufferOverflowException();
}
@@ -216,45 +224,38 @@ public class DFSOutputStream extends FSO
}
/**
- * Returns ByteBuffer that contains one full packet, including header.
+ * Write the full packet, including the header, to the given output stream.
*/
- ByteBuffer getBuffer() {
- /* Once this is called, no more data can be added to the packet.
- * setting 'buf' to null ensures that.
- * This is called only when the packet is ready to be sent.
- */
- if (buffer != null) {
- return buffer;
- }
-
- //prepare the header and close any gap between checksum and data.
-
- int dataLen = dataPos - dataStart;
- int checksumLen = checksumPos - checksumStart;
+ void writeTo(DataOutputStream stm) throws IOException {
+ final int dataLen = dataPos - dataStart;
+ final int checksumLen = checksumPos - checksumStart;
+ final int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen;
+
+ PacketHeader header = new PacketHeader(
+ pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock);
if (checksumPos != dataStart) {
- /* move the checksum to cover the gap.
- * This can happen for the last packet.
- */
+ // Move the checksum to cover the gap. This can happen for the last
+ // packet or during an hflush/hsync call.
System.arraycopy(buf, checksumStart, buf,
dataStart - checksumLen , checksumLen);
+ checksumPos = dataStart;
+ checksumStart = checksumPos - checksumLen;
}
- int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen;
+ final int headerStart = checksumStart - header.getSerializedSize();
+ assert checksumStart + 1 >= header.getSerializedSize();
+ assert checksumPos == dataStart;
+ assert headerStart >= 0;
+ assert headerStart + header.getSerializedSize() == checksumStart;
- //normally dataStart == checksumPos, i.e., offset is zero.
- buffer = ByteBuffer.wrap(
- buf, dataStart - checksumPos,
- PacketHeader.PKT_HEADER_LEN + pktLen - HdfsConstants.BYTES_IN_INTEGER);
- buf = null;
- buffer.mark();
-
- PacketHeader header = new PacketHeader(
- pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock);
- header.putInBuffer(buffer);
+ // Copy the header data into the buffer immediately preceding the checksum
+ // data.
+ System.arraycopy(header.getBytes(), 0, buf, headerStart,
+ header.getSerializedSize());
- buffer.reset();
- return buffer;
+ // Write the now contiguous full packet to the output stream.
+ stm.write(buf, headerStart, header.getSerializedSize() + checksumLen + dataLen);
}
// get the packet's last byte's offset in the block
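For reference, the new single-buffer layout boils down to the standalone sketch below. MAX_HEADER_LEN and the header argument are illustrative stand-ins for PacketHeader.PKT_MAX_HEADER_LEN and the serialized PacketHeader; this is not the committed class.

    import java.io.DataOutputStream;
    import java.io.IOException;

    // Checksums and data accumulate in one byte[] with MAX_HEADER_LEN
    // bytes of headroom. Just before sending, any gap between the
    // checksums and the data is closed, the header is copied in
    // immediately before the checksums, and one contiguous write
    // covers header + checksums + data.
    class PacketSketch {
        static final int MAX_HEADER_LEN = 33; // stand-in for PKT_MAX_HEADER_LEN

        final byte[] buf;
        int checksumStart, checksumPos, dataStart, dataPos;

        PacketSketch(int pktSize, int chunksPerPkt, int checksumSize) {
            buf = new byte[MAX_HEADER_LEN + pktSize];
            checksumStart = checksumPos = MAX_HEADER_LEN;
            dataStart = dataPos = checksumStart + chunksPerPkt * checksumSize;
        }

        // header must be at most MAX_HEADER_LEN bytes long
        void writeTo(DataOutputStream stm, byte[] header) throws IOException {
            int dataLen = dataPos - dataStart;
            int checksumLen = checksumPos - checksumStart;
            if (checksumPos != dataStart) {
                // close the gap left by a short (e.g. last) packet
                System.arraycopy(buf, checksumStart, buf,
                    dataStart - checksumLen, checksumLen);
                checksumPos = dataStart;
                checksumStart = checksumPos - checksumLen;
            }
            int headerStart = checksumStart - header.length;
            System.arraycopy(header, 0, buf, headerStart, header.length);
            stm.write(buf, headerStart, header.length + checksumLen + dataLen);
        }
    }

Because the serialized header is now variable-length, the buffer reserves the worst case up front and the real header is slid into place only at send time.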
@@ -497,8 +498,6 @@ public class DFSOutputStream extends FSO
}
// send the packet
- ByteBuffer buf = one.getBuffer();
-
synchronized (dataQueue) {
// move packet from dataQueue to ackQueue
if (!one.isHeartbeatPacket()) {
@@ -514,8 +513,8 @@ public class DFSOutputStream extends FSO
}
// write out data to remote datanode
- try {
- blockStream.write(buf.array(), buf.position(), buf.remaining());
+ try {
+ one.writeTo(blockStream);
blockStream.flush();
} catch (IOException e) {
// HDFS-3398 treat primary DN is down since client is unable to
@@ -867,16 +866,26 @@ public class DFSOutputStream extends FSO
try {
sock = createSocketForPipeline(src, 2, dfsClient);
final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2);
- out = new DataOutputStream(new BufferedOutputStream(
- NetUtils.getOutputStream(sock, writeTimeout),
+
+ OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
+ InputStream unbufIn = NetUtils.getInputStream(sock);
+ if (dfsClient.shouldEncryptData()) {
+ IOStreamPair encryptedStreams =
+ DataTransferEncryptor.getEncryptedStreams(
+ unbufOut, unbufIn, dfsClient.getDataEncryptionKey());
+ unbufOut = encryptedStreams.out;
+ unbufIn = encryptedStreams.in;
+ }
+ out = new DataOutputStream(new BufferedOutputStream(unbufOut,
HdfsConstants.SMALL_BUFFER_SIZE));
+ in = new DataInputStream(unbufIn);
//send the TRANSFER_BLOCK request
new Sender(out).transferBlock(block, blockToken, dfsClient.clientName,
targets);
+ out.flush();
//ack
- in = new DataInputStream(NetUtils.getInputStream(sock));
BlockOpResponseProto response =
BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(in));
if (SUCCESS != response.getStatus()) {
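The same wrap-then-buffer ordering is used both here and in createBlockOutputStream() below; condensed, the pattern is the sketch that follows. The socket accessors and buffer size are simplified, and wrapOut/wrapIn are hypothetical stand-ins for DataTransferEncryptor.getEncryptedStreams.

    import java.io.BufferedOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.Socket;

    // The *unbuffered* socket streams are wrapped for encryption first,
    // and buffering is layered on top afterwards, so the encryptor
    // frames raw bytes while callers keep seeing ordinary
    // Data{Input,Output}Streams.
    class StreamSetupSketch {
        DataInputStream in;
        DataOutputStream out;

        void setup(Socket sock, boolean shouldEncrypt) throws IOException {
            OutputStream unbufOut = sock.getOutputStream();
            InputStream unbufIn = sock.getInputStream();
            if (shouldEncrypt) {
                unbufOut = wrapOut(unbufOut); // hypothetical encryptor
                unbufIn = wrapIn(unbufIn);
            }
            out = new DataOutputStream(new BufferedOutputStream(unbufOut, 512));
            in = new DataInputStream(unbufIn);
        }

        OutputStream wrapOut(OutputStream o) { return o; } // placeholder
        InputStream wrapIn(InputStream i) { return i; }    // placeholder
    }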
@@ -1034,77 +1043,98 @@ public class DFSOutputStream extends FSO
// persist blocks on namenode on next flush
persistBlocks.set(true);
- boolean result = false;
- DataOutputStream out = null;
- try {
- assert null == s : "Previous socket unclosed";
- s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
- long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
-
- //
- // Xmit header info to datanode
- //
- out = new DataOutputStream(new BufferedOutputStream(
- NetUtils.getOutputStream(s, writeTimeout),
- HdfsConstants.SMALL_BUFFER_SIZE));
-
- assert null == blockReplyStream : "Previous blockReplyStream unclosed";
- blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
-
- // send the request
- new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
- nodes, null, recoveryFlag? stage.getRecoveryStage() : stage,
- nodes.length, block.getNumBytes(), bytesSent, newGS, checksum);
-
- // receive ack for connect
- BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
- HdfsProtoUtil.vintPrefixed(blockReplyStream));
- pipelineStatus = resp.getStatus();
- firstBadLink = resp.getFirstBadLink();
-
- if (pipelineStatus != SUCCESS) {
- if (pipelineStatus == Status.ERROR_ACCESS_TOKEN) {
- throw new InvalidBlockTokenException(
- "Got access token error for connect ack with firstBadLink as "
- + firstBadLink);
- } else {
- throw new IOException("Bad connect ack with firstBadLink as "
- + firstBadLink);
+ int refetchEncryptionKey = 1;
+ while (true) {
+ boolean result = false;
+ DataOutputStream out = null;
+ try {
+ assert null == s : "Previous socket unclosed";
+ assert null == blockReplyStream : "Previous blockReplyStream unclosed";
+ s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
+ long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
+
+ OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
+ InputStream unbufIn = NetUtils.getInputStream(s);
+ if (dfsClient.shouldEncryptData()) {
+ IOStreamPair encryptedStreams =
+ DataTransferEncryptor.getEncryptedStreams(unbufOut,
+ unbufIn, dfsClient.getDataEncryptionKey());
+ unbufOut = encryptedStreams.out;
+ unbufIn = encryptedStreams.in;
+ }
+ out = new DataOutputStream(new BufferedOutputStream(unbufOut,
+ HdfsConstants.SMALL_BUFFER_SIZE));
+ blockReplyStream = new DataInputStream(unbufIn);
+
+ //
+ // Xmit header info to datanode
+ //
+
+ // send the request
+ new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
+ nodes, null, recoveryFlag? stage.getRecoveryStage() : stage,
+ nodes.length, block.getNumBytes(), bytesSent, newGS, checksum);
+
+ // receive ack for connect
+ BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
+ HdfsProtoUtil.vintPrefixed(blockReplyStream));
+ pipelineStatus = resp.getStatus();
+ firstBadLink = resp.getFirstBadLink();
+
+ if (pipelineStatus != SUCCESS) {
+ if (pipelineStatus == Status.ERROR_ACCESS_TOKEN) {
+ throw new InvalidBlockTokenException(
+ "Got access token error for connect ack with firstBadLink as "
+ + firstBadLink);
+ } else {
+ throw new IOException("Bad connect ack with firstBadLink as "
+ + firstBadLink);
+ }
}
- }
- assert null == blockStream : "Previous blockStream unclosed";
- blockStream = out;
- result = true; // success
-
- } catch (IOException ie) {
-
- DFSClient.LOG.info("Exception in createBlockOutputStream", ie);
-
- // find the datanode that matches
- if (firstBadLink.length() != 0) {
- for (int i = 0; i < nodes.length; i++) {
- if (nodes[i].getXferAddr().equals(firstBadLink)) {
- errorIndex = i;
- break;
+ assert null == blockStream : "Previous blockStream unclosed";
+ blockStream = out;
+ result = true; // success
+
+ } catch (IOException ie) {
+ DFSClient.LOG.info("Exception in createBlockOutputStream", ie);
+ if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
+ DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+ + "encryption key was invalid when connecting to "
+ + nodes[0].getXferAddr() + " : " + ie);
+ // The encryption key used is invalid.
+ refetchEncryptionKey--;
+ dfsClient.clearDataEncryptionKey();
+ // Don't close the socket/exclude this node just yet. Try again with
+ // a new encryption key.
+ continue;
+ }
+
+ // find the datanode that matches
+ if (firstBadLink.length() != 0) {
+ for (int i = 0; i < nodes.length; i++) {
+ if (nodes[i].getXferAddr().equals(firstBadLink)) {
+ errorIndex = i;
+ break;
+ }
}
+ } else {
+ errorIndex = 0;
+ }
+ hasError = true;
+ setLastException(ie);
+ result = false; // error
+ } finally {
+ if (!result) {
+ IOUtils.closeSocket(s);
+ s = null;
+ IOUtils.closeStream(out);
+ out = null;
+ IOUtils.closeStream(blockReplyStream);
+ blockReplyStream = null;
}
- } else {
- errorIndex = 0;
- }
- hasError = true;
- setLastException(ie);
- result = false; // error
- } finally {
- if (!result) {
- IOUtils.closeSocket(s);
- s = null;
- IOUtils.closeStream(out);
- out = null;
- IOUtils.closeStream(blockReplyStream);
- blockReplyStream = null;
}
+ return result;
}
- return result;
}
private LocatedBlock locateFollowingBlock(long start,
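Stripped of the pipeline details, the new structure is a single-retry loop; a minimal sketch, with connect()/clearKey() and the exception type as simplified stand-ins for the real calls and InvalidEncryptionKeyException:

    import java.io.IOException;

    // A budget of one key refetch: an invalid-encryption-key failure
    // clears the cached key and retries the same datanode, while any
    // other IOException is treated as a real pipeline failure.
    class RetrySketch {
        static class InvalidKeyException extends IOException {} // stand-in

        boolean createWithRetry() {
            int refetchEncryptionKey = 1;
            while (true) {
                try {
                    connect();      // hypothetical: may throw
                    return true;
                } catch (InvalidKeyException e) {
                    if (refetchEncryptionKey-- > 0) {
                        clearKey(); // hypothetical: drop the cached key
                        continue;   // retry without excluding the node
                    }
                    return false;
                } catch (IOException e) {
                    return false;   // real failure: caller marks node bad
                }
            }
        }

        void connect() throws IOException {}
        void clearKey() {}
    }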
@@ -1322,9 +1352,8 @@ public class DFSOutputStream extends FSO
private void computePacketChunkSize(int psize, int csize) {
int chunkSize = csize + checksum.getChecksumSize();
- int n = PacketHeader.PKT_HEADER_LEN;
- chunksPerPacket = Math.max((psize - n + chunkSize-1)/chunkSize, 1);
- packetSize = n + chunkSize*chunksPerPacket;
+ chunksPerPacket = Math.max(psize/chunkSize, 1);
+ packetSize = chunkSize*chunksPerPacket;
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("computePacketChunkSize: src=" + src +
", chunkSize=" + chunkSize +
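A worked example of the new computePacketChunkSize() arithmetic, using assumed typical defaults (64 KB write packet size, 512-byte chunks, 4-byte checksums; the values are illustrative, not taken from this diff):

    public class PacketSizeExample {
        public static void main(String[] args) {
            int psize = 65536;    // assumed write packet size
            int csize = 512;      // assumed bytes per checksum chunk
            int checksumSize = 4; // assumed CRC32 checksum width

            int chunkSize = csize + checksumSize;                 // 516
            int chunksPerPacket = Math.max(psize / chunkSize, 1); // 127
            int packetSize = chunkSize * chunksPerPacket;         // 65532

            // Header space is no longer part of packetSize; it is added
            // separately as PKT_MAX_HEADER_LEN headroom when the packet
            // buffer is allocated.
            System.out.println(chunksPerPacket + " chunks, "
                + packetSize + " payload+checksum bytes per packet");
        }
    }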
@@ -1438,8 +1467,7 @@ public class DFSOutputStream extends FSO
// indicate the end of block and reset bytesCurBlock.
//
if (bytesCurBlock == blockSize) {
- currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0,
- bytesCurBlock);
+ currentPacket = new Packet(0, 0, bytesCurBlock);
currentPacket.lastPacketInBlock = true;
currentPacket.syncBlock = shouldSyncBlock;
waitAndQueueCurrentPacket();
@@ -1709,8 +1737,7 @@ public class DFSOutputStream extends FSO
if (bytesCurBlock != 0) {
// send an empty packet to mark the end of the block
- currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0,
- bytesCurBlock);
+ currentPacket = new Packet(0, 0, bytesCurBlock);
currentPacket.lastPacketInBlock = true;
currentPacket.syncBlock = shouldSyncBlock;
}
@@ -1763,8 +1790,7 @@ public class DFSOutputStream extends FSO
@VisibleForTesting
public synchronized void setChunksPerPacket(int value) {
chunksPerPacket = Math.min(chunksPerPacket, value);
- packetSize = PacketHeader.PKT_HEADER_LEN +
- (checksum.getBytesPerChecksum() +
+ packetSize = (checksum.getBytesPerChecksum() +
checksum.getChecksumSize()) * chunksPerPacket;
}
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Thu Aug 9 22:29:36 2012
@@ -247,7 +247,7 @@ public class DistributedFileSystem exten
FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
short replication, long blockSize, Progressable progress,
int bytesPerChecksum) throws IOException {
- statistics.incrementReadOps(1);
+ statistics.incrementWriteOps(1);
return new HdfsDataOutputStream(dfs.primitiveCreate(getPathName(f),
absolutePermission, flag, true, replication, blockSize,
progress, bufferSize, bytesPerChecksum),statistics);
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java Thu Aug 9 22:29:36 2012
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FSInputCheck
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
@@ -458,7 +459,9 @@ public class RemoteBlockReader extends F
void sendReadResult(Socket sock, Status statusCode) {
assert !sentStatusCode : "already sent status code to " + sock;
try {
- RemoteBlockReader2.writeReadResult(sock, statusCode);
+ RemoteBlockReader2.writeReadResult(
+ NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT),
+ statusCode);
sentStatusCode = true;
} catch (IOException e) {
// It's ok not to be able to send this. But something is probably wrong.
@@ -484,4 +487,11 @@ public class RemoteBlockReader extends F
throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader");
}
+ @Override
+ public IOStreamPair getStreams() {
+ // This class doesn't support encryption, which is the only thing this
+ // method is used for. See HDFS-3637.
+ return null;
+ }
+
}
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java Thu Aug 9 22:29:36 2012
@@ -23,6 +23,7 @@ import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
@@ -32,26 +33,26 @@ import java.nio.channels.ReadableByteCha
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.hdfs.util.DirectBufferPool;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.SocketInputWrapper;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
-import com.google.common.base.Preconditions;
-
/**
* This is a wrapper around connection to datanode
* and understands checksum, offset etc.
@@ -83,15 +84,15 @@ public class RemoteBlockReader2 impleme
static final Log LOG = LogFactory.getLog(RemoteBlockReader2.class);
- Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
+ Socket dnSock;
+ // for now just sending the status code (e.g. checksumOk) after the read.
+ private IOStreamPair ioStreams;
private final ReadableByteChannel in;
private DataChecksum checksum;
- private PacketHeader curHeader;
- private ByteBuffer curPacketBuf = null;
+ private PacketReceiver packetReceiver = new PacketReceiver(true);
private ByteBuffer curDataSlice = null;
-
/** offset in block of the last chunk received */
private long lastSeqNo = -1;
@@ -99,10 +100,6 @@ public class RemoteBlockReader2 impleme
private long startOffset;
private final String filename;
- private static DirectBufferPool bufferPool = new DirectBufferPool();
- private final ByteBuffer headerBuf = ByteBuffer.allocate(
- PacketHeader.PKT_HEADER_LEN);
-
private final int bytesPerChecksum;
private final int checksumSize;
@@ -126,7 +123,7 @@ public class RemoteBlockReader2 impleme
public synchronized int read(byte[] buf, int off, int len)
throws IOException {
- if (curPacketBuf == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
+ if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
readNextPacket();
}
if (curDataSlice.remaining() == 0) {
@@ -143,7 +140,7 @@ public class RemoteBlockReader2 impleme
@Override
public int read(ByteBuffer buf) throws IOException {
- if (curPacketBuf == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
+ if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
readNextPacket();
}
if (curDataSlice.remaining() == 0) {
@@ -161,11 +158,13 @@ public class RemoteBlockReader2 impleme
}
private void readNextPacket() throws IOException {
- Preconditions.checkState(curHeader == null || !curHeader.isLastPacketInBlock());
-
//Read packet headers.
- readPacketHeader();
+ packetReceiver.receiveNextPacket(in);
+ PacketHeader curHeader = packetReceiver.getHeader();
+ curDataSlice = packetReceiver.getDataSlice();
+ assert curDataSlice.capacity() == curHeader.getDataLen();
+
if (LOG.isTraceEnabled()) {
LOG.trace("DFSClient readNextPacket got header " + curHeader);
}
@@ -179,17 +178,20 @@ public class RemoteBlockReader2 impleme
if (curHeader.getDataLen() > 0) {
int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum;
int checksumsLen = chunks * checksumSize;
- int bufsize = checksumsLen + curHeader.getDataLen();
+
+ assert packetReceiver.getChecksumSlice().capacity() == checksumsLen :
+ "checksum slice capacity=" + packetReceiver.getChecksumSlice().capacity() +
+ " checksumsLen=" + checksumsLen;
- resetPacketBuffer(checksumsLen, curHeader.getDataLen());
-
lastSeqNo = curHeader.getSeqno();
- if (bufsize > 0) {
- readChannelFully(in, curPacketBuf);
- curPacketBuf.flip();
- if (verifyChecksum) {
- verifyPacketChecksums();
- }
+ if (verifyChecksum && curDataSlice.remaining() > 0) {
+ // N.B.: the checksum error offset reported here is actually
+ // relative to the start of the block, not the start of the file.
+ // This is slightly misleading, but preserves the behavior from
+ // the older BlockReader.
+ checksum.verifyChunkedSums(curDataSlice,
+ packetReceiver.getChecksumSlice(),
+ filename, curHeader.getOffsetInBlock());
}
bytesNeededToFinish -= curHeader.getDataLen();
}
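In outline, the reader's per-packet flow after the switch to PacketReceiver is the sketch below. PacketSource is a stand-in exposing only the PacketReceiver calls used above, and verifyChunkedSums is a placeholder for DataChecksum.verifyChunkedSums.

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.ReadableByteChannel;

    // One receiveNextPacket() call pulls a whole packet (header,
    // checksums, data) off the channel; the reader then verifies the
    // chunked checksums against the data slice and serves read()s from
    // that slice until it drains.
    class ReadLoopSketch {
        interface PacketSource { // stand-in for PacketReceiver
            void receiveNextPacket(ReadableByteChannel ch) throws IOException;
            ByteBuffer getDataSlice();
            ByteBuffer getChecksumSlice();
        }

        ByteBuffer curDataSlice;

        void readNextPacket(PacketSource rcv, ReadableByteChannel ch,
                            boolean verify) throws IOException {
            rcv.receiveNextPacket(ch); // one call reads the whole packet
            curDataSlice = rcv.getDataSlice();
            if (verify && curDataSlice.remaining() > 0) {
                verifyChunkedSums(curDataSlice, rcv.getChecksumSlice());
            }
        }

        void verifyChunkedSums(ByteBuffer data, ByteBuffer sums) {} // placeholder
    }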
@@ -206,46 +208,13 @@ public class RemoteBlockReader2 impleme
if (bytesNeededToFinish <= 0) {
readTrailingEmptyPacket();
if (verifyChecksum) {
- sendReadResult(dnSock, Status.CHECKSUM_OK);
+ sendReadResult(Status.CHECKSUM_OK);
} else {
- sendReadResult(dnSock, Status.SUCCESS);
- }
- }
- }
-
- private void verifyPacketChecksums() throws ChecksumException {
- // N.B.: the checksum error offset reported here is actually
- // relative to the start of the block, not the start of the file.
- // This is slightly misleading, but preserves the behavior from
- // the older BlockReader.
- checksum.verifyChunkedSums(curDataSlice, curPacketBuf,
- filename, curHeader.getOffsetInBlock());
- }
-
- private static void readChannelFully(ReadableByteChannel ch, ByteBuffer buf)
- throws IOException {
- while (buf.remaining() > 0) {
- int n = ch.read(buf);
- if (n < 0) {
- throw new IOException("Premature EOF reading from " + ch);
+ sendReadResult(Status.SUCCESS);
}
}
}
-
- private void resetPacketBuffer(int checksumsLen, int dataLen) {
- int packetLen = checksumsLen + dataLen;
- if (curPacketBuf == null ||
- curPacketBuf.capacity() < packetLen) {
- returnPacketBufToPool();
- curPacketBuf = bufferPool.getBuffer(packetLen);
- }
- curPacketBuf.position(checksumsLen);
- curDataSlice = curPacketBuf.slice();
- curDataSlice.limit(dataLen);
- curPacketBuf.clear();
- curPacketBuf.limit(checksumsLen + dataLen);
- }
-
+
@Override
public synchronized long skip(long n) throws IOException {
/* How can we make sure we don't throw a ChecksumException, at least
@@ -266,23 +235,14 @@ public class RemoteBlockReader2 impleme
return nSkipped;
}
- private void readPacketHeader() throws IOException {
- headerBuf.clear();
- readChannelFully(in, headerBuf);
- headerBuf.flip();
- if (curHeader == null) curHeader = new PacketHeader();
- curHeader.readFields(headerBuf);
- }
-
private void readTrailingEmptyPacket() throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("Reading empty packet at end of read");
}
- headerBuf.clear();
- readChannelFully(in, headerBuf);
- headerBuf.flip();
- PacketHeader trailer = new PacketHeader();
- trailer.readFields(headerBuf);
+
+ packetReceiver.receiveNextPacket(in);
+
+ PacketHeader trailer = packetReceiver.getHeader();
if (!trailer.isLastPacketInBlock() ||
trailer.getDataLen() != 0) {
throw new IOException("Expected empty end-of-read packet! Header: " +
@@ -292,9 +252,11 @@ public class RemoteBlockReader2 impleme
protected RemoteBlockReader2(String file, String bpid, long blockId,
ReadableByteChannel in, DataChecksum checksum, boolean verifyChecksum,
- long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock) {
+ long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock,
+ IOStreamPair ioStreams) {
// Path is used only for printing block and file information in debug
this.dnSock = dnSock;
+ this.ioStreams = ioStreams;
this.in = in;
this.checksum = checksum;
this.verifyChecksum = verifyChecksum;
@@ -313,7 +275,7 @@ public class RemoteBlockReader2 impleme
@Override
public synchronized void close() throws IOException {
- returnPacketBufToPool();
+ packetReceiver.close();
startOffset = -1;
checksum = null;
@@ -324,24 +286,6 @@ public class RemoteBlockReader2 impleme
// in will be closed when its Socket is closed.
}
- @Override
- protected void finalize() throws Throwable {
- try {
- // just in case it didn't get closed, we
- // may as well still try to return the buffer
- returnPacketBufToPool();
- } finally {
- super.finalize();
- }
- }
-
- private void returnPacketBufToPool() {
- if (curPacketBuf != null) {
- bufferPool.returnBuffer(curPacketBuf);
- curPacketBuf = null;
- }
- }
-
/**
* Take the socket used to talk to the DN.
*/
@@ -369,24 +313,23 @@ public class RemoteBlockReader2 impleme
* closing our connection (which we will re-open), but won't affect
* data correctness.
*/
- void sendReadResult(Socket sock, Status statusCode) {
- assert !sentStatusCode : "already sent status code to " + sock;
+ void sendReadResult(Status statusCode) {
+ assert !sentStatusCode : "already sent status code to " + dnSock;
try {
- writeReadResult(sock, statusCode);
+ writeReadResult(ioStreams.out, statusCode);
sentStatusCode = true;
} catch (IOException e) {
// It's ok not to be able to send this. But something is probably wrong.
LOG.info("Could not send read status (" + statusCode + ") to datanode " +
- sock.getInetAddress() + ": " + e.getMessage());
+ dnSock.getInetAddress() + ": " + e.getMessage());
}
}
/**
* Serialize the actual read result on the wire.
*/
- static void writeReadResult(Socket sock, Status statusCode)
+ static void writeReadResult(OutputStream out, Status statusCode)
throws IOException {
- OutputStream out = NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT);
ClientReadStatusProto.newBuilder()
.setStatus(statusCode)
@@ -434,25 +377,32 @@ public class RemoteBlockReader2 impleme
* @param clientName Client name
* @return New BlockReader instance, or null on error.
*/
- public static BlockReader newBlockReader( Socket sock, String file,
+ public static BlockReader newBlockReader(Socket sock, String file,
ExtendedBlock block,
Token<BlockTokenIdentifier> blockToken,
long startOffset, long len,
int bufferSize, boolean verifyChecksum,
- String clientName)
+ String clientName,
+ DataEncryptionKey encryptionKey,
+ IOStreamPair ioStreams)
throws IOException {
+
+ ReadableByteChannel ch;
+ if (ioStreams.in instanceof SocketInputWrapper) {
+ ch = ((SocketInputWrapper)ioStreams.in).getReadableByteChannel();
+ } else {
+ ch = (ReadableByteChannel) ioStreams.in;
+ }
+
// in and out will be closed when sock is closed (by the caller)
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
- NetUtils.getOutputStream(sock,
- HdfsServerConstants.WRITE_TIMEOUT)));
+ ioStreams.out));
new Sender(out).readBlock(block, blockToken, clientName, startOffset, len);
//
- // Get bytes in block, set streams
+ // Get bytes in block
//
- SocketInputWrapper sin = NetUtils.getInputStream(sock);
- ReadableByteChannel ch = sin.getReadableByteChannel();
- DataInputStream in = new DataInputStream(sin);
+ DataInputStream in = new DataInputStream(ioStreams.in);
BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
vintPrefixed(in));
@@ -474,7 +424,8 @@ public class RemoteBlockReader2 impleme
}
return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(),
- ch, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock);
+ ch, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock,
+ ioStreams);
}
static void checkSuccess(
@@ -498,4 +449,9 @@ public class RemoteBlockReader2 impleme
}
}
}
+
+ @Override
+ public IOStreamPair getStreams() {
+ return ioStreams;
+ }
}
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java Thu Aug 9 22:29:36 2012
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs;
+import java.io.Closeable;
import java.net.Socket;
import java.net.SocketAddress;
@@ -29,6 +30,8 @@ import com.google.common.base.Preconditi
import com.google.common.collect.LinkedListMultimap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.io.IOUtils;
/**
@@ -37,7 +40,7 @@ import org.apache.hadoop.io.IOUtils;
class SocketCache {
static final Log LOG = LogFactory.getLog(SocketCache.class);
- private final LinkedListMultimap<SocketAddress, Socket> multimap;
+ private final LinkedListMultimap<SocketAddress, SocketAndStreams> multimap;
private final int capacity;
/**
@@ -57,21 +60,21 @@ class SocketCache {
* @param remote Remote address the socket is connected to.
* @return A socket with unknown state, possibly closed underneath. Or null.
*/
- public synchronized Socket get(SocketAddress remote) {
+ public synchronized SocketAndStreams get(SocketAddress remote) {
if (capacity <= 0) { // disabled
return null;
}
- List<Socket> socklist = multimap.get(remote);
+ List<SocketAndStreams> socklist = multimap.get(remote);
if (socklist == null) {
return null;
}
- Iterator<Socket> iter = socklist.iterator();
+ Iterator<SocketAndStreams> iter = socklist.iterator();
while (iter.hasNext()) {
- Socket candidate = iter.next();
+ SocketAndStreams candidate = iter.next();
iter.remove();
- if (!candidate.isClosed()) {
+ if (!candidate.sock.isClosed()) {
return candidate;
}
}
@@ -82,10 +85,11 @@ class SocketCache {
* Give an unused socket to the cache.
* @param sock socket not used by anyone.
*/
- public synchronized void put(Socket sock) {
+ public synchronized void put(Socket sock, IOStreamPair ioStreams) {
+ SocketAndStreams s = new SocketAndStreams(sock, ioStreams);
if (capacity <= 0) {
// Cache disabled.
- IOUtils.closeSocket(sock);
+ s.close();
return;
}
@@ -102,7 +106,7 @@ class SocketCache {
if (capacity == multimap.size()) {
evictOldest();
}
- multimap.put(remoteAddr, sock);
+ multimap.put(remoteAddr, new SocketAndStreams(sock, ioStreams));
}
public synchronized int size() {
@@ -113,23 +117,23 @@ class SocketCache {
* Evict the oldest entry in the cache.
*/
private synchronized void evictOldest() {
- Iterator<Entry<SocketAddress, Socket>> iter =
+ Iterator<Entry<SocketAddress, SocketAndStreams>> iter =
multimap.entries().iterator();
if (!iter.hasNext()) {
throw new IllegalStateException("Cannot evict from empty cache!");
}
- Entry<SocketAddress, Socket> entry = iter.next();
+ Entry<SocketAddress, SocketAndStreams> entry = iter.next();
iter.remove();
- Socket sock = entry.getValue();
- IOUtils.closeSocket(sock);
+ SocketAndStreams s = entry.getValue();
+ s.close();
}
/**
* Empty the cache, and close all sockets.
*/
public synchronized void clear() {
- for (Socket sock : multimap.values()) {
- IOUtils.closeSocket(sock);
+ for (SocketAndStreams s : multimap.values()) {
+ s.close();
}
multimap.clear();
}
@@ -138,5 +142,25 @@ class SocketCache {
protected void finalize() {
clear();
}
+
+ @InterfaceAudience.Private
+ static class SocketAndStreams implements Closeable {
+ public final Socket sock;
+ public final IOStreamPair ioStreams;
+
+ public SocketAndStreams(Socket s, IOStreamPair ioStreams) {
+ this.sock = s;
+ this.ioStreams = ioStreams;
+ }
+
+ @Override
+ public void close() {
+ if (ioStreams != null) {
+ IOUtils.closeStream(ioStreams.in);
+ IOUtils.closeStream(ioStreams.out);
+ }
+ IOUtils.closeSocket(sock);
+ }
+ }
}
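Callers now check a socket out of, and back into, the cache together with its stream pair; a simplified sketch of that contract, with Pair/Entry and the lookup methods standing in for IOStreamPair, SocketAndStreams and SocketCache:

    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.Socket;
    import java.net.SocketAddress;

    // A socket is cached together with its (possibly encrypted) stream
    // pair, because an established encrypted stream cannot be rebuilt
    // from the raw socket alone.
    class CacheUseSketch {
        static class Pair { InputStream in; OutputStream out; }
        static class Entry { Socket sock; Pair ioStreams; }

        Entry get(SocketAddress addr) { return null; } // stand-in lookup
        void put(Socket s, Pair p) {}                  // stand-in insert

        void readBlock(SocketAddress dnAddr) throws Exception {
            Entry e = get(dnAddr);
            Socket sock;
            Pair streams;
            if (e != null) {
                sock = e.sock;         // reuse the cached connection
                streams = e.ioStreams; // and its established streams
            } else {
                sock = new Socket();   // dial + (maybe) encrypt, simplified
                streams = new Pair();
            }
            // ... perform the read ...
            put(sock, streams);        // return both for the next reader
        }
    }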
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Thu Aug 9 22:29:36 2012
@@ -44,6 +44,7 @@ import org.apache.hadoop.security.Access
import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenInfo;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
@@ -941,4 +942,11 @@ public interface ClientProtocol {
*/
public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
throws IOException;
+
+ /**
+ * @return encryption key so a client can encrypt data sent via the
+ * DataTransferProtocol to/from DataNodes.
+ * @throws IOException
+ */
+ public DataEncryptionKey getDataEncryptionKey() throws IOException;
}
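On the client side this RPC is fetched once and cached; a sketch of that usage, consistent with the clearDataEncryptionKey() call in the DFSOutputStream retry loop above (the caching class itself is illustrative, not the committed DFSClient code):

    import java.io.IOException;

    // The key is obtained over ClientProtocol on first use, cached, and
    // dropped when a datanode rejects it so the next attempt refetches.
    class KeyCacheSketch {
        interface Namenode { // stand-in for the ClientProtocol method above
            Object getDataEncryptionKey() throws IOException;
        }

        private final Namenode nn;
        private Object cachedKey; // DataEncryptionKey in the real code

        KeyCacheSketch(Namenode nn) { this.nn = nn; }

        synchronized Object getDataEncryptionKey() throws IOException {
            if (cachedKey == null) {
                cachedKey = nn.getDataEncryptionKey(); // one RPC, then cached
            }
            return cachedKey;
        }

        synchronized void clearDataEncryptionKey() {
            cachedKey = null; // force a refetch on the next connection
        }
    }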
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java Thu Aug 9 22:29:36 2012
@@ -27,14 +27,31 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PacketHeaderProto;
import org.apache.hadoop.hdfs.util.ByteBufferOutputStream;
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Shorts;
+import com.google.common.primitives.Ints;
+import com.google.protobuf.InvalidProtocolBufferException;
+
/**
* Header data for each packet that goes through the read/write pipelines.
+ * Includes all of the information about the packet, excluding checksums and
+ * actual data.
+ *
+ * This data includes:
+ * - the offset in bytes into the HDFS block of the data in this packet
+ * - the sequence number of this packet in the pipeline
+ * - whether or not this is the last packet in the pipeline
+ * - the length of the data in this packet
+ * - whether or not this packet should be synced by the DNs.
+ *
+ * When serialized, this header is written out as a protocol buffer, preceded
+ * by a 4-byte integer representing the full packet length, and a 2-byte short
+ * representing the header length.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class PacketHeader {
- /** Header size for a packet */
- private static final int PROTO_SIZE =
+ private static final int MAX_PROTO_SIZE =
PacketHeaderProto.newBuilder()
.setOffsetInBlock(0)
.setSeqno(0)
@@ -42,8 +59,10 @@ public class PacketHeader {
.setDataLen(0)
.setSyncBlock(false)
.build().getSerializedSize();
- public static final int PKT_HEADER_LEN =
- 6 + PROTO_SIZE;
+ public static final int PKT_LENGTHS_LEN =
+ Ints.BYTES + Shorts.BYTES;
+ public static final int PKT_MAX_HEADER_LEN =
+ PKT_LENGTHS_LEN + MAX_PROTO_SIZE;
private int packetLen;
private PacketHeaderProto proto;
@@ -54,13 +73,25 @@ public class PacketHeader {
public PacketHeader(int packetLen, long offsetInBlock, long seqno,
boolean lastPacketInBlock, int dataLen, boolean syncBlock) {
this.packetLen = packetLen;
- proto = PacketHeaderProto.newBuilder()
+ Preconditions.checkArgument(packetLen >= Ints.BYTES,
+ "packet len %s should always be at least 4 bytes",
+ packetLen);
+
+ PacketHeaderProto.Builder builder = PacketHeaderProto.newBuilder()
.setOffsetInBlock(offsetInBlock)
.setSeqno(seqno)
.setLastPacketInBlock(lastPacketInBlock)
- .setDataLen(dataLen)
- .setSyncBlock(syncBlock)
- .build();
+ .setDataLen(dataLen);
+
+ if (syncBlock) {
+ // Only set syncBlock if it is specified.
+ // This is wire-incompatible with Hadoop 2.0.0-alpha due to HDFS-3721
+ // because it changes the length of the packet header, and BlockReceiver
+ // in that version did not support variable-length headers.
+ builder.setSyncBlock(syncBlock);
+ }
+
+ proto = builder.build();
}
public int getDataLen() {
@@ -90,10 +121,16 @@ public class PacketHeader {
@Override
public String toString() {
return "PacketHeader with packetLen=" + packetLen +
- "Header data: " +
+ " header data: " +
proto.toString();
}
+ public void setFieldsFromData(
+ int packetLen, byte[] headerData) throws InvalidProtocolBufferException {
+ this.packetLen = packetLen;
+ proto = PacketHeaderProto.parseFrom(headerData);
+ }
+
public void readFields(ByteBuffer buf) throws IOException {
packetLen = buf.getInt();
short protoLen = buf.getShort();
@@ -110,14 +147,21 @@ public class PacketHeader {
proto = PacketHeaderProto.parseFrom(data);
}
+ /**
+ * @return the number of bytes necessary to write out this header,
+ * including the length-prefixing of the payload and header
+ */
+ public int getSerializedSize() {
+ return PKT_LENGTHS_LEN + proto.getSerializedSize();
+ }
/**
* Write the header into the buffer.
* This requires that PKT_HEADER_LEN bytes are available.
*/
public void putInBuffer(final ByteBuffer buf) {
- assert proto.getSerializedSize() == PROTO_SIZE
- : "Expected " + (PROTO_SIZE) + " got: " + proto.getSerializedSize();
+ assert proto.getSerializedSize() <= MAX_PROTO_SIZE
+ : "Expected " + (MAX_PROTO_SIZE) + " got: " + proto.getSerializedSize();
try {
buf.putInt(packetLen);
buf.putShort((short) proto.getSerializedSize());
@@ -128,12 +172,18 @@ public class PacketHeader {
}
public void write(DataOutputStream out) throws IOException {
- assert proto.getSerializedSize() == PROTO_SIZE
- : "Expected " + (PROTO_SIZE) + " got: " + proto.getSerializedSize();
+ assert proto.getSerializedSize() <= MAX_PROTO_SIZE
+ : "Expected " + (MAX_PROTO_SIZE) + " got: " + proto.getSerializedSize();
out.writeInt(packetLen);
out.writeShort(proto.getSerializedSize());
proto.writeTo(out);
}
+
+ public byte[] getBytes() {
+ ByteBuffer buf = ByteBuffer.allocate(getSerializedSize());
+ putInBuffer(buf);
+ return buf.array();
+ }
/**
* Perform a sanity check on the packet, returning true if it is sane.
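Per the class comment above, the framing is a 4-byte packet length (which counts its own 4 bytes plus checksums and data, matching the pktLen computation in DFSOutputStream.writeTo), then a 2-byte header length, then the protobuf bytes. A sketch of reading that framing back, where the returned bytes would feed PacketHeaderProto.parseFrom:

    import java.io.DataInputStream;
    import java.io.IOException;

    // Reads one framed header off the wire: big-endian int packet
    // length, big-endian short protobuf length, then exactly that many
    // header bytes. The checksums and data (packetLen - 4 bytes) follow
    // on the stream.
    class HeaderFramingSketch {
        static byte[] readHeaderBytes(DataInputStream in) throws IOException {
            int packetLen = in.readInt();    // 4 (this field) + checksums + data
            short protoLen = in.readShort(); // serialized PacketHeaderProto size
            byte[] data = new byte[protoLen];
            in.readFully(data);              // next: PacketHeaderProto.parseFrom(data)
            return data;
        }
    }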
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java Thu Aug 9 22:29:36 2012
@@ -38,10 +38,10 @@ import org.apache.hadoop.hdfs.protocol.p
@InterfaceAudience.Private
@InterfaceStability.Evolving
public abstract class Receiver implements DataTransferProtocol {
- protected final DataInputStream in;
-
- /** Create a receiver for DataTransferProtocol with a socket. */
- protected Receiver(final DataInputStream in) {
+ protected DataInputStream in;
+
+ /** Initialize a receiver for DataTransferProtocol with a socket. */
+ protected void initialize(final DataInputStream in) {
this.in = in;
}
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java Thu Aug 9 22:29:36 2012
@@ -58,6 +58,8 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto.Builder;
@@ -127,6 +129,7 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.io.Text;
@@ -830,4 +833,18 @@ public class ClientNamenodeProtocolServe
throw new ServiceException(e);
}
}
+
+ @Override
+ public GetDataEncryptionKeyResponseProto getDataEncryptionKey(
+ RpcController controller, GetDataEncryptionKeyRequestProto request)
+ throws ServiceException {
+ try {
+ DataEncryptionKey encryptionKey = server.getDataEncryptionKey();
+ return GetDataEncryptionKeyResponseProto.newBuilder()
+ .setDataEncryptionKey(PBHelper.convert(encryptionKey))
+ .build();
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ }
}
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java Thu Aug 9 22:29:36 2012
@@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FinalizeUpgradeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryRequestProto;
@@ -99,6 +100,7 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
@@ -815,9 +817,22 @@ public class ClientNamenodeProtocolTrans
ClientNamenodeProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
RPC.getProtocolVersion(ClientNamenodeProtocolPB.class), methodName);
}
+
+ @Override
+ public DataEncryptionKey getDataEncryptionKey() throws IOException {
+ GetDataEncryptionKeyRequestProto req = GetDataEncryptionKeyRequestProto
+ .newBuilder().build();
+ try {
+ return PBHelper.convert(rpcProxy.getDataEncryptionKey(null, req)
+ .getDataEncryptionKey());
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
@Override
public Object getUnderlyingProxyObject() {
return rpcProxy;
}
+
}
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Thu Aug 9 22:29:36 2012
@@ -62,6 +62,7 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.UpgradeCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DataEncryptionKeyProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenIdentifierProto;
@@ -96,6 +97,7 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.UpgradeStatusReportProto;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.BlockKey;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@@ -973,12 +975,37 @@ public class PBHelper {
.setIsLastBlockComplete(lb.isLastBlockComplete()).build();
}
+ // DataEncryptionKey
+ public static DataEncryptionKey convert(DataEncryptionKeyProto bet) {
+ String encryptionAlgorithm = bet.getEncryptionAlgorithm();
+ return new DataEncryptionKey(bet.getKeyId(),
+ bet.getBlockPoolId(),
+ bet.getNonce().toByteArray(),
+ bet.getEncryptionKey().toByteArray(),
+ bet.getExpiryDate(),
+ encryptionAlgorithm.isEmpty() ? null : encryptionAlgorithm);
+ }
+
+ public static DataEncryptionKeyProto convert(DataEncryptionKey bet) {
+ DataEncryptionKeyProto.Builder b = DataEncryptionKeyProto.newBuilder()
+ .setKeyId(bet.keyId)
+ .setBlockPoolId(bet.blockPoolId)
+ .setNonce(ByteString.copyFrom(bet.nonce))
+ .setEncryptionKey(ByteString.copyFrom(bet.encryptionKey))
+ .setExpiryDate(bet.expiryDate);
+ if (bet.encryptionAlgorithm != null) {
+ b.setEncryptionAlgorithm(bet.encryptionAlgorithm);
+ }
+ return b.build();
+ }
+
public static FsServerDefaults convert(FsServerDefaultsProto fs) {
if (fs == null) return null;
return new FsServerDefaults(
fs.getBlockSize(), fs.getBytesPerChecksum(),
fs.getWritePacketSize(), (short) fs.getReplication(),
- fs.getFileBufferSize());
+ fs.getFileBufferSize(),
+ fs.getEncryptDataTransfer());
}
public static FsServerDefaultsProto convert(FsServerDefaults fs) {
@@ -986,7 +1013,10 @@ public class PBHelper {
return FsServerDefaultsProto.newBuilder().
setBlockSize(fs.getBlockSize()).
setBytesPerChecksum(fs.getBytesPerChecksum()).
- setWritePacketSize(fs.getWritePacketSize()).setReplication(fs.getReplication()).setFileBufferSize(fs.getFileBufferSize()).build();
+ setWritePacketSize(fs.getWritePacketSize())
+ .setReplication(fs.getReplication())
+ .setFileBufferSize(fs.getFileBufferSize())
+ .setEncryptDataTransfer(fs.getEncryptDataTransfer()).build();
}
public static FsPermissionProto convert(FsPermission p) {
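The two DataEncryptionKey converters round-trip, with one asymmetry: protobuf string fields cannot be null, so an unset algorithm travels as the empty string. A sketch of just that mapping:

    // "" on the wire means "no algorithm set"; null in the POJO.
    class AlgFieldSketch {
        static String fromWire(String protoField) {
            return protoField.isEmpty() ? null : protoField; // "" -> null
        }
        static String toWire(String pojoField) {
            return pojoField == null ? "" : pojoField;       // null -> ""
        }
    }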
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockPoolTokenSecretManager.java Thu Aug 9 22:29:36 2012
@@ -119,4 +119,13 @@ public class BlockPoolTokenSecretManager
btsm.clearAllKeysForTesting();
}
}
+
+ public DataEncryptionKey generateDataEncryptionKey(String blockPoolId) {
+ return get(blockPoolId).generateDataEncryptionKey();
+ }
+
+ public byte[] retrieveDataEncryptionKey(int keyId, String blockPoolId,
+ byte[] nonce) throws IOException {
+ return get(blockPoolId).retrieveDataEncryptionKey(keyId, nonce);
+ }
}
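The new methods follow the manager's existing per-pool dispatch pattern; a reduced sketch, with PoolManager and a bare Map standing in for BlockTokenSecretManager and the real registry:

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    // One secret manager per block pool, keyed by pool id; the new
    // encryption-key methods route to the per-pool manager exactly as
    // the existing token methods do.
    class PoolDispatchSketch {
        interface PoolManager { // stand-in for BlockTokenSecretManager
            Object generateDataEncryptionKey();
            byte[] retrieveDataEncryptionKey(int keyId, byte[] nonce)
                throws IOException;
        }

        private final Map<String, PoolManager> map =
            new HashMap<String, PoolManager>();

        private PoolManager get(String bpid) {
            PoolManager m = map.get(bpid);
            if (m == null) {
                throw new IllegalArgumentException(
                    "Block pool " + bpid + " is not found");
            }
            return m;
        }

        Object generateDataEncryptionKey(String bpid) {
            return get(bpid).generateDataEncryptionKey();
        }

        byte[] retrieveDataEncryptionKey(int keyId, String bpid, byte[] nonce)
                throws IOException {
            return get(bpid).retrieveDataEncryptionKey(keyId, nonce);
        }
    }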
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java Thu Aug 9 22:29:36 2012
@@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
@@ -74,6 +75,10 @@ public class BlockTokenSecretManager ext
private BlockKey currentKey;
private BlockKey nextKey;
private Map<Integer, BlockKey> allKeys;
+ private String blockPoolId;
+ private String encryptionAlgorithm;
+
+ private SecureRandom nonceGenerator = new SecureRandom();
public static enum AccessMode {
READ, WRITE, COPY, REPLACE
@@ -86,8 +91,9 @@ public class BlockTokenSecretManager ext
* @param tokenLifetime how long an individual token is valid
*/
public BlockTokenSecretManager(long keyUpdateInterval,
- long tokenLifetime) {
- this(false, keyUpdateInterval, tokenLifetime);
+ long tokenLifetime, String blockPoolId, String encryptionAlgorithm) {
+ this(false, keyUpdateInterval, tokenLifetime, blockPoolId,
+ encryptionAlgorithm);
}
/**
@@ -100,8 +106,10 @@ public class BlockTokenSecretManager ext
* @param otherNnId the NN ID of the other NN in an HA setup
*/
public BlockTokenSecretManager(long keyUpdateInterval,
- long tokenLifetime, int nnIndex) {
- this(true, keyUpdateInterval, tokenLifetime);
+ long tokenLifetime, int nnIndex, String blockPoolId,
+ String encryptionAlgorithm) {
+ this(true, keyUpdateInterval, tokenLifetime, blockPoolId,
+ encryptionAlgorithm);
Preconditions.checkArgument(nnIndex == 0 || nnIndex == 1);
this.nnIndex = nnIndex;
setSerialNo(new SecureRandom().nextInt());
@@ -109,17 +117,24 @@ public class BlockTokenSecretManager ext
}
private BlockTokenSecretManager(boolean isMaster, long keyUpdateInterval,
- long tokenLifetime) {
+ long tokenLifetime, String blockPoolId, String encryptionAlgorithm) {
this.isMaster = isMaster;
this.keyUpdateInterval = keyUpdateInterval;
this.tokenLifetime = tokenLifetime;
this.allKeys = new HashMap<Integer, BlockKey>();
+ this.blockPoolId = blockPoolId;
+ this.encryptionAlgorithm = encryptionAlgorithm;
+ generateKeys();
}
@VisibleForTesting
public synchronized void setSerialNo(int serialNo) {
this.serialNo = (serialNo & LOW_MASK) | (nnIndex << 31);
}
+
+ public void setBlockPoolId(String blockPoolId) {
+ this.blockPoolId = blockPoolId;
+ }
/** Initialize block keys */
private synchronized void generateKeys() {
@@ -371,6 +386,49 @@ public class BlockTokenSecretManager ext
return createPassword(identifier.getBytes(), key.getKey());
}
+ /**
+ * Generate a data encryption key for this block pool, using the current
+ * BlockKey.
+ *
+ * @return a data encryption key which may be used to encrypt traffic
+ * over the DataTransferProtocol
+ */
+ public DataEncryptionKey generateDataEncryptionKey() {
+ byte[] nonce = new byte[8];
+ nonceGenerator.nextBytes(nonce);
+ BlockKey key = null;
+ synchronized (this) {
+ key = currentKey;
+ }
+ byte[] encryptionKey = createPassword(nonce, key.getKey());
+ return new DataEncryptionKey(key.getKeyId(), blockPoolId, nonce,
+ encryptionKey, Time.now() + tokenLifetime,
+ encryptionAlgorithm);
+ }
+
+ /**
+ * Recreate an encryption key based on the given key id and nonce.
+ *
+ * @param keyId identifier of the secret key used to generate the encryption key
+ * @param nonce random value used to create the encryption key
+ * @return the encryption key which corresponds to this (keyId, blockPoolId, nonce)
+ * @throws InvalidEncryptionKeyException if the block key identified by keyId is no longer cached
+ */
+ public byte[] retrieveDataEncryptionKey(int keyId, byte[] nonce)
+ throws InvalidEncryptionKeyException {
+ BlockKey key = null;
+ synchronized (this) {
+ key = allKeys.get(keyId);
+ if (key == null) {
+ throw new InvalidEncryptionKeyException("Can't re-compute encryption key"
+ + " for nonce, since the required block key (keyID=" + keyId
+ + ") doesn't exist. Current key: " + currentKey.getKeyId());
+ }
+ }
+ return createPassword(nonce, key.getKey());
+ }
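[Taken together, these two methods give the scheme its shape: only (keyId, nonce) need accompany a connection, because any party holding the same BlockKeys re-derives identical key bytes. A minimal sketch using only the APIs in this patch, with dummy intervals and a single manager standing in for both ends:

    BlockTokenSecretManager btsm = new BlockTokenSecretManager(
        10 * 60 * 1000L,   // keyUpdateInterval
        10 * 60 * 1000L,   // tokenLifetime
        "bp-example", "rc4");
    DataEncryptionKey dek = btsm.generateDataEncryptionKey();
    // Re-derivation from (keyId, nonce) alone matches the original key;
    // the key bytes themselves never need to cross the wire.
    byte[] rederived = btsm.retrieveDataEncryptionKey(dek.keyId, dek.nonce);
    assert Arrays.equals(rederived, dek.encryptionKey);
]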
+
@VisibleForTesting
public synchronized void setKeyUpdateIntervalForTesting(long millis) {
this.keyUpdateInterval = millis;
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Thu Aug 9 22:29:36 2012
@@ -24,6 +24,8 @@ import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.net.Socket;
import java.net.URI;
import java.text.DateFormat;
@@ -57,6 +59,8 @@ import org.apache.hadoop.hdfs.protocol.D
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
@@ -312,11 +316,22 @@ public class Balancer {
NetUtils.createSocketAddr(target.datanode.getXferAddr()),
HdfsServerConstants.READ_TIMEOUT);
sock.setKeepAlive(true);
- out = new DataOutputStream( new BufferedOutputStream(
- sock.getOutputStream(), HdfsConstants.IO_FILE_BUFFER_SIZE));
+
+ OutputStream unbufOut = sock.getOutputStream();
+ InputStream unbufIn = sock.getInputStream();
+ if (nnc.getDataEncryptionKey() != null) {
+ IOStreamPair encryptedStreams =
+ DataTransferEncryptor.getEncryptedStreams(
+ unbufOut, unbufIn, nnc.getDataEncryptionKey());
+ unbufOut = encryptedStreams.out;
+ unbufIn = encryptedStreams.in;
+ }
+ out = new DataOutputStream(new BufferedOutputStream(unbufOut,
+ HdfsConstants.IO_FILE_BUFFER_SIZE));
+ in = new DataInputStream(new BufferedInputStream(unbufIn,
+ HdfsConstants.IO_FILE_BUFFER_SIZE));
+
sendRequest(out);
- in = new DataInputStream( new BufferedInputStream(
- sock.getInputStream(), HdfsConstants.IO_FILE_BUFFER_SIZE));
receiveResponse(in);
bytesMoved.inc(block.getNumBytes());
LOG.info( "Moving block " + block.getBlock().getBlockId() +
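[The reshuffling in the hunk above is about layering: the encryptor must wrap the raw socket streams, and buffering must be the outermost layer, or buffered plaintext would bypass the cipher. Distilled into a helper (the helper name is ours, and we assume IOStreamPair exposes an (in, out) constructor alongside the in/out fields the hunk reads):

    // Wrap raw socket streams with encryption when a key is present;
    // callers then layer Buffered*Stream on whatever comes back.
    static IOStreamPair maybeEncrypt(InputStream rawIn, OutputStream rawOut,
        DataEncryptionKey key) throws IOException {
      if (key == null) {
        return new IOStreamPair(rawIn, rawOut);
      }
      return DataTransferEncryptor.getEncryptedStreams(rawOut, rawIn, key);
    }
]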
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java Thu Aug 9 22:29:36 2012
@@ -29,10 +29,12 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@@ -60,10 +62,12 @@ class NameNodeConnector {
final OutputStream out;
private final boolean isBlockTokenEnabled;
+ private final boolean encryptDataTransfer;
private boolean shouldRun;
private long keyUpdaterInterval;
private BlockTokenSecretManager blockTokenSecretManager;
private Daemon keyupdaterthread; // AccessKeyUpdater thread
+ private DataEncryptionKey encryptionKey;
NameNodeConnector(URI nameNodeUri,
Configuration conf) throws IOException {
@@ -88,8 +92,11 @@ class NameNodeConnector {
LOG.info("Block token params received from NN: keyUpdateInterval="
+ blockKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
+ blockTokenLifetime / (60 * 1000) + " min(s)");
+ String encryptionAlgorithm = conf.get(
+ DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
this.blockTokenSecretManager = new BlockTokenSecretManager(
- blockKeyUpdateInterval, blockTokenLifetime);
+ blockKeyUpdateInterval, blockTokenLifetime, blockpoolID,
+ encryptionAlgorithm);
this.blockTokenSecretManager.addKeys(keys);
/*
* Balancer should sync its block keys with NN more frequently than NN
@@ -102,7 +109,8 @@ class NameNodeConnector {
this.shouldRun = true;
this.keyupdaterthread.start();
}
-
+ this.encryptDataTransfer = fs.getServerDefaults(new Path("/"))
+ .getEncryptDataTransfer();
// Check if there is another balancer running.
// Exit if there is another one running.
out = checkAndMarkRunningBalancer();
@@ -126,6 +134,20 @@ class NameNodeConnector {
BlockTokenSecretManager.AccessMode.COPY));
}
}
+
+ DataEncryptionKey getDataEncryptionKey()
+ throws IOException {
+ if (encryptDataTransfer) {
+ synchronized (this) {
+ if (encryptionKey == null) {
+ encryptionKey = blockTokenSecretManager.generateDataEncryptionKey();
+ }
+ return encryptionKey;
+ }
+ } else {
+ return null;
+ }
+ }
/* The idea for making sure that there is no more than one balancer
* running in an HDFS is to create a file in HDFS and write the IP address
@@ -208,4 +230,4 @@ class NameNodeConnector {
}
}
}
-}
\ No newline at end of file
+}
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Thu Aug 9 22:29:36 2012
@@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.protocol.E
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
@@ -205,6 +206,9 @@ public class BlockManager {
/** variable to enable check for enough racks */
final boolean shouldCheckForEnoughRacks;
+
+ // whether or not to issue block encryption keys.
+ final boolean encryptDataTransfer;
/**
* When running inside a Standby node, the node may receive block reports
@@ -285,12 +289,18 @@ public class BlockManager {
this.replicationRecheckInterval =
conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L;
+
+ this.encryptDataTransfer =
+ conf.getBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY,
+ DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
+
LOG.info("defaultReplication = " + defaultReplication);
LOG.info("maxReplication = " + maxReplication);
LOG.info("minReplication = " + minReplication);
LOG.info("maxReplicationStreams = " + maxReplicationStreams);
LOG.info("shouldCheckForEnoughRacks = " + shouldCheckForEnoughRacks);
LOG.info("replicationRecheckInterval = " + replicationRecheckInterval);
+ LOG.info("encryptDataTransfer = " + encryptDataTransfer);
}
private static BlockTokenSecretManager createBlockTokenSecretManager(
@@ -310,10 +320,14 @@ public class BlockManager {
final long lifetimeMin = conf.getLong(
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY,
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_DEFAULT);
+ final String encryptionAlgorithm = conf.get(
+ DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
LOG.info(DFSConfigKeys.DFS_BLOCK_ACCESS_KEY_UPDATE_INTERVAL_KEY
+ "=" + updateMin + " min(s), "
+ DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY
- + "=" + lifetimeMin + " min(s)");
+ + "=" + lifetimeMin + " min(s), "
+ + DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY
+ + "=" + encryptionAlgorithm);
String nsId = DFSUtil.getNamenodeNameServiceId(conf);
boolean isHaEnabled = HAUtil.isHAEnabled(conf, nsId);
@@ -322,10 +336,17 @@ public class BlockManager {
String thisNnId = HAUtil.getNameNodeId(conf, nsId);
String otherNnId = HAUtil.getNameNodeIdOfOtherNode(conf, nsId);
return new BlockTokenSecretManager(updateMin*60*1000L,
- lifetimeMin*60*1000L, thisNnId.compareTo(otherNnId) < 0 ? 0 : 1);
+ lifetimeMin*60*1000L, thisNnId.compareTo(otherNnId) < 0 ? 0 : 1, null,
+ encryptionAlgorithm);
} else {
return new BlockTokenSecretManager(updateMin*60*1000L,
- lifetimeMin*60*1000L, 0);
+ lifetimeMin*60*1000L, 0, null, encryptionAlgorithm);
+ }
+ }
+
+ public void setBlockPoolId(String blockPoolId) {
+ if (isBlockTokenEnabled()) {
+ blockTokenSecretManager.setBlockPoolId(blockPoolId);
}
}
@@ -792,6 +813,14 @@ public class BlockManager {
nodeinfo.needKeyUpdate = false;
}
}
+
+ public DataEncryptionKey generateDataEncryptionKey() {
+ if (isBlockTokenEnabled() && encryptDataTransfer) {
+ return blockTokenSecretManager.generateDataEncryptionKey();
+ } else {
+ return null;
+ }
+ }
/**
* Clamp the specified replication between the minimum and the maximum
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Thu Aug 9 22:29:36 2012
@@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.protocol.D
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
@@ -195,7 +196,8 @@ public class JspHelper {
public static void streamBlockInAscii(InetSocketAddress addr, String poolId,
long blockId, Token<BlockTokenIdentifier> blockToken, long genStamp,
long blockSize, long offsetIntoBlock, long chunkSizeToView,
- JspWriter out, Configuration conf) throws IOException {
+ JspWriter out, Configuration conf, DataEncryptionKey encryptionKey)
+ throws IOException {
if (chunkSizeToView == 0) return;
Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket();
s.connect(addr, HdfsServerConstants.READ_TIMEOUT);
@@ -208,7 +210,7 @@ public class JspHelper {
BlockReader blockReader = BlockReaderFactory.newBlockReader(
conf, s, file,
new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken,
- offsetIntoBlock, amtToRead);
+ offsetIntoBlock, amtToRead, encryptionKey);
byte[] buf = new byte[(int)amtToRead];
int readOffset = 0;
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java?rev=1371518&r1=1371517&r2=1371518&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java Thu Aug 9 22:29:36 2012
@@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.protocol.L
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.VersionInfo;
import com.google.common.base.Preconditions;
@@ -221,7 +222,7 @@ public abstract class Storage extends St
* One of the storage directories.
*/
@InterfaceAudience.Private
- public static class StorageDirectory {
+ public static class StorageDirectory implements FormatConfirmable {
final File root; // root directory
final boolean useLock; // flag to enable storage lock
final StorageDirType dirType; // storage dir type
@@ -576,6 +577,32 @@ public abstract class Storage extends St
throw new IOException("Unexpected FS state: " + curState);
}
}
+
+ /**
+ * @return true if the storage directory should prompt the user prior
+ * to formatting (i.e. if the directory appears to contain some data)
+ * @throws IOException if the SD cannot be accessed due to an IO error
+ */
+ @Override
+ public boolean hasSomeData() throws IOException {
+ // It's all right for a dir not to exist, or to exist (properly accessible)
+ // and be completely empty.
+ if (!root.exists()) return false;
+
+ if (!root.isDirectory()) {
+ // a file where you expect a directory should not cause silent
+ // formatting
+ return true;
+ }
+
+ if (FileUtil.listFiles(root).length == 0) {
+ // Empty dir can format without prompt.
+ return false;
+ }
+
+ return true;
+ }
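[So the decision table is: missing dir, format silently; empty accessible dir, format silently; anything else (a stray file, or a dir with entries), require confirmation. A small illustrative probe, with a hypothetical path:

    StorageDirectory sd = new StorageDirectory(new File("/data/dfs/name"));
    if (sd.hasSomeData()) {
      // The dir exists and is non-empty (or is a file where a dir was
      // expected), so a format should go through the confirmation prompt.
      System.err.println("Would prompt before formatting " + sd);
    }
]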
+
/**
* Lock storage to provide exclusive access.
@@ -779,6 +806,68 @@ public abstract class Storage extends St
}
/**
+ * Iterate over each of the {@link FormatConfirmable} objects,
+ * potentially checking with the user whether each should be formatted.
+ *
+ * If running in interactive mode, this prompts the user for every
+ * item that contains data, allowing the format to proceed anyway.
+ * Otherwise, it returns false unless 'force' is specified.
+ *
+ * @param force format regardless of whether dirs contain data
+ * @param interactive prompt the user when a dir contains data
+ * @return true if formatting should proceed
+ * @throws IOException if some storage cannot be accessed
+ */
+ public static boolean confirmFormat(
+ Iterable<? extends FormatConfirmable> items,
+ boolean force, boolean interactive) throws IOException {
+ for (FormatConfirmable item : items) {
+ if (!item.hasSomeData())
+ continue;
+ if (force) { // Don't confirm, always format.
+ System.err.println(
+ "Data exists in " + item + ". Formatting anyway.");
+ continue;
+ }
+ if (!interactive) { // Don't ask, never format.
+ System.err.println(
+ "Running in non-interactive mode, and data appears to exist in " +
+ item + ". Not formatting.");
+ return false;
+ }
+ if (!ToolRunner.confirmPrompt("Re-format filesystem in " + item + " ?")) {
+ System.err.println("Format aborted in " + item);
+ return false;
+ }
+ }
+
+ return true;
+ }
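[A sketch of the intended call-site shape (the directory list and path are hypothetical): the helper asks once per data-bearing item, and the caller aborts on the first refusal.

    List<StorageDirectory> dirs = new ArrayList<StorageDirectory>();
    dirs.add(new StorageDirectory(new File("/data/dfs/name")));
    // force=false, interactive=true: prompt for each dir holding data.
    if (!Storage.confirmFormat(dirs, false, true)) {
      throw new IOException("Format aborted by user");
    }

This compiles against the new signature because StorageDirectory now implements FormatConfirmable, satisfying Iterable<? extends FormatConfirmable>.]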
+
+ /**
+ * Interface for classes which need to have the user confirm their
+ * formatting during NameNode -format and other similar operations.
+ *
+ * This is currently a storage directory or journal manager.
+ */
+ @InterfaceAudience.Private
+ public interface FormatConfirmable {
+ /**
+ * @return true if the storage seems to have some valid data in it,
+ * and the user should be required to confirm the format. Otherwise,
+ * false.
+ * @throws IOException if the storage cannot be accessed at all.
+ */
+ public boolean hasSomeData() throws IOException;
+
+ /**
+ * @return a string representation of the formattable item, suitable
+ * for display to the user inside a prompt
+ */
+ public String toString();
+ }
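[Anything formattable can opt into the same prompt flow by implementing these two methods; a minimal illustrative implementation (the class name is ours, not from the patch):

    class ScratchJournal implements Storage.FormatConfirmable {
      private final File dir;
      ScratchJournal(File dir) { this.dir = dir; }
      @Override
      public boolean hasSomeData() throws IOException {
        // Same spirit as StorageDirectory: only non-empty dirs need a prompt.
        return dir.exists() && FileUtil.listFiles(dir).length > 0;
      }
      @Override
      public String toString() {
        return "ScratchJournal at " + dir;  // shown in the confirm prompt
      }
    }
]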
+
+ /**
* Get common storage fields.
* Should be overridden if additional fields need to be retrieved.
*