Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/09/30 17:42:15 UTC
[15/50] [abbrv] hadoop git commit: Merge remote-tracking branch 'apache/trunk' into HDFS-7285
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
index 0000000,7509da5..6be94f3
mode 000000,100644..100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
@@@ -1,0 -1,512 +1,517 @@@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.hadoop.hdfs;
+
+ import java.io.BufferedInputStream;
+ import java.io.BufferedOutputStream;
+ import java.io.DataInputStream;
+ import java.io.DataOutputStream;
+ import java.io.IOException;
+ import java.nio.ByteBuffer;
+ import java.util.EnumSet;
+
+ import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.fs.FSInputChecker;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.fs.ReadOption;
+ import org.apache.hadoop.hdfs.net.Peer;
+ import org.apache.hadoop.hdfs.protocol.Block;
+ import org.apache.hadoop.hdfs.protocol.DatanodeID;
+ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+ import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+ import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+ import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
+ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+ import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+ import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+ import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
+ import org.apache.hadoop.io.IOUtils;
+ import org.apache.hadoop.net.NetUtils;
+ import org.apache.hadoop.security.token.Token;
+ import org.apache.hadoop.util.DataChecksum;
+ import org.apache.htrace.Sampler;
+ import org.apache.htrace.Trace;
+ import org.apache.htrace.TraceScope;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+
+ /**
+ * @deprecated this is an old implementation that is being left around
+ * in case any issues spring up with the new {@link RemoteBlockReader2} implementation.
+ * It will be removed in the next release.
+ */
+ @InterfaceAudience.Private
+ @Deprecated
+ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
+ static final Logger LOG = LoggerFactory.getLogger(FSInputChecker.class);
+
+ private final Peer peer;
+ private final DatanodeID datanodeID;
+ private final DataInputStream in;
+ private DataChecksum checksum;
+
+ /** offset in block of the last chunk received */
+ private long lastChunkOffset = -1;
+ private long lastChunkLen = -1;
+ private long lastSeqNo = -1;
+
+ /** offset in block where reader wants to actually read */
+ private long startOffset;
+
+ private final long blockId;
+
+ /** offset in block of first chunk - may be less than startOffset
+ if startOffset is not chunk-aligned */
+ private final long firstChunkOffset;
+
+ private final int bytesPerChecksum;
+ private final int checksumSize;
+
+ /**
+ * The total number of bytes we need to transfer from the DN.
+ * This is the amount that the user has requested plus some padding
+ * at the beginning so that the read can begin on a chunk boundary.
+ */
+ private final long bytesNeededToFinish;
+
+ /**
+ * True if we are reading from a local DataNode.
+ */
+ private final boolean isLocal;
+
+ private boolean eos = false;
+ private boolean sentStatusCode = false;
+
+ ByteBuffer checksumBytes = null;
+ /** Amount of unread data in the current received packet */
+ int dataLeft = 0;
+
+ private final PeerCache peerCache;
+
+ /* FSInputChecker interface */
+
+ /* Same interface as java.io.InputStream#read(), used by
+ * DFSInputStream#read(). This violates one rule when there is a
+ * checksum error: "Read should not modify user buffer before
+ * successful read", because it first reads the data into the user
+ * buffer and then checks the checksum.
+ */
+ @Override
+ public synchronized int read(byte[] buf, int off, int len)
+ throws IOException {
+
+ // This has to be set here, *before* the skip, since we can
+ // hit EOS during the skip, in the case that our entire read
+ // is smaller than the checksum chunk.
+ boolean eosBefore = eos;
+
+ //for the first read, skip the extra bytes at the front.
+ if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
+ // Skip these bytes. But don't call this.skip()!
+ int toSkip = (int)(startOffset - firstChunkOffset);
+ if ( super.readAndDiscard(toSkip) != toSkip ) {
+ // should never happen
+ throw new IOException("Could not skip required number of bytes");
+ }
+ }
+
+ int nRead = super.read(buf, off, len);
+
+ // if eos was set in the previous read, send a status code to the DN
+ if (eos && !eosBefore && nRead >= 0) {
+ if (needChecksum()) {
+ sendReadResult(peer, Status.CHECKSUM_OK);
+ } else {
+ sendReadResult(peer, Status.SUCCESS);
+ }
+ }
+ return nRead;
+ }
+
+ @Override
+ public synchronized long skip(long n) throws IOException {
+ /* How can we make sure we don't throw a ChecksumException, at least
+ * in the majority of cases? This one throws. */
+ long nSkipped = 0;
+ while (nSkipped < n) {
+ int toSkip = (int)Math.min(n-nSkipped, Integer.MAX_VALUE);
+ int ret = readAndDiscard(toSkip);
+ if (ret <= 0) {
+ return nSkipped;
+ }
+ nSkipped += ret;
+ }
+ return nSkipped;
+ }
+
+ @Override
+ public int read() throws IOException {
+ throw new IOException("read() is not expected to be invoked. " +
+ "Use read(buf, off, len) instead.");
+ }
+
+ @Override
+ public boolean seekToNewSource(long targetPos) throws IOException {
+ /* Checksum errors are handled outside the BlockReader.
+ * DFSInputStream does not always call 'seekToNewSource'. In the
+ * case of pread(), it just tries a different replica without seeking.
+ */
+ return false;
+ }
+
+ @Override
+ public void seek(long pos) throws IOException {
+ throw new IOException("Seek() is not supported in BlockInputChecker");
+ }
+
+ @Override
+ protected long getChunkPosition(long pos) {
+ throw new RuntimeException("getChunkPosition() is not supported, " +
+ "since seek is not required");
+ }
+
+ /**
+ * Makes sure that checksumBytes has enough capacity and that its
+ * limit is set to the number of checksum bytes that need to be read.
+ */
+ private void adjustChecksumBytes(int dataLen) {
+ int requiredSize =
+ ((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize;
+ if (checksumBytes == null || requiredSize > checksumBytes.capacity()) {
+ checksumBytes = ByteBuffer.wrap(new byte[requiredSize]);
+ } else {
+ checksumBytes.clear();
+ }
+ checksumBytes.limit(requiredSize);
+ }
+
+ @Override
+ protected synchronized int readChunk(long pos, byte[] buf, int offset,
+ int len, byte[] checksumBuf)
+ throws IOException {
+ TraceScope scope =
+ Trace.startSpan("RemoteBlockReader#readChunk(" + blockId + ")",
+ Sampler.NEVER);
+ try {
+ return readChunkImpl(pos, buf, offset, len, checksumBuf);
+ } finally {
+ scope.close();
+ }
+ }
+
+ private synchronized int readChunkImpl(long pos, byte[] buf, int offset,
+ int len, byte[] checksumBuf)
+ throws IOException {
+ // Read one chunk.
+ if (eos) {
+ // Already hit EOF
+ return -1;
+ }
+
+ // Read one DATA_CHUNK.
+ long chunkOffset = lastChunkOffset;
+ if ( lastChunkLen > 0 ) {
+ chunkOffset += lastChunkLen;
+ }
+
+ // pos is relative to the start of the first chunk of the read.
+ // chunkOffset is relative to the start of the block.
+ // This makes sure that the read passed from FSInputChecker is
+ // for the same chunk we expect to be reading from the DN.
+ if ( (pos + firstChunkOffset) != chunkOffset ) {
+ throw new IOException("Mismatch in pos : " + pos + " + " +
+ firstChunkOffset + " != " + chunkOffset);
+ }
+
+ // Read next packet if the previous packet has been read completely.
+ if (dataLeft <= 0) {
+ //Read packet headers.
+ PacketHeader header = new PacketHeader();
+ header.readFields(in);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("DFSClient readChunk got header " + header);
+ }
+
+ // Sanity check the lengths
+ if (!header.sanityCheck(lastSeqNo)) {
+ throw new IOException("BlockReader: error in packet header " +
+ header);
+ }
+
+ lastSeqNo = header.getSeqno();
+ dataLeft = header.getDataLen();
+ adjustChecksumBytes(header.getDataLen());
+ if (header.getDataLen() > 0) {
+ IOUtils.readFully(in, checksumBytes.array(), 0,
+ checksumBytes.limit());
+ }
+ }
+
+ // Sanity checks
+ assert len >= bytesPerChecksum;
+ assert checksum != null;
+ assert checksumSize == 0 || (checksumBuf.length % checksumSize == 0);
+
+
+ int checksumsToRead, bytesToRead;
+
+ if (checksumSize > 0) {
+
+ // How many chunks left in our packet - this is a ceiling
+ // since we may have a partial chunk at the end of the file
+ int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1;
+
+ // How many chunks we can fit in databuffer
+ // - note this is a floor since we always read full chunks
+ int chunksCanFit = Math.min(len / bytesPerChecksum,
+ checksumBuf.length / checksumSize);
+
+ // How many chunks should we read
+ checksumsToRead = Math.min(chunksLeft, chunksCanFit);
+ // How many bytes should we actually read
+ bytesToRead = Math.min(
+ checksumsToRead * bytesPerChecksum, // full chunks
+ dataLeft); // in case we have a partial
+ } else {
+ // no checksum
+ bytesToRead = Math.min(dataLeft, len);
+ checksumsToRead = 0;
+ }
+
+ if ( bytesToRead > 0 ) {
+ // Assert we have enough space
+ assert bytesToRead <= len;
+ assert checksumBytes.remaining() >= checksumSize * checksumsToRead;
+ assert checksumBuf.length >= checksumSize * checksumsToRead;
+ IOUtils.readFully(in, buf, offset, bytesToRead);
+ checksumBytes.get(checksumBuf, 0, checksumSize * checksumsToRead);
+ }
+
+ dataLeft -= bytesToRead;
+ assert dataLeft >= 0;
+
+ lastChunkOffset = chunkOffset;
+ lastChunkLen = bytesToRead;
+
+ // If there's no data left in the current packet after satisfying
+ // this read, and we have satisfied the client read, we expect
+ // an empty packet header from the DN to signify this.
+ // Note that pos + bytesToRead may in fact be greater since the
+ // DN finishes off the entire last chunk.
+ if (dataLeft == 0 &&
+ pos + bytesToRead >= bytesNeededToFinish) {
+
+ // Read header
+ PacketHeader hdr = new PacketHeader();
+ hdr.readFields(in);
+
+ if (!hdr.isLastPacketInBlock() ||
+ hdr.getDataLen() != 0) {
+ throw new IOException("Expected empty end-of-read packet! Header: " +
+ hdr);
+ }
+
+ eos = true;
+ }
+
+ if ( bytesToRead == 0 ) {
+ return -1;
+ }
+
+ return bytesToRead;
+ }
+
+ private RemoteBlockReader(String file, String bpid, long blockId,
+ DataInputStream in, DataChecksum checksum, boolean verifyChecksum,
+ long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
+ DatanodeID datanodeID, PeerCache peerCache) {
+ // Path is used only for printing block and file information in debug
+ super(new Path("/" + Block.BLOCK_FILE_PREFIX + blockId +
+ ":" + bpid + ":of:"+ file)/*too non path-like?*/,
+ 1, verifyChecksum,
+ checksum.getChecksumSize() > 0? checksum : null,
+ checksum.getBytesPerChecksum(),
+ checksum.getChecksumSize());
+
+ this.isLocal = DFSUtilClient.isLocalAddress(NetUtils.
+ createSocketAddr(datanodeID.getXferAddr()));
+
+ this.peer = peer;
+ this.datanodeID = datanodeID;
+ this.in = in;
+ this.checksum = checksum;
+ this.startOffset = Math.max( startOffset, 0 );
+ this.blockId = blockId;
+
+ // The total number of bytes that we need to transfer from the DN is
+ // the amount that the user wants (bytesToRead), plus the padding at
+ // the beginning in order to chunk-align. Note that the DN may elect
+ // to send more than this amount if the read starts/ends mid-chunk.
+ this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
+
+ this.firstChunkOffset = firstChunkOffset;
+ lastChunkOffset = firstChunkOffset;
+ lastChunkLen = -1;
+
+ bytesPerChecksum = this.checksum.getBytesPerChecksum();
+ checksumSize = this.checksum.getChecksumSize();
+ this.peerCache = peerCache;
+ }
+
+ /**
+ * Create a new BlockReader specifically to satisfy a read.
+ * This method also sends the OP_READ_BLOCK request.
+ *
+ * @param file File location
+ * @param block The block object
+ * @param blockToken The block token for security
+ * @param startOffset The read offset, relative to block head
+ * @param len The number of bytes to read
+ * @param bufferSize The IO buffer size (not the client buffer size)
+ * @param verifyChecksum Whether to verify checksum
+ * @param clientName Client name
+ * @return New BlockReader instance, or null on error.
+ */
+ public static RemoteBlockReader newBlockReader(String file,
+ ExtendedBlock block,
+ Token<BlockTokenIdentifier> blockToken,
+ long startOffset, long len,
+ int bufferSize, boolean verifyChecksum,
+ String clientName, Peer peer,
+ DatanodeID datanodeID,
+ PeerCache peerCache,
+ CachingStrategy cachingStrategy)
+ throws IOException {
+ // in and out will be closed when sock is closed (by the caller)
+ final DataOutputStream out =
+ new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
+ new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
+ verifyChecksum, cachingStrategy);
+
+ //
+ // Get bytes in block, set streams
+ //
+
+ DataInputStream in = new DataInputStream(
+ new BufferedInputStream(peer.getInputStream(), bufferSize));
+
+ BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
+ PBHelperClient.vintPrefixed(in));
+ RemoteBlockReader2.checkSuccess(status, peer, block, file);
+ ReadOpChecksumInfoProto checksumInfo =
+ status.getReadOpChecksumInfo();
+ DataChecksum checksum = DataTransferProtoUtil.fromProto(
+ checksumInfo.getChecksum());
+ //Warning when we get CHECKSUM_NULL?
+
+ // Read the first chunk offset.
+ long firstChunkOffset = checksumInfo.getChunkOffset();
+
+ if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
+ firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) {
+ throw new IOException("BlockReader: error in first chunk offset (" +
+ firstChunkOffset + ") startOffset is " +
+ startOffset + " for file " + file);
+ }
+
+ return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(),
+ in, checksum, verifyChecksum, startOffset, firstChunkOffset, len,
+ peer, datanodeID, peerCache);
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ startOffset = -1;
+ checksum = null;
+ if (peerCache != null && sentStatusCode) {
+ peerCache.put(datanodeID, peer);
+ } else {
+ peer.close();
+ }
+
+ // in will be closed when its Socket is closed.
+ }
+
+ @Override
+ public void readFully(byte[] buf, int readOffset, int amtToRead)
+ throws IOException {
+ IOUtils.readFully(this, buf, readOffset, amtToRead);
+ }
+
+ @Override
+ public int readAll(byte[] buf, int offset, int len) throws IOException {
+ return readFully(this, buf, offset, len);
+ }
+
+ /**
+ * When the reader reaches the end of the read, it sends a status response
+ * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
+ * closing our connection (which we will re-open), but won't affect
+ * data correctness.
+ */
+ void sendReadResult(Peer peer, Status statusCode) {
+ assert !sentStatusCode : "already sent status code to " + peer;
+ try {
+ RemoteBlockReader2.writeReadResult(peer.getOutputStream(), statusCode);
+ sentStatusCode = true;
+ } catch (IOException e) {
+ // It's ok not to be able to send this. But something is probably wrong.
+ LOG.info("Could not send read status (" + statusCode + ") to datanode " +
+ peer.getRemoteAddressString() + ": " + e.getMessage());
+ }
+ }
+
+ @Override
+ public int read(ByteBuffer buf) throws IOException {
+ throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader");
+ }
+
+ @Override
+ public int available() throws IOException {
+ // An optimistic estimate of how much data is available
+ // to us without doing network I/O.
+ return RemoteBlockReader2.TCP_WINDOW_SIZE;
+ }
+
+ @Override
+ public boolean isLocal() {
+ return isLocal;
+ }
+
+ @Override
+ public boolean isShortCircuit() {
+ return false;
+ }
+
+ @Override
+ public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
+ return null;
+ }
++
++ @Override
++ public DataChecksum getDataChecksum() {
++ return checksum;
++ }
+ }
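
A minimal sketch of how a caller might drive this reader over an
already-connected Peer (DFSInputStream is the real consumer). The file name,
buffer size, and client name below are illustrative placeholders, and block,
blockToken, peer, datanodeID, peerCache, and cachingStrategy are assumed to
already be in scope:

    // Sketch only: issues OP_READ_BLOCK, then drains the whole block.
    BlockReader reader = RemoteBlockReader.newBlockReader(
        "/user/example/data.bin",           // file name (illustrative)
        block, blockToken,                  // ExtendedBlock + security token
        0, block.getNumBytes(),             // startOffset, len
        4096, true,                         // bufferSize, verifyChecksum
        "example-client",                   // clientName (illustrative)
        peer, datanodeID, peerCache, cachingStrategy);
    byte[] data = new byte[(int) block.getNumBytes()];
    reader.readFully(data, 0, data.length); // status code is sent at end-of-read
    reader.close();                         // peer is cached once the status was sent
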
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
index 0000000,5541e6d..9699442
mode 000000,100644..100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
@@@ -1,0 -1,480 +1,485 @@@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.hadoop.hdfs;
+
+ import java.io.BufferedOutputStream;
+ import java.io.DataInputStream;
+ import java.io.DataOutputStream;
+ import java.io.IOException;
+ import java.io.OutputStream;
+ import java.net.InetSocketAddress;
+ import java.nio.ByteBuffer;
+ import java.nio.channels.ReadableByteChannel;
+ import java.util.EnumSet;
+ import java.util.UUID;
+
+ import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.fs.ReadOption;
+ import org.apache.hadoop.hdfs.net.Peer;
+ import org.apache.hadoop.hdfs.protocol.DatanodeID;
+ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+ import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+ import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+ import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
+ import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
+ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
+ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+ import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+ import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+ import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
+ import org.apache.hadoop.net.NetUtils;
+ import org.apache.hadoop.security.token.Token;
+ import org.apache.hadoop.util.DataChecksum;
+ import org.apache.htrace.Sampler;
+ import org.apache.htrace.Trace;
+ import org.apache.htrace.TraceScope;
+
+ import com.google.common.annotations.VisibleForTesting;
+
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ /**
+ * This is a wrapper around the connection to the datanode,
+ * and it understands checksums, offsets, etc.
+ *
+ * Terminology:
+ * <dl>
+ * <dt>block</dt>
+ * <dd>The HDFS block, typically large (~64MB).
+ * </dd>
+ * <dt>chunk</dt>
+ * <dd>A block is divided into chunks, each comes with a checksum.
+ * We want transfers to be chunk-aligned, to be able to
+ * verify checksums.
+ * </dd>
+ * <dt>packet</dt>
+ * <dd>A grouping of chunks used for transport. It contains a
+ * header, followed by checksum data, followed by real data.
+ * </dd>
+ * </dl>
+ * Please see DataNode for the RPC specification.
+ *
+ * This is a new implementation introduced in Hadoop 0.23 which
+ * is more efficient and simpler than the older BlockReader
+ * implementation. It should be renamed to RemoteBlockReader
+ * once we are confident in it.
+ */
+ @InterfaceAudience.Private
+ public class RemoteBlockReader2 implements BlockReader {
+
+ static final Logger LOG = LoggerFactory.getLogger(RemoteBlockReader2.class);
+ static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
+
+ final private Peer peer;
+ final private DatanodeID datanodeID;
+ final private PeerCache peerCache;
+ final private long blockId;
+ private final ReadableByteChannel in;
+
+ private DataChecksum checksum;
+ private final PacketReceiver packetReceiver = new PacketReceiver(true);
+
+ private ByteBuffer curDataSlice = null;
+
+ /** sequence number of the last packet received */
+ private long lastSeqNo = -1;
+
+ /** offset in block where reader wants to actually read */
+ private long startOffset;
+ private final String filename;
+
+ private final int bytesPerChecksum;
+ private final int checksumSize;
+
+ /**
+ * The total number of bytes we need to transfer from the DN.
+ * This is the amount that the user has requested plus some padding
+ * at the beginning so that the read can begin on a chunk boundary.
+ */
+ private long bytesNeededToFinish;
+
+ /**
+ * True if we are reading from a local DataNode.
+ */
+ private final boolean isLocal;
+
+ private final boolean verifyChecksum;
+
+ private boolean sentStatusCode = false;
+
+ @VisibleForTesting
+ public Peer getPeer() {
+ return peer;
+ }
+
+ @Override
+ public synchronized int read(byte[] buf, int off, int len)
+ throws IOException {
+
+ UUID randomId = null;
+ if (LOG.isTraceEnabled()) {
+ randomId = UUID.randomUUID();
+ LOG.trace(String.format("Starting read #%s file %s from datanode %s",
+ randomId.toString(), this.filename,
+ this.datanodeID.getHostName()));
+ }
+
+ if (curDataSlice == null || (curDataSlice.remaining() == 0 && bytesNeededToFinish > 0)) {
+ TraceScope scope = Trace.startSpan(
+ "RemoteBlockReader2#readNextPacket(" + blockId + ")", Sampler.NEVER);
+ try {
+ readNextPacket();
+ } finally {
+ scope.close();
+ }
+ }
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Finishing read #" + randomId);
+ }
+
+ if (curDataSlice.remaining() == 0) {
+ // we're at EOF now
+ return -1;
+ }
+
+ int nRead = Math.min(curDataSlice.remaining(), len);
+ curDataSlice.get(buf, off, nRead);
+
+ return nRead;
+ }
+
+
+ @Override
+ public synchronized int read(ByteBuffer buf) throws IOException {
+ if (curDataSlice == null || (curDataSlice.remaining() == 0 && bytesNeededToFinish > 0)) {
+ TraceScope scope = Trace.startSpan(
+ "RemoteBlockReader2#readNextPacket(" + blockId + ")", Sampler.NEVER);
+ try {
+ readNextPacket();
+ } finally {
+ scope.close();
+ }
+ }
+ if (curDataSlice.remaining() == 0) {
+ // we're at EOF now
+ return -1;
+ }
+
+ int nRead = Math.min(curDataSlice.remaining(), buf.remaining());
+ ByteBuffer writeSlice = curDataSlice.duplicate();
+ writeSlice.limit(writeSlice.position() + nRead);
+ buf.put(writeSlice);
+ curDataSlice.position(writeSlice.position());
+
+ return nRead;
+ }
+
+ private void readNextPacket() throws IOException {
+ //Read packet headers.
+ packetReceiver.receiveNextPacket(in);
+
+ PacketHeader curHeader = packetReceiver.getHeader();
+ curDataSlice = packetReceiver.getDataSlice();
+ assert curDataSlice.capacity() == curHeader.getDataLen();
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("DFSClient readNextPacket got header " + curHeader);
+ }
+
+ // Sanity check the lengths
+ if (!curHeader.sanityCheck(lastSeqNo)) {
+ throw new IOException("BlockReader: error in packet header " +
+ curHeader);
+ }
+
+ if (curHeader.getDataLen() > 0) {
+ int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum;
+ int checksumsLen = chunks * checksumSize;
+
+ assert packetReceiver.getChecksumSlice().capacity() == checksumsLen :
+ "checksum slice capacity=" + packetReceiver.getChecksumSlice().capacity() +
+ " checksumsLen=" + checksumsLen;
+
+ lastSeqNo = curHeader.getSeqno();
+ if (verifyChecksum && curDataSlice.remaining() > 0) {
+ // N.B.: the checksum error offset reported here is actually
+ // relative to the start of the block, not the start of the file.
+ // This is slightly misleading, but preserves the behavior from
+ // the older BlockReader.
+ checksum.verifyChunkedSums(curDataSlice,
+ packetReceiver.getChecksumSlice(),
+ filename, curHeader.getOffsetInBlock());
+ }
+ bytesNeededToFinish -= curHeader.getDataLen();
+ }
+
+ // First packet will include some data prior to the first byte
+ // the user requested. Skip it.
+ if (curHeader.getOffsetInBlock() < startOffset) {
+ int newPos = (int) (startOffset - curHeader.getOffsetInBlock());
+ curDataSlice.position(newPos);
+ }
+
+ // If we've now satisfied the whole client read, read one last packet
+ // header, which should be empty
+ if (bytesNeededToFinish <= 0) {
+ readTrailingEmptyPacket();
+ if (verifyChecksum) {
+ sendReadResult(Status.CHECKSUM_OK);
+ } else {
+ sendReadResult(Status.SUCCESS);
+ }
+ }
+ }
+
+ @Override
+ public synchronized long skip(long n) throws IOException {
+ /* How can we make sure we don't throw a ChecksumException, at least
+ * in the majority of cases? This one throws. */
+ long skipped = 0;
+ while (skipped < n) {
+ long needToSkip = n - skipped;
+ if (curDataSlice == null || (curDataSlice.remaining() == 0 && bytesNeededToFinish > 0)) {
+ readNextPacket();
+ }
+ if (curDataSlice.remaining() == 0) {
+ // we're at EOF now
+ break;
+ }
+
+ int skip = (int)Math.min(curDataSlice.remaining(), needToSkip);
+ curDataSlice.position(curDataSlice.position() + skip);
+ skipped += skip;
+ }
+ return skipped;
+ }
+
+ private void readTrailingEmptyPacket() throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Reading empty packet at end of read");
+ }
+
+ packetReceiver.receiveNextPacket(in);
+
+ PacketHeader trailer = packetReceiver.getHeader();
+ if (!trailer.isLastPacketInBlock() ||
+ trailer.getDataLen() != 0) {
+ throw new IOException("Expected empty end-of-read packet! Header: " +
+ trailer);
+ }
+ }
+
+ protected RemoteBlockReader2(String file, String bpid, long blockId,
+ DataChecksum checksum, boolean verifyChecksum,
+ long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
+ DatanodeID datanodeID, PeerCache peerCache) {
+ this.isLocal = DFSUtilClient.isLocalAddress(NetUtils.
+ createSocketAddr(datanodeID.getXferAddr()));
+ // Path is used only for printing block and file information in debug
+ this.peer = peer;
+ this.datanodeID = datanodeID;
+ this.in = peer.getInputStreamChannel();
+ this.checksum = checksum;
+ this.verifyChecksum = verifyChecksum;
+ this.startOffset = Math.max( startOffset, 0 );
+ this.filename = file;
+ this.peerCache = peerCache;
+ this.blockId = blockId;
+
+ // The total number of bytes that we need to transfer from the DN is
+ // the amount that the user wants (bytesToRead), plus the padding at
+ // the beginning in order to chunk-align. Note that the DN may elect
+ // to send more than this amount if the read starts/ends mid-chunk.
+ this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
+ bytesPerChecksum = this.checksum.getBytesPerChecksum();
+ checksumSize = this.checksum.getChecksumSize();
+ }
+
+
+ @Override
+ public synchronized void close() throws IOException {
+ packetReceiver.close();
+ startOffset = -1;
+ checksum = null;
+ if (peerCache != null && sentStatusCode) {
+ peerCache.put(datanodeID, peer);
+ } else {
+ peer.close();
+ }
+
+ // in will be closed when its Socket is closed.
+ }
+
+ /**
+ * When the reader reaches the end of the read, it sends a status response
+ * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
+ * closing our connection (which we will re-open), but won't affect
+ * data correctness.
+ */
+ void sendReadResult(Status statusCode) {
+ assert !sentStatusCode : "already sent status code to " + peer;
+ try {
+ writeReadResult(peer.getOutputStream(), statusCode);
+ sentStatusCode = true;
+ } catch (IOException e) {
+ // It's ok not to be able to send this. But something is probably wrong.
+ LOG.info("Could not send read status (" + statusCode + ") to datanode " +
+ peer.getRemoteAddressString() + ": " + e.getMessage());
+ }
+ }
+
+ /**
+ * Serialize the actual read result on the wire.
+ */
+ static void writeReadResult(OutputStream out, Status statusCode)
+ throws IOException {
+
+ ClientReadStatusProto.newBuilder()
+ .setStatus(statusCode)
+ .build()
+ .writeDelimitedTo(out);
+
+ out.flush();
+ }
+
+ /**
+ * File name to print when accessing a block directly (from servlets)
+ * @param s Address of the block location
+ * @param poolId Block pool ID of the block
+ * @param blockId Block ID of the block
+ * @return string that has a file name for debug purposes
+ */
+ public static String getFileName(final InetSocketAddress s,
+ final String poolId, final long blockId) {
+ return s.toString() + ":" + poolId + ":" + blockId;
+ }
+
+ @Override
+ public int readAll(byte[] buf, int offset, int len) throws IOException {
+ return BlockReaderUtil.readAll(this, buf, offset, len);
+ }
+
+ @Override
+ public void readFully(byte[] buf, int off, int len) throws IOException {
+ BlockReaderUtil.readFully(this, buf, off, len);
+ }
+
+ /**
+ * Create a new BlockReader specifically to satisfy a read.
+ * This method also sends the OP_READ_BLOCK request.
+ *
+ * @param file File location
+ * @param block The block object
+ * @param blockToken The block token for security
+ * @param startOffset The read offset, relative to block head
+ * @param len The number of bytes to read
+ * @param verifyChecksum Whether to verify checksum
+ * @param clientName Client name
+ * @param peer The Peer to use
+ * @param datanodeID The DatanodeID this peer is connected to
+ * @return New BlockReader instance, or null on error.
+ */
+ public static BlockReader newBlockReader(String file,
+ ExtendedBlock block,
+ Token<BlockTokenIdentifier> blockToken,
+ long startOffset, long len,
+ boolean verifyChecksum,
+ String clientName,
+ Peer peer, DatanodeID datanodeID,
+ PeerCache peerCache,
+ CachingStrategy cachingStrategy) throws IOException {
+ // in and out will be closed when sock is closed (by the caller)
+ final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
+ peer.getOutputStream()));
+ new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
+ verifyChecksum, cachingStrategy);
+
+ //
+ // Get bytes in block
+ //
+ DataInputStream in = new DataInputStream(peer.getInputStream());
+
+ BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
+ PBHelperClient.vintPrefixed(in));
+ checkSuccess(status, peer, block, file);
+ ReadOpChecksumInfoProto checksumInfo =
+ status.getReadOpChecksumInfo();
+ DataChecksum checksum = DataTransferProtoUtil.fromProto(
+ checksumInfo.getChecksum());
+ //Warning when we get CHECKSUM_NULL?
+
+ // Read the first chunk offset.
+ long firstChunkOffset = checksumInfo.getChunkOffset();
+
+ if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
+ firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) {
+ throw new IOException("BlockReader: error in first chunk offset (" +
+ firstChunkOffset + ") startOffset is " +
+ startOffset + " for file " + file);
+ }
+
+ return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(),
+ checksum, verifyChecksum, startOffset, firstChunkOffset, len, peer,
+ datanodeID, peerCache);
+ }
+
+ static void checkSuccess(
+ BlockOpResponseProto status, Peer peer,
+ ExtendedBlock block, String file)
+ throws IOException {
+ String logInfo = "for OP_READ_BLOCK"
+ + ", self=" + peer.getLocalAddressString()
+ + ", remote=" + peer.getRemoteAddressString()
+ + ", for file " + file
+ + ", for pool " + block.getBlockPoolId()
+ + " block " + block.getBlockId() + "_" + block.getGenerationStamp();
+ DataTransferProtoUtil.checkBlockOpStatus(status, logInfo);
+ }
+
+ @Override
+ public int available() throws IOException {
+ // An optimistic estimate of how much data is available
+ // to us without doing network I/O.
+ return TCP_WINDOW_SIZE;
+ }
+
+ @Override
+ public boolean isLocal() {
+ return isLocal;
+ }
+
+ @Override
+ public boolean isShortCircuit() {
+ return false;
+ }
+
+ @Override
+ public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
+ return null;
+ }
++
++ @Override
++ public DataChecksum getDataChecksum() {
++ return checksum;
++ }
+ }
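
The chunk-to-checksum arithmetic in readNextPacket() above (and in
RemoteBlockReader#adjustChecksumBytes) is plain ceiling division; a small
self-contained sketch, with illustrative values that are not taken from the
patch:

    // Number of checksum bytes accompanying a packet's data.
    static int checksumBytesForPacket(int dataLen, int bytesPerChecksum,
        int checksumSize) {
      // Ceiling division: a trailing partial chunk still carries a full checksum.
      int chunks = 1 + (dataLen - 1) / bytesPerChecksum;
      return chunks * checksumSize;
    }

    // Example: a 64 KB packet with 512-byte chunks and 4-byte CRCs gives
    // chunks = 1 + (65536 - 1) / 512 = 128, i.e. 128 * 4 = 512 checksum bytes.
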
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
index 36da863,9f26ca3..97445a6
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
@@@ -961,8 -953,8 +961,8 @@@ public class ClientNamenodeProtocolServ
RpcController controller, UpdateBlockForPipelineRequestProto req)
throws ServiceException {
try {
- LocatedBlockProto result = PBHelper.convert(server
- .updateBlockForPipeline(PBHelperClient.convert(req.getBlock()),
+ LocatedBlockProto result = PBHelper.convertLocatedBlock(
- server.updateBlockForPipeline(PBHelper.convert(req.getBlock()),
++ server.updateBlockForPipeline(PBHelperClient.convert(req.getBlock()),
req.getClientName()));
return UpdateBlockForPipelineResponseProto.newBuilder().setBlock(result)
.build();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index f292ee8,6f16d83..f419c46
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@@ -23,18 -23,11 +23,16 @@@ import static org.apache.hadoop.hdfs.pr
import static org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherSuiteProto;
import static org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CryptoProtocolVersionProto;
- import java.io.EOFException;
import java.io.IOException;
- import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.ContentSummary;
@@@ -134,13 -122,9 +131,14 @@@ import org.apache.hadoop.hdfs.protocol.
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECRecoveryInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.ErasureCodingZoneProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECSchemaOptionEntryProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECSchemaProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ErasureCodingPolicyProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
++import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockStoragePolicyProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockWithLocationsProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlocksWithLocationsProto;
@@@ -233,15 -214,11 +230,12 @@@ import org.apache.hadoop.hdfs.server.pr
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
- import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
- import org.apache.hadoop.hdfs.util.ExactSizeInputStream;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.token.Token;
- import org.apache.hadoop.util.DataChecksum;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@@ -784,23 -726,9 +771,23 @@@ public class PBHelper
}
}
- LocatedBlock lb = new LocatedBlock(PBHelperClient.convert(proto.getB()), targets,
- storageIDs, storageTypes, proto.getOffset(), proto.getCorrupt(),
- cachedLocs.toArray(new DatanodeInfo[0]));
+ final LocatedBlock lb;
+ if (indices == null) {
- lb = new LocatedBlock(PBHelper.convert(proto.getB()), targets, storageIDs,
- storageTypes, proto.getOffset(), proto.getCorrupt(),
++ lb = new LocatedBlock(PBHelperClient.convert(proto.getB()), targets,
++ storageIDs, storageTypes, proto.getOffset(), proto.getCorrupt(),
+ cachedLocs.toArray(new DatanodeInfo[cachedLocs.size()]));
+ } else {
- lb = new LocatedStripedBlock(PBHelper.convert(proto.getB()), targets,
++ lb = new LocatedStripedBlock(PBHelperClient.convert(proto.getB()), targets,
+ storageIDs, storageTypes, indices, proto.getOffset(),
+ proto.getCorrupt(),
+ cachedLocs.toArray(new DatanodeInfo[cachedLocs.size()]));
+ List<TokenProto> tokenProtos = proto.getBlockTokensList();
+ Token<BlockTokenIdentifier>[] blockTokens = new Token[indices.length];
+ for (int i = 0; i < indices.length; i++) {
+ blockTokens[i] = PBHelper.convert(tokenProtos.get(i));
+ }
+ ((LocatedStripedBlock) lb).setBlockTokens(blockTokens);
+ }
lb.setBlockToken(PBHelper.convert(proto.getBlockToken()));
return lb;
@@@ -2954,192 -2860,4 +2935,192 @@@
setLeaseId(context.getLeaseId()).
build();
}
+
+ public static ECSchema convertECSchema(ECSchemaProto schema) {
+ List<ECSchemaOptionEntryProto> optionsList = schema.getOptionsList();
+ Map<String, String> options = new HashMap<>(optionsList.size());
+ for (ECSchemaOptionEntryProto option : optionsList) {
+ options.put(option.getKey(), option.getValue());
+ }
+ return new ECSchema(schema.getCodecName(), schema.getDataUnits(),
+ schema.getParityUnits(), options);
+ }
+
+ public static ECSchemaProto convertECSchema(ECSchema schema) {
+ ECSchemaProto.Builder builder = ECSchemaProto.newBuilder()
+ .setCodecName(schema.getCodecName())
+ .setDataUnits(schema.getNumDataUnits())
+ .setParityUnits(schema.getNumParityUnits());
+ Set<Entry<String, String>> entrySet = schema.getExtraOptions().entrySet();
+ for (Entry<String, String> entry : entrySet) {
+ builder.addOptions(ECSchemaOptionEntryProto.newBuilder()
+ .setKey(entry.getKey()).setValue(entry.getValue()).build());
+ }
+ return builder.build();
+ }
+
+ public static ErasureCodingPolicy convertErasureCodingPolicy(
+ ErasureCodingPolicyProto policy) {
+ return new ErasureCodingPolicy(policy.getName(),
+ convertECSchema(policy.getSchema()),
+ policy.getCellSize());
+ }
+
+ public static ErasureCodingPolicyProto convertErasureCodingPolicy(
+ ErasureCodingPolicy policy) {
+ ErasureCodingPolicyProto.Builder builder = ErasureCodingPolicyProto
+ .newBuilder()
+ .setName(policy.getName())
+ .setSchema(convertECSchema(policy.getSchema()))
+ .setCellSize(policy.getCellSize());
+ return builder.build();
+ }
+
+ public static ErasureCodingZoneProto convertErasureCodingZone(
+ ErasureCodingZone ecZone) {
+ return ErasureCodingZoneProto.newBuilder().setDir(ecZone.getDir())
+ .setEcPolicy(convertErasureCodingPolicy(ecZone.getErasureCodingPolicy()))
+ .build();
+ }
+
+ public static ErasureCodingZone convertErasureCodingZone(
+ ErasureCodingZoneProto ecZoneProto) {
+ return new ErasureCodingZone(ecZoneProto.getDir(),
+ convertErasureCodingPolicy(ecZoneProto.getEcPolicy()));
+ }
+
+ public static BlockECRecoveryInfo convertBlockECRecoveryInfo(
+ BlockECRecoveryInfoProto blockEcRecoveryInfoProto) {
+ ExtendedBlockProto blockProto = blockEcRecoveryInfoProto.getBlock();
- ExtendedBlock block = convert(blockProto);
++ ExtendedBlock block = PBHelperClient.convert(blockProto);
+
+ DatanodeInfosProto sourceDnInfosProto = blockEcRecoveryInfoProto
+ .getSourceDnInfos();
+ DatanodeInfo[] sourceDnInfos = convert(sourceDnInfosProto);
+
+ DatanodeInfosProto targetDnInfosProto = blockEcRecoveryInfoProto
+ .getTargetDnInfos();
+ DatanodeInfo[] targetDnInfos = convert(targetDnInfosProto);
+
+ StorageUuidsProto targetStorageUuidsProto = blockEcRecoveryInfoProto
+ .getTargetStorageUuids();
+ String[] targetStorageUuids = convert(targetStorageUuidsProto);
+
+ StorageTypesProto targetStorageTypesProto = blockEcRecoveryInfoProto
+ .getTargetStorageTypes();
+ StorageType[] convertStorageTypes = convertStorageTypes(
+ targetStorageTypesProto.getStorageTypesList(), targetStorageTypesProto
+ .getStorageTypesList().size());
+
+ List<Integer> liveBlockIndicesList = blockEcRecoveryInfoProto
+ .getLiveBlockIndicesList();
+ short[] liveBlkIndices = new short[liveBlockIndicesList.size()];
+ for (int i = 0; i < liveBlockIndicesList.size(); i++) {
+ liveBlkIndices[i] = liveBlockIndicesList.get(i).shortValue();
+ }
+
+ ErasureCodingPolicy ecPolicy =
+ convertErasureCodingPolicy(blockEcRecoveryInfoProto.getEcPolicy());
+
+ return new BlockECRecoveryInfo(block, sourceDnInfos, targetDnInfos,
+ targetStorageUuids, convertStorageTypes, liveBlkIndices, ecPolicy);
+ }
+
+ public static BlockECRecoveryInfoProto convertBlockECRecoveryInfo(
+ BlockECRecoveryInfo blockEcRecoveryInfo) {
+ BlockECRecoveryInfoProto.Builder builder = BlockECRecoveryInfoProto
+ .newBuilder();
+ builder.setBlock(PBHelperClient.convert(
+ blockEcRecoveryInfo.getExtendedBlock()));
+
+ DatanodeInfo[] sourceDnInfos = blockEcRecoveryInfo.getSourceDnInfos();
+ builder.setSourceDnInfos(convertToDnInfosProto(sourceDnInfos));
+
+ DatanodeInfo[] targetDnInfos = blockEcRecoveryInfo.getTargetDnInfos();
+ builder.setTargetDnInfos(convertToDnInfosProto(targetDnInfos));
+
+ String[] targetStorageIDs = blockEcRecoveryInfo.getTargetStorageIDs();
+ builder.setTargetStorageUuids(convertStorageIDs(targetStorageIDs));
+
+ StorageType[] targetStorageTypes = blockEcRecoveryInfo
+ .getTargetStorageTypes();
+ builder.setTargetStorageTypes(convertStorageTypesProto(targetStorageTypes));
+
+ short[] liveBlockIndices = blockEcRecoveryInfo.getLiveBlockIndices();
+ builder.addAllLiveBlockIndices(convertIntArray(liveBlockIndices));
+
+ builder.setEcPolicy(convertErasureCodingPolicy(blockEcRecoveryInfo
+ .getErasureCodingPolicy()));
+
+ return builder.build();
+ }
+
+ private static List<Integer> convertIntArray(short[] liveBlockIndices) {
+ List<Integer> liveBlockIndicesList = new ArrayList<Integer>();
+ for (short s : liveBlockIndices) {
+ liveBlockIndicesList.add((int) s);
+ }
+ return liveBlockIndicesList;
+ }
+
+ private static StorageTypesProto convertStorageTypesProto(
+ StorageType[] targetStorageTypes) {
+ StorageTypesProto.Builder builder = StorageTypesProto.newBuilder();
+ for (StorageType storageType : targetStorageTypes) {
+ builder.addStorageTypes(PBHelperClient.convertStorageType(storageType));
+ }
+ return builder.build();
+ }
+
+ private static StorageUuidsProto convertStorageIDs(String[] targetStorageIDs) {
+ StorageUuidsProto.Builder builder = StorageUuidsProto.newBuilder();
+ for (String storageUuid : targetStorageIDs) {
+ builder.addStorageUuids(storageUuid);
+ }
+ return builder.build();
+ }
+
+ private static DatanodeInfosProto convertToDnInfosProto(DatanodeInfo[] dnInfos) {
+ DatanodeInfosProto.Builder builder = DatanodeInfosProto.newBuilder();
+ for (DatanodeInfo datanodeInfo : dnInfos) {
+ builder.addDatanodes(PBHelperClient.convert(datanodeInfo));
+ }
+ return builder.build();
+ }
+
+ private static String[] convert(StorageUuidsProto targetStorageUuidsProto) {
+ List<String> storageUuidsList = targetStorageUuidsProto
+ .getStorageUuidsList();
+ String[] storageUuids = new String[storageUuidsList.size()];
+ for (int i = 0; i < storageUuidsList.size(); i++) {
+ storageUuids[i] = storageUuidsList.get(i);
+ }
+ return storageUuids;
+ }
+
+ public static BlockECRecoveryCommandProto convert(
+ BlockECRecoveryCommand blkECRecoveryCmd) {
+ BlockECRecoveryCommandProto.Builder builder = BlockECRecoveryCommandProto
+ .newBuilder();
+ Collection<BlockECRecoveryInfo> blockECRecoveryInfos = blkECRecoveryCmd
+ .getECTasks();
+ for (BlockECRecoveryInfo blkECRecoveryInfo : blockECRecoveryInfos) {
+ builder
+ .addBlockECRecoveryinfo(convertBlockECRecoveryInfo(blkECRecoveryInfo));
+ }
+ return builder.build();
+ }
+
+ public static BlockECRecoveryCommand convert(
+ BlockECRecoveryCommandProto blkECRecoveryCmdProto) {
+ Collection<BlockECRecoveryInfo> blkECRecoveryInfos = new ArrayList<BlockECRecoveryInfo>();
+ List<BlockECRecoveryInfoProto> blockECRecoveryinfoList = blkECRecoveryCmdProto
+ .getBlockECRecoveryinfoList();
+ for (BlockECRecoveryInfoProto blockECRecoveryInfoProto : blockECRecoveryinfoList) {
+ blkECRecoveryInfos
+ .add(convertBlockECRecoveryInfo(blockECRecoveryInfoProto));
+ }
+ return new BlockECRecoveryCommand(DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY,
+ blkECRecoveryInfos);
+ }
}
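
The two convertECSchema() overloads added above are symmetric, so a schema
survives a round trip through its protobuf form. A hedged sketch; the
four-argument ECSchema constructor matches the usage in this patch, but the
RS(6,3) values are illustrative:

    Map<String, String> options = new HashMap<>();
    ECSchema schema = new ECSchema("rs", 6, 3, options); // codec, data, parity
    ECSchemaProto proto = PBHelper.convertECSchema(schema);
    ECSchema roundTripped = PBHelper.convertECSchema(proto);
    assert roundTripped.getNumDataUnits() == 6
        && roundTripped.getNumParityUnits() == 3;
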
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
index dc296ac,810784d..92a1135
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
@@@ -30,11 -31,11 +31,12 @@@ import org.apache.hadoop.util.LightWeig
import static org.apache.hadoop.hdfs.server.namenode.INodeId.INVALID_INODE_ID;
/**
- * BlockInfo class maintains for a given block
- * the {@link BlockCollection} it is part of and datanodes where the replicas of
- * the block are stored.
+ * For a given block (or an erasure coding block group), BlockInfo class
+ * maintains 1) the {@link BlockCollection} it is part of, and 2) datanodes
+ * where the replicas of the block, or blocks belonging to the erasure coding
+ * block group, are stored.
*/
+ @InterfaceAudience.Private
public abstract class BlockInfo extends Block
implements LightWeightGSet.LinkedElement {
@@@ -203,17 -206,6 +205,11 @@@
*/
abstract boolean removeStorage(DatanodeStorageInfo storage);
- /**
- * Replace the current BlockInfo with the new one in corresponding
- * DatanodeStorageInfo's linked list
- */
- abstract void replaceBlock(BlockInfo newBlock);
-
+ public abstract boolean isStriped();
+
+ /** @return true if there is no datanode storage associated with the block */
+ abstract boolean hasNoStorage();
+
/**
* Find specified DatanodeStorageInfo.
* @return DatanodeStorageInfo or null if not found.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
index b9d8486,94fb222..746e298
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
@@@ -95,29 -104,4 +95,14 @@@ public class BlockInfoContiguous extend
}
return 0;
}
+
+ @Override
- void replaceBlock(BlockInfo newBlock) {
- assert newBlock instanceof BlockInfoContiguous;
- for (int i = this.numNodes() - 1; i >= 0; i--) {
- final DatanodeStorageInfo storage = this.getStorageInfo(i);
- final boolean removed = storage.removeBlock(this);
- assert removed : "currentBlock not found.";
-
- final DatanodeStorageInfo.AddBlockResult result = storage.addBlock(
- newBlock, newBlock);
- assert result == DatanodeStorageInfo.AddBlockResult.ADDED :
- "newBlock already exists.";
- }
- }
-
- @Override
+ public final boolean isStriped() {
+ return false;
+ }
+
+ @Override
+ final boolean hasNoStorage() {
+ return getStorageInfo(0) == null;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
index 7b21cbe,0000000..df48655
mode 100644,000000..100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
@@@ -1,253 -1,0 +1,234 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+
+/**
+ * Subclass of {@link BlockInfo}, representing a block group in erasure coding.
+ *
+ * We still use triplets to store DatanodeStorageInfo for each block in the
+ * block group, as well as the previous/next block in the corresponding
+ * DatanodeStorageInfo. For a (m+k) block group, the first (m+k) triplet units
+ * are sorted and strictly mapped to the corresponding block.
+ *
+ * Normally each block belonging to the group is stored in only one DataNode.
+ * However, it is possible that some block is over-replicated. Thus the triplet
+ * array's size can be larger than (m+k), and we currently use an extra byte
+ * array to record the block index for each triplet.
+ */
+public class BlockInfoStriped extends BlockInfo {
+ private final ErasureCodingPolicy ecPolicy;
+ /**
+ * Always the same size as the triplets array. Records the block index for
+ * each triplet.
+ * TODO: this is only necessary for over-replicated blocks, so it can be
+ * further optimized to save memory.
+ */
+ private byte[] indices;
+
+ public BlockInfoStriped(Block blk, ErasureCodingPolicy ecPolicy) {
+ super(blk, (short) (ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits()));
+ indices = new byte[ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits()];
+ initIndices();
+ this.ecPolicy = ecPolicy;
+ }
+
+ public short getTotalBlockNum() {
+ return (short) (ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits());
+ }
+
+ public short getDataBlockNum() {
+ return (short) ecPolicy.getNumDataUnits();
+ }
+
+ public short getParityBlockNum() {
+ return (short) ecPolicy.getNumParityUnits();
+ }
+
+ /**
+ * If the block is committed/completed and its length is less than a full
+ * stripe, returns the number of actual data blocks.
+ * Otherwise returns the number of data units specified by the erasure
+ * coding policy.
+ */
+ public short getRealDataBlockNum() {
+ if (isComplete() || getBlockUCState() == BlockUCState.COMMITTED) {
+ return (short) Math.min(getDataBlockNum(),
+ (getNumBytes() - 1) / BLOCK_STRIPED_CELL_SIZE + 1);
+ } else {
+ return getDataBlockNum();
+ }
+ }
+
+ public short getRealTotalBlockNum() {
+ return (short) (getRealDataBlockNum() + getParityBlockNum());
+ }
+
+ public ErasureCodingPolicy getErasureCodingPolicy() {
+ return ecPolicy;
+ }
+
+ private void initIndices() {
+ for (int i = 0; i < indices.length; i++) {
+ indices[i] = -1;
+ }
+ }
+
+ private int findSlot() {
+ int i = getTotalBlockNum();
+ for (; i < getCapacity(); i++) {
+ if (getStorageInfo(i) == null) {
+ return i;
+ }
+ }
+ // need to expand the triplet size
+ ensureCapacity(i + 1, true);
+ return i;
+ }
+
+ @Override
+ boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
+ int blockIndex = BlockIdManager.getBlockIndex(reportedBlock);
+ int index = blockIndex;
+ DatanodeStorageInfo old = getStorageInfo(index);
+ if (old != null && !old.equals(storage)) { // over replicated
+ // check if the storage has been stored
+ int i = findStorageInfo(storage);
+ if (i == -1) {
+ index = findSlot();
+ } else {
+ return true;
+ }
+ }
+ addStorage(storage, index, blockIndex);
+ return true;
+ }
+
+ private void addStorage(DatanodeStorageInfo storage, int index,
+ int blockIndex) {
+ setStorageInfo(index, storage);
+ setNext(index, null);
+ setPrevious(index, null);
+ indices[index] = (byte) blockIndex;
+ }
+
+ private int findStorageInfoFromEnd(DatanodeStorageInfo storage) {
+ final int len = getCapacity();
+ for(int idx = len - 1; idx >= 0; idx--) {
+ DatanodeStorageInfo cur = getStorageInfo(idx);
+ if (storage.equals(cur)) {
+ return idx;
+ }
+ }
+ return -1;
+ }
+
+ int getStorageBlockIndex(DatanodeStorageInfo storage) {
+ int i = this.findStorageInfo(storage);
+ return i == -1 ? -1 : indices[i];
+ }
+
+ /**
+ * Identify the block stored in the given datanode storage. Note that
+ * the returned block has the same block ID as the one seen/reported by
+ * the DataNode.
+ */
+ Block getBlockOnStorage(DatanodeStorageInfo storage) {
+ int index = getStorageBlockIndex(storage);
+ if (index < 0) {
+ return null;
+ } else {
+ Block block = new Block(this);
+ block.setBlockId(this.getBlockId() + index);
+ return block;
+ }
+ }
+
+ @Override
+ boolean removeStorage(DatanodeStorageInfo storage) {
+ int dnIndex = findStorageInfoFromEnd(storage);
+ if (dnIndex < 0) { // the node is not found
+ return false;
+ }
+ assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
+ "Block is still in the list and must be removed first.";
+ // set the triplet to null
+ setStorageInfo(dnIndex, null);
+ setNext(dnIndex, null);
+ setPrevious(dnIndex, null);
+ indices[dnIndex] = -1;
+ return true;
+ }
+
+ private void ensureCapacity(int totalSize, boolean keepOld) {
+ if (getCapacity() < totalSize) {
+ Object[] old = triplets;
+ byte[] oldIndices = indices;
+ triplets = new Object[totalSize * 3];
+ indices = new byte[totalSize];
+ initIndices();
+
+ if (keepOld) {
+ System.arraycopy(old, 0, triplets, 0, old.length);
+ System.arraycopy(oldIndices, 0, indices, 0, oldIndices.length);
+ }
+ }
+ }
+
- @Override
- void replaceBlock(BlockInfo newBlock) {
- assert newBlock instanceof BlockInfoStriped;
- BlockInfoStriped newBlockGroup = (BlockInfoStriped) newBlock;
- final int size = getCapacity();
- newBlockGroup.ensureCapacity(size, false);
- for (int i = 0; i < size; i++) {
- final DatanodeStorageInfo storage = this.getStorageInfo(i);
- if (storage != null) {
- final int blockIndex = indices[i];
- final boolean removed = storage.removeBlock(this);
- assert removed : "currentBlock not found.";
-
- newBlockGroup.addStorage(storage, i, blockIndex);
- storage.insertToList(newBlockGroup);
- }
- }
- }
-
+ public long spaceConsumed() {
+ // For striped blocks, the total usage should cover both data blocks
+ // and parity blocks, because `getNumBytes` only accounts for the
+ // actual data size.
+ return StripedBlockUtil.spaceConsumedByStripedBlock(getNumBytes(),
+ ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits(),
+ BLOCK_STRIPED_CELL_SIZE);
+ }
+
+ @Override
+ public final boolean isStriped() {
+ return true;
+ }
+
+ @Override
+ public int numNodes() {
+ assert this.triplets != null : "BlockInfo is not initialized";
+ assert triplets.length % 3 == 0 : "Malformed BlockInfo";
+ int num = 0;
+ for (int idx = getCapacity()-1; idx >= 0; idx--) {
+ if (getStorageInfo(idx) != null) {
+ num++;
+ }
+ }
+ return num;
+ }
+
+ @Override
+ final boolean hasNoStorage() {
+ final int len = getCapacity();
+ for(int idx = 0; idx < len; idx++) {
+ if (getStorageInfo(idx) != null) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
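
One detail worth calling out from getBlockOnStorage() above: each internal
block of a striped group is reported with an ID equal to the group's ID plus
its block index, which is what lets the index be recovered later (via
BlockIdManager.getBlockIndex). A hedged arithmetic sketch with made-up IDs:

    long groupId = 1000L;                        // block group ID (illustrative)
    int index = 4;                               // e.g. first parity unit in RS(4,2)
    Block internal = new Block(groupId + index); // the ID a DataNode reports
    // getBlockOnStorage() reproduces exactly this:
    //   block.setBlockId(this.getBlockId() + index)
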