You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/09/02 07:59:01 UTC

[49/50] [abbrv] hadoop git commit: Merge remote-tracking branch 'apache/trunk' into HDFS-7285

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
index 0000000,7509da5..6be94f3
mode 000000,100644..100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
@@@ -1,0 -1,512 +1,517 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hdfs;
+ 
+ import java.io.BufferedInputStream;
+ import java.io.BufferedOutputStream;
+ import java.io.DataInputStream;
+ import java.io.DataOutputStream;
+ import java.io.IOException;
+ import java.nio.ByteBuffer;
+ import java.util.EnumSet;
+ 
+ import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.fs.FSInputChecker;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.fs.ReadOption;
+ import org.apache.hadoop.hdfs.net.Peer;
+ import org.apache.hadoop.hdfs.protocol.Block;
+ import org.apache.hadoop.hdfs.protocol.DatanodeID;
+ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+ import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+ import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+ import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
+ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+ import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+ import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+ import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
+ import org.apache.hadoop.io.IOUtils;
+ import org.apache.hadoop.net.NetUtils;
+ import org.apache.hadoop.security.token.Token;
+ import org.apache.hadoop.util.DataChecksum;
+ import org.apache.htrace.Sampler;
+ import org.apache.htrace.Trace;
+ import org.apache.htrace.TraceScope;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ 
+ /**
+  * @deprecated this is an old implementation that is being left around
+  * in case any issues spring up with the new {@link RemoteBlockReader2} implementation.
+  * It will be removed in the next release.
+  */
+ @InterfaceAudience.Private
+ @Deprecated
+ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
+   static final Logger LOG = LoggerFactory.getLogger(FSInputChecker.class);
+ 
+   private final Peer peer;
+   private final DatanodeID datanodeID;
+   private final DataInputStream in;
+   private DataChecksum checksum;
+ 
+   /** offset in block of the last chunk received */
+   private long lastChunkOffset = -1;
+   private long lastChunkLen = -1;
+   private long lastSeqNo = -1;
+ 
+   /** offset in block where reader wants to actually read */
+   private long startOffset;
+ 
+   private final long blockId;
+ 
+   /** offset in block of of first chunk - may be less than startOffset
+       if startOffset is not chunk-aligned */
+   private final long firstChunkOffset;
+ 
+   private final int bytesPerChecksum;
+   private final int checksumSize;
+ 
+   /**
+    * The total number of bytes we need to transfer from the DN.
+    * This is the amount that the user has requested plus some padding
+    * at the beginning so that the read can begin on a chunk boundary.
+    */
+   private final long bytesNeededToFinish;
+   
+   /**
+    * True if we are reading from a local DataNode.
+    */
+   private final boolean isLocal;
+ 
+   private boolean eos = false;
+   private boolean sentStatusCode = false;
+   
+   ByteBuffer checksumBytes = null;
+   /** Amount of unread data in the current received packet */
+   int dataLeft = 0;
+   
+   private final PeerCache peerCache;
+   
+   /* FSInputChecker interface */
+   
+   /* same interface as inputStream java.io.InputStream#read()
+    * used by DFSInputStream#read()
+    * This violates one rule when there is a checksum error:
+    * "Read should not modify user buffer before successful read"
+    * because it first reads the data to user buffer and then checks
+    * the checksum.
+    */
+   @Override
+   public synchronized int read(byte[] buf, int off, int len) 
+                                throws IOException {
+     
+     // This has to be set here, *before* the skip, since we can
+     // hit EOS during the skip, in the case that our entire read
+     // is smaller than the checksum chunk.
+     boolean eosBefore = eos;
+ 
+     //for the first read, skip the extra bytes at the front.
+     if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
+       // Skip these bytes. But don't call this.skip()!
+       int toSkip = (int)(startOffset - firstChunkOffset);
+       if ( super.readAndDiscard(toSkip) != toSkip ) {
+         // should never happen
+         throw new IOException("Could not skip required number of bytes");
+       }
+     }
+     
+     int nRead = super.read(buf, off, len);
+ 
+     // if eos was set in the previous read, send a status code to the DN
+     if (eos && !eosBefore && nRead >= 0) {
+       if (needChecksum()) {
+         sendReadResult(peer, Status.CHECKSUM_OK);
+       } else {
+         sendReadResult(peer, Status.SUCCESS);
+       }
+     }
+     return nRead;
+   }
+ 
+   @Override
+   public synchronized long skip(long n) throws IOException {
+     /* How can we make sure we don't throw a ChecksumException, at least
+      * in majority of the cases?. This one throws. */  
+     long nSkipped = 0;
+     while (nSkipped < n) {
+       int toSkip = (int)Math.min(n-nSkipped, Integer.MAX_VALUE);
+       int ret = readAndDiscard(toSkip);
+       if (ret <= 0) {
+         return nSkipped;
+       }
+       nSkipped += ret;
+     }
+     return nSkipped;
+   }
+ 
+   @Override
+   public int read() throws IOException {
+     throw new IOException("read() is not expected to be invoked. " +
+                           "Use read(buf, off, len) instead.");
+   }
+   
+   @Override
+   public boolean seekToNewSource(long targetPos) throws IOException {
+     /* Checksum errors are handled outside the BlockReader. 
+      * DFSInputStream does not always call 'seekToNewSource'. In the 
+      * case of pread(), it just tries a different replica without seeking.
+      */ 
+     return false;
+   }
+   
+   @Override
+   public void seek(long pos) throws IOException {
+     throw new IOException("Seek() is not supported in BlockInputChecker");
+   }
+ 
+   @Override
+   protected long getChunkPosition(long pos) {
+     throw new RuntimeException("getChunkPosition() is not supported, " +
+                                "since seek is not required");
+   }
+   
+   /**
+    * Makes sure that checksumBytes has enough capacity 
+    * and limit is set to the number of checksum bytes needed 
+    * to be read.
+    */
+   private void adjustChecksumBytes(int dataLen) {
+     int requiredSize = 
+       ((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize;
+     if (checksumBytes == null || requiredSize > checksumBytes.capacity()) {
+       checksumBytes =  ByteBuffer.wrap(new byte[requiredSize]);
+     } else {
+       checksumBytes.clear();
+     }
+     checksumBytes.limit(requiredSize);
+   }
+   
+   @Override
+   protected synchronized int readChunk(long pos, byte[] buf, int offset, 
+                                        int len, byte[] checksumBuf) 
+                                        throws IOException {
+     TraceScope scope =
+         Trace.startSpan("RemoteBlockReader#readChunk(" + blockId + ")",
+             Sampler.NEVER);
+     try {
+       return readChunkImpl(pos, buf, offset, len, checksumBuf);
+     } finally {
+       scope.close();
+     }
+   }
+ 
+   private synchronized int readChunkImpl(long pos, byte[] buf, int offset,
+                                      int len, byte[] checksumBuf)
+                                      throws IOException {
+     // Read one chunk.
+     if (eos) {
+       // Already hit EOF
+       return -1;
+     }
+     
+     // Read one DATA_CHUNK.
+     long chunkOffset = lastChunkOffset;
+     if ( lastChunkLen > 0 ) {
+       chunkOffset += lastChunkLen;
+     }
+     
+     // pos is relative to the start of the first chunk of the read.
+     // chunkOffset is relative to the start of the block.
+     // This makes sure that the read passed from FSInputChecker is the
+     // for the same chunk we expect to be reading from the DN.
+     if ( (pos + firstChunkOffset) != chunkOffset ) {
+       throw new IOException("Mismatch in pos : " + pos + " + " + 
+                             firstChunkOffset + " != " + chunkOffset);
+     }
+ 
+     // Read next packet if the previous packet has been read completely.
+     if (dataLeft <= 0) {
+       //Read packet headers.
+       PacketHeader header = new PacketHeader();
+       header.readFields(in);
+ 
+       if (LOG.isDebugEnabled()) {
+         LOG.debug("DFSClient readChunk got header " + header);
+       }
+ 
+       // Sanity check the lengths
+       if (!header.sanityCheck(lastSeqNo)) {
+            throw new IOException("BlockReader: error in packet header " +
+                                  header);
+       }
+ 
+       lastSeqNo = header.getSeqno();
+       dataLeft = header.getDataLen();
+       adjustChecksumBytes(header.getDataLen());
+       if (header.getDataLen() > 0) {
+         IOUtils.readFully(in, checksumBytes.array(), 0,
+                           checksumBytes.limit());
+       }
+     }
+ 
+     // Sanity checks
+     assert len >= bytesPerChecksum;
+     assert checksum != null;
+     assert checksumSize == 0 || (checksumBuf.length % checksumSize == 0);
+ 
+ 
+     int checksumsToRead, bytesToRead;
+ 
+     if (checksumSize > 0) {
+ 
+       // How many chunks left in our packet - this is a ceiling
+       // since we may have a partial chunk at the end of the file
+       int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1;
+ 
+       // How many chunks we can fit in databuffer
+       //  - note this is a floor since we always read full chunks
+       int chunksCanFit = Math.min(len / bytesPerChecksum,
+                                   checksumBuf.length / checksumSize);
+ 
+       // How many chunks should we read
+       checksumsToRead = Math.min(chunksLeft, chunksCanFit);
+       // How many bytes should we actually read
+       bytesToRead = Math.min(
+         checksumsToRead * bytesPerChecksum, // full chunks
+         dataLeft); // in case we have a partial
+     } else {
+       // no checksum
+       bytesToRead = Math.min(dataLeft, len);
+       checksumsToRead = 0;
+     }
+ 
+     if ( bytesToRead > 0 ) {
+       // Assert we have enough space
+       assert bytesToRead <= len;
+       assert checksumBytes.remaining() >= checksumSize * checksumsToRead;
+       assert checksumBuf.length >= checksumSize * checksumsToRead;
+       IOUtils.readFully(in, buf, offset, bytesToRead);
+       checksumBytes.get(checksumBuf, 0, checksumSize * checksumsToRead);
+     }
+ 
+     dataLeft -= bytesToRead;
+     assert dataLeft >= 0;
+ 
+     lastChunkOffset = chunkOffset;
+     lastChunkLen = bytesToRead;
+ 
+     // If there's no data left in the current packet after satisfying
+     // this read, and we have satisfied the client read, we expect
+     // an empty packet header from the DN to signify this.
+     // Note that pos + bytesToRead may in fact be greater since the
+     // DN finishes off the entire last chunk.
+     if (dataLeft == 0 &&
+         pos + bytesToRead >= bytesNeededToFinish) {
+ 
+       // Read header
+       PacketHeader hdr = new PacketHeader();
+       hdr.readFields(in);
+ 
+       if (!hdr.isLastPacketInBlock() ||
+           hdr.getDataLen() != 0) {
+         throw new IOException("Expected empty end-of-read packet! Header: " +
+                               hdr);
+       }
+ 
+       eos = true;
+     }
+ 
+     if ( bytesToRead == 0 ) {
+       return -1;
+     }
+ 
+     return bytesToRead;
+   }
+   
+   private RemoteBlockReader(String file, String bpid, long blockId,
+       DataInputStream in, DataChecksum checksum, boolean verifyChecksum,
+       long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
+       DatanodeID datanodeID, PeerCache peerCache) {
+     // Path is used only for printing block and file information in debug
+     super(new Path("/" + Block.BLOCK_FILE_PREFIX + blockId +
+                     ":" + bpid + ":of:"+ file)/*too non path-like?*/,
+           1, verifyChecksum,
+           checksum.getChecksumSize() > 0? checksum : null, 
+           checksum.getBytesPerChecksum(),
+           checksum.getChecksumSize());
+ 
+     this.isLocal = DFSUtilClient.isLocalAddress(NetUtils.
+         createSocketAddr(datanodeID.getXferAddr()));
+     
+     this.peer = peer;
+     this.datanodeID = datanodeID;
+     this.in = in;
+     this.checksum = checksum;
+     this.startOffset = Math.max( startOffset, 0 );
+     this.blockId = blockId;
+ 
+     // The total number of bytes that we need to transfer from the DN is
+     // the amount that the user wants (bytesToRead), plus the padding at
+     // the beginning in order to chunk-align. Note that the DN may elect
+     // to send more than this amount if the read starts/ends mid-chunk.
+     this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
+ 
+     this.firstChunkOffset = firstChunkOffset;
+     lastChunkOffset = firstChunkOffset;
+     lastChunkLen = -1;
+ 
+     bytesPerChecksum = this.checksum.getBytesPerChecksum();
+     checksumSize = this.checksum.getChecksumSize();
+     this.peerCache = peerCache;
+   }
+ 
+   /**
+    * Create a new BlockReader specifically to satisfy a read.
+    * This method also sends the OP_READ_BLOCK request.
+    *
+    * @param file  File location
+    * @param block  The block object
+    * @param blockToken  The block token for security
+    * @param startOffset  The read offset, relative to block head
+    * @param len  The number of bytes to read
+    * @param bufferSize  The IO buffer size (not the client buffer size)
+    * @param verifyChecksum  Whether to verify checksum
+    * @param clientName  Client name
+    * @return New BlockReader instance, or null on error.
+    */
+   public static RemoteBlockReader newBlockReader(String file,
+                                      ExtendedBlock block, 
+                                      Token<BlockTokenIdentifier> blockToken,
+                                      long startOffset, long len,
+                                      int bufferSize, boolean verifyChecksum,
+                                      String clientName, Peer peer,
+                                      DatanodeID datanodeID,
+                                      PeerCache peerCache,
+                                      CachingStrategy cachingStrategy)
+                                        throws IOException {
+     // in and out will be closed when sock is closed (by the caller)
+     final DataOutputStream out =
+         new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
+     new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
+         verifyChecksum, cachingStrategy);
+     
+     //
+     // Get bytes in block, set streams
+     //
+ 
+     DataInputStream in = new DataInputStream(
+         new BufferedInputStream(peer.getInputStream(), bufferSize));
+     
+     BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
+         PBHelperClient.vintPrefixed(in));
+     RemoteBlockReader2.checkSuccess(status, peer, block, file);
+     ReadOpChecksumInfoProto checksumInfo =
+       status.getReadOpChecksumInfo();
+     DataChecksum checksum = DataTransferProtoUtil.fromProto(
+         checksumInfo.getChecksum());
+     //Warning when we get CHECKSUM_NULL?
+     
+     // Read the first chunk offset.
+     long firstChunkOffset = checksumInfo.getChunkOffset();
+     
+     if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
+         firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) {
+       throw new IOException("BlockReader: error in first chunk offset (" +
+                             firstChunkOffset + ") startOffset is " + 
+                             startOffset + " for file " + file);
+     }
+ 
+     return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(),
+         in, checksum, verifyChecksum, startOffset, firstChunkOffset, len,
+         peer, datanodeID, peerCache);
+   }
+ 
+   @Override
+   public synchronized void close() throws IOException {
+     startOffset = -1;
+     checksum = null;
+     if (peerCache != null & sentStatusCode) {
+       peerCache.put(datanodeID, peer);
+     } else {
+       peer.close();
+     }
+ 
+     // in will be closed when its Socket is closed.
+   }
+   
+   @Override
+   public void readFully(byte[] buf, int readOffset, int amtToRead)
+       throws IOException {
+     IOUtils.readFully(this, buf, readOffset, amtToRead);
+   }
+ 
+   @Override
+   public int readAll(byte[] buf, int offset, int len) throws IOException {
+     return readFully(this, buf, offset, len);
+   }
+ 
+   /**
+    * When the reader reaches end of the read, it sends a status response
+    * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
+    * closing our connection (which we will re-open), but won't affect
+    * data correctness.
+    */
+   void sendReadResult(Peer peer, Status statusCode) {
+     assert !sentStatusCode : "already sent status code to " + peer;
+     try {
+       RemoteBlockReader2.writeReadResult(peer.getOutputStream(), statusCode);
+       sentStatusCode = true;
+     } catch (IOException e) {
+       // It's ok not to be able to send this. But something is probably wrong.
+       LOG.info("Could not send read status (" + statusCode + ") to datanode " +
+                peer.getRemoteAddressString() + ": " + e.getMessage());
+     }
+   }
+ 
+   @Override
+   public int read(ByteBuffer buf) throws IOException {
+     throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader");
+   }
+   
+   @Override
+   public int available() throws IOException {
+     // An optimistic estimate of how much data is available
+     // to us without doing network I/O.
+     return RemoteBlockReader2.TCP_WINDOW_SIZE;
+   }
+ 
+   @Override
+   public boolean isLocal() {
+     return isLocal;
+   }
+   
+   @Override
+   public boolean isShortCircuit() {
+     return false;
+   }
+ 
+   @Override
+   public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
+     return null;
+   }
++
++  @Override
++  public DataChecksum getDataChecksum() {
++    return checksum;
++  }
+ }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
index 0000000,5541e6d..9699442
mode 000000,100644..100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
@@@ -1,0 -1,480 +1,485 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.hadoop.hdfs;
+ 
+ import java.io.BufferedOutputStream;
+ import java.io.DataInputStream;
+ import java.io.DataOutputStream;
+ import java.io.IOException;
+ import java.io.OutputStream;
+ import java.net.InetSocketAddress;
+ import java.nio.ByteBuffer;
+ import java.nio.channels.ReadableByteChannel;
+ import java.util.EnumSet;
+ import java.util.UUID;
+ 
+ import org.apache.hadoop.classification.InterfaceAudience;
+ import org.apache.hadoop.fs.ReadOption;
+ import org.apache.hadoop.hdfs.net.Peer;
+ import org.apache.hadoop.hdfs.protocol.DatanodeID;
+ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+ import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+ import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+ import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
+ import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
+ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
+ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+ import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+ import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+ import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
+ import org.apache.hadoop.net.NetUtils;
+ import org.apache.hadoop.security.token.Token;
+ import org.apache.hadoop.util.DataChecksum;
+ import org.apache.htrace.Sampler;
+ import org.apache.htrace.Trace;
+ import org.apache.htrace.TraceScope;
+ 
+ import com.google.common.annotations.VisibleForTesting;
+ 
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ /**
+  * This is a wrapper around connection to datanode
+  * and understands checksum, offset etc.
+  *
+  * Terminology:
+  * <dl>
+  * <dt>block</dt>
+  *   <dd>The hdfs block, typically large (~64MB).
+  *   </dd>
+  * <dt>chunk</dt>
+  *   <dd>A block is divided into chunks, each comes with a checksum.
+  *       We want transfers to be chunk-aligned, to be able to
+  *       verify checksums.
+  *   </dd>
+  * <dt>packet</dt>
+  *   <dd>A grouping of chunks used for transport. It contains a
+  *       header, followed by checksum data, followed by real data.
+  *   </dd>
+  * </dl>
+  * Please see DataNode for the RPC specification.
+  *
+  * This is a new implementation introduced in Hadoop 0.23 which
+  * is more efficient and simpler than the older BlockReader
+  * implementation. It should be renamed to RemoteBlockReader
+  * once we are confident in it.
+  */
+ @InterfaceAudience.Private
+ public class RemoteBlockReader2  implements BlockReader {
+ 
+   static final Logger LOG = LoggerFactory.getLogger(RemoteBlockReader2.class);
+   static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB;
+ 
+   final private Peer peer;
+   final private DatanodeID datanodeID;
+   final private PeerCache peerCache;
+   final private long blockId;
+   private final ReadableByteChannel in;
+ 
+   private DataChecksum checksum;
+   private final PacketReceiver packetReceiver = new PacketReceiver(true);
+ 
+   private ByteBuffer curDataSlice = null;
+ 
+   /** offset in block of the last chunk received */
+   private long lastSeqNo = -1;
+ 
+   /** offset in block where reader wants to actually read */
+   private long startOffset;
+   private final String filename;
+ 
+   private final int bytesPerChecksum;
+   private final int checksumSize;
+ 
+   /**
+    * The total number of bytes we need to transfer from the DN.
+    * This is the amount that the user has requested plus some padding
+    * at the beginning so that the read can begin on a chunk boundary.
+    */
+   private long bytesNeededToFinish;
+ 
+   /**
+    * True if we are reading from a local DataNode.
+    */
+   private final boolean isLocal;
+ 
+   private final boolean verifyChecksum;
+ 
+   private boolean sentStatusCode = false;
+ 
+   @VisibleForTesting
+   public Peer getPeer() {
+     return peer;
+   }
+   
+   @Override
+   public synchronized int read(byte[] buf, int off, int len) 
+                                throws IOException {
+ 
+     UUID randomId = null;
+     if (LOG.isTraceEnabled()) {
+       randomId = UUID.randomUUID();
+       LOG.trace(String.format("Starting read #%s file %s from datanode %s",
+         randomId.toString(), this.filename,
+         this.datanodeID.getHostName()));
+     }
+ 
+     if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
+       TraceScope scope = Trace.startSpan(
+           "RemoteBlockReader2#readNextPacket(" + blockId + ")", Sampler.NEVER);
+       try {
+         readNextPacket();
+       } finally {
+         scope.close();
+       }
+     }
+ 
+     if (LOG.isTraceEnabled()) {
+       LOG.trace(String.format("Finishing read #" + randomId));
+     }
+ 
+     if (curDataSlice.remaining() == 0) {
+       // we're at EOF now
+       return -1;
+     }
+     
+     int nRead = Math.min(curDataSlice.remaining(), len);
+     curDataSlice.get(buf, off, nRead);
+     
+     return nRead;
+   }
+ 
+ 
+   @Override
+   public synchronized int read(ByteBuffer buf) throws IOException {
+     if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
+       TraceScope scope = Trace.startSpan(
+           "RemoteBlockReader2#readNextPacket(" + blockId + ")", Sampler.NEVER);
+       try {
+         readNextPacket();
+       } finally {
+         scope.close();
+       }
+     }
+     if (curDataSlice.remaining() == 0) {
+       // we're at EOF now
+       return -1;
+     }
+ 
+     int nRead = Math.min(curDataSlice.remaining(), buf.remaining());
+     ByteBuffer writeSlice = curDataSlice.duplicate();
+     writeSlice.limit(writeSlice.position() + nRead);
+     buf.put(writeSlice);
+     curDataSlice.position(writeSlice.position());
+ 
+     return nRead;
+   }
+ 
+   private void readNextPacket() throws IOException {
+     //Read packet headers.
+     packetReceiver.receiveNextPacket(in);
+ 
+     PacketHeader curHeader = packetReceiver.getHeader();
+     curDataSlice = packetReceiver.getDataSlice();
+     assert curDataSlice.capacity() == curHeader.getDataLen();
+     
+     if (LOG.isTraceEnabled()) {
+       LOG.trace("DFSClient readNextPacket got header " + curHeader);
+     }
+ 
+     // Sanity check the lengths
+     if (!curHeader.sanityCheck(lastSeqNo)) {
+          throw new IOException("BlockReader: error in packet header " +
+                                curHeader);
+     }
+     
+     if (curHeader.getDataLen() > 0) {
+       int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum;
+       int checksumsLen = chunks * checksumSize;
+ 
+       assert packetReceiver.getChecksumSlice().capacity() == checksumsLen :
+         "checksum slice capacity=" + packetReceiver.getChecksumSlice().capacity() + 
+           " checksumsLen=" + checksumsLen;
+       
+       lastSeqNo = curHeader.getSeqno();
+       if (verifyChecksum && curDataSlice.remaining() > 0) {
+         // N.B.: the checksum error offset reported here is actually
+         // relative to the start of the block, not the start of the file.
+         // This is slightly misleading, but preserves the behavior from
+         // the older BlockReader.
+         checksum.verifyChunkedSums(curDataSlice,
+             packetReceiver.getChecksumSlice(),
+             filename, curHeader.getOffsetInBlock());
+       }
+       bytesNeededToFinish -= curHeader.getDataLen();
+     }    
+     
+     // First packet will include some data prior to the first byte
+     // the user requested. Skip it.
+     if (curHeader.getOffsetInBlock() < startOffset) {
+       int newPos = (int) (startOffset - curHeader.getOffsetInBlock());
+       curDataSlice.position(newPos);
+     }
+ 
+     // If we've now satisfied the whole client read, read one last packet
+     // header, which should be empty
+     if (bytesNeededToFinish <= 0) {
+       readTrailingEmptyPacket();
+       if (verifyChecksum) {
+         sendReadResult(Status.CHECKSUM_OK);
+       } else {
+         sendReadResult(Status.SUCCESS);
+       }
+     }
+   }
+   
+   @Override
+   public synchronized long skip(long n) throws IOException {
+     /* How can we make sure we don't throw a ChecksumException, at least
+      * in majority of the cases?. This one throws. */
+     long skipped = 0;
+     while (skipped < n) {
+       long needToSkip = n - skipped;
+       if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
+         readNextPacket();
+       }
+       if (curDataSlice.remaining() == 0) {
+         // we're at EOF now
+         break;
+       }
+ 
+       int skip = (int)Math.min(curDataSlice.remaining(), needToSkip);
+       curDataSlice.position(curDataSlice.position() + skip);
+       skipped += skip;
+     }
+     return skipped;
+   }
+ 
+   private void readTrailingEmptyPacket() throws IOException {
+     if (LOG.isTraceEnabled()) {
+       LOG.trace("Reading empty packet at end of read");
+     }
+     
+     packetReceiver.receiveNextPacket(in);
+ 
+     PacketHeader trailer = packetReceiver.getHeader();
+     if (!trailer.isLastPacketInBlock() ||
+        trailer.getDataLen() != 0) {
+       throw new IOException("Expected empty end-of-read packet! Header: " +
+                             trailer);
+     }
+   }
+ 
+   protected RemoteBlockReader2(String file, String bpid, long blockId,
+       DataChecksum checksum, boolean verifyChecksum,
+       long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
+       DatanodeID datanodeID, PeerCache peerCache) {
+     this.isLocal = DFSUtilClient.isLocalAddress(NetUtils.
+         createSocketAddr(datanodeID.getXferAddr()));
+     // Path is used only for printing block and file information in debug
+     this.peer = peer;
+     this.datanodeID = datanodeID;
+     this.in = peer.getInputStreamChannel();
+     this.checksum = checksum;
+     this.verifyChecksum = verifyChecksum;
+     this.startOffset = Math.max( startOffset, 0 );
+     this.filename = file;
+     this.peerCache = peerCache;
+     this.blockId = blockId;
+ 
+     // The total number of bytes that we need to transfer from the DN is
+     // the amount that the user wants (bytesToRead), plus the padding at
+     // the beginning in order to chunk-align. Note that the DN may elect
+     // to send more than this amount if the read starts/ends mid-chunk.
+     this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
+     bytesPerChecksum = this.checksum.getBytesPerChecksum();
+     checksumSize = this.checksum.getChecksumSize();
+   }
+ 
+ 
+   @Override
+   public synchronized void close() throws IOException {
+     packetReceiver.close();
+     startOffset = -1;
+     checksum = null;
+     if (peerCache != null && sentStatusCode) {
+       peerCache.put(datanodeID, peer);
+     } else {
+       peer.close();
+     }
+ 
+     // in will be closed when its Socket is closed.
+   }
+   
+   /**
+    * When the reader reaches end of the read, it sends a status response
+    * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
+    * closing our connection (which we will re-open), but won't affect
+    * data correctness.
+    */
+   void sendReadResult(Status statusCode) {
+     assert !sentStatusCode : "already sent status code to " + peer;
+     try {
+       writeReadResult(peer.getOutputStream(), statusCode);
+       sentStatusCode = true;
+     } catch (IOException e) {
+       // It's ok not to be able to send this. But something is probably wrong.
+       LOG.info("Could not send read status (" + statusCode + ") to datanode " +
+                peer.getRemoteAddressString() + ": " + e.getMessage());
+     }
+   }
+ 
+   /**
+    * Serialize the actual read result on the wire.
+    */
+   static void writeReadResult(OutputStream out, Status statusCode)
+       throws IOException {
+     
+     ClientReadStatusProto.newBuilder()
+       .setStatus(statusCode)
+       .build()
+       .writeDelimitedTo(out);
+ 
+     out.flush();
+   }
+   
+   /**
+    * File name to print when accessing a block directly (from servlets)
+    * @param s Address of the block location
+    * @param poolId Block pool ID of the block
+    * @param blockId Block ID of the block
+    * @return string that has a file name for debug purposes
+    */
+   public static String getFileName(final InetSocketAddress s,
+       final String poolId, final long blockId) {
+     return s.toString() + ":" + poolId + ":" + blockId;
+   }
+ 
+   @Override
+   public int readAll(byte[] buf, int offset, int len) throws IOException {
+     return BlockReaderUtil.readAll(this, buf, offset, len);
+   }
+ 
+   @Override
+   public void readFully(byte[] buf, int off, int len) throws IOException {
+     BlockReaderUtil.readFully(this, buf, off, len);
+   }
+   
+   /**
+    * Create a new BlockReader specifically to satisfy a read.
+    * This method also sends the OP_READ_BLOCK request.
+    *
+    * @param file  File location
+    * @param block  The block object
+    * @param blockToken  The block token for security
+    * @param startOffset  The read offset, relative to block head
+    * @param len  The number of bytes to read
+    * @param verifyChecksum  Whether to verify checksum
+    * @param clientName  Client name
+    * @param peer  The Peer to use
+    * @param datanodeID  The DatanodeID this peer is connected to
+    * @return New BlockReader instance, or null on error.
+    */
+   public static BlockReader newBlockReader(String file,
+                                      ExtendedBlock block,
+                                      Token<BlockTokenIdentifier> blockToken,
+                                      long startOffset, long len,
+                                      boolean verifyChecksum,
+                                      String clientName,
+                                      Peer peer, DatanodeID datanodeID,
+                                      PeerCache peerCache,
+                                      CachingStrategy cachingStrategy) throws IOException {
+     // in and out will be closed when sock is closed (by the caller)
+     final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
+           peer.getOutputStream()));
+     new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
+         verifyChecksum, cachingStrategy);
+ 
+     //
+     // Get bytes in block
+     //
+     DataInputStream in = new DataInputStream(peer.getInputStream());
+ 
+     BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
+         PBHelperClient.vintPrefixed(in));
+     checkSuccess(status, peer, block, file);
+     ReadOpChecksumInfoProto checksumInfo =
+       status.getReadOpChecksumInfo();
+     DataChecksum checksum = DataTransferProtoUtil.fromProto(
+         checksumInfo.getChecksum());
+     //Warning when we get CHECKSUM_NULL?
+ 
+     // Read the first chunk offset.
+     long firstChunkOffset = checksumInfo.getChunkOffset();
+ 
+     if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
+         firstChunkOffset <= (startOffset - checksum.getBytesPerChecksum())) {
+       throw new IOException("BlockReader: error in first chunk offset (" +
+                             firstChunkOffset + ") startOffset is " +
+                             startOffset + " for file " + file);
+     }
+ 
+     return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(),
+         checksum, verifyChecksum, startOffset, firstChunkOffset, len, peer,
+         datanodeID, peerCache);
+   }
+ 
+   static void checkSuccess(
+       BlockOpResponseProto status, Peer peer,
+       ExtendedBlock block, String file)
+       throws IOException {
+     String logInfo = "for OP_READ_BLOCK"
+       + ", self=" + peer.getLocalAddressString()
+       + ", remote=" + peer.getRemoteAddressString()
+       + ", for file " + file
+       + ", for pool " + block.getBlockPoolId()
+       + " block " + block.getBlockId() + "_" + block.getGenerationStamp();
+     DataTransferProtoUtil.checkBlockOpStatus(status, logInfo);
+   }
+   
+   @Override
+   public int available() throws IOException {
+     // An optimistic estimate of how much data is available
+     // to us without doing network I/O.
+     return TCP_WINDOW_SIZE;
+   }
+   
+   @Override
+   public boolean isLocal() {
+     return isLocal;
+   }
+   
+   @Override
+   public boolean isShortCircuit() {
+     return false;
+   }
+ 
+   @Override
+   public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
+     return null;
+   }
++
++  @Override
++  public DataChecksum getDataChecksum() {
++    return checksum;
++  }
+ }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
index 36da863,9f26ca3..97445a6
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
@@@ -961,8 -953,8 +961,8 @@@ public class ClientNamenodeProtocolServ
        RpcController controller, UpdateBlockForPipelineRequestProto req)
        throws ServiceException {
      try {
 -      LocatedBlockProto result = PBHelper.convert(server
 -          .updateBlockForPipeline(PBHelperClient.convert(req.getBlock()),
 +      LocatedBlockProto result = PBHelper.convertLocatedBlock(
-           server.updateBlockForPipeline(PBHelper.convert(req.getBlock()),
++          server.updateBlockForPipeline(PBHelperClient.convert(req.getBlock()),
                req.getClientName()));
        return UpdateBlockForPipelineResponseProto.newBuilder().setBlock(result)
            .build();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index f292ee8,6f16d83..f419c46
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@@ -23,18 -23,11 +23,16 @@@ import static org.apache.hadoop.hdfs.pr
  import static org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherSuiteProto;
  import static org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CryptoProtocolVersionProto;
  
- import java.io.EOFException;
  import java.io.IOException;
- import java.io.InputStream;
  import java.util.ArrayList;
  import java.util.Arrays;
 +import java.util.Collection;
  import java.util.EnumSet;
 +import java.util.HashMap;
  import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
  
  import org.apache.hadoop.fs.CacheFlag;
  import org.apache.hadoop.fs.ContentSummary;
@@@ -134,13 -122,9 +131,14 @@@ import org.apache.hadoop.hdfs.protocol.
  import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto;
  import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto;
  import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
 -import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECRecoveryInfoProto;
 +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.ErasureCodingZoneProto;
 +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECSchemaOptionEntryProto;
 +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECSchemaProto;
 +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ErasureCodingPolicyProto;
  import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
  import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
++import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
  import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockStoragePolicyProto;
  import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockWithLocationsProto;
  import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlocksWithLocationsProto;
@@@ -233,15 -214,11 +230,12 @@@ import org.apache.hadoop.hdfs.server.pr
  import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
  import org.apache.hadoop.hdfs.server.protocol.StorageReport;
  import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
- import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
  import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
- import org.apache.hadoop.hdfs.util.ExactSizeInputStream;
  import org.apache.hadoop.io.EnumSetWritable;
  import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.io.erasurecode.ECSchema;
  import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
  import org.apache.hadoop.security.token.Token;
- import org.apache.hadoop.util.DataChecksum;
  
  import com.google.common.base.Preconditions;
  import com.google.common.collect.Lists;
@@@ -784,23 -726,9 +771,23 @@@ public class PBHelper 
        }
      }
  
 -    LocatedBlock lb = new LocatedBlock(PBHelperClient.convert(proto.getB()), targets,
 -        storageIDs, storageTypes, proto.getOffset(), proto.getCorrupt(),
 -        cachedLocs.toArray(new DatanodeInfo[0]));
 +    final LocatedBlock lb;
 +    if (indices == null) {
-       lb = new LocatedBlock(PBHelper.convert(proto.getB()), targets, storageIDs,
-           storageTypes, proto.getOffset(), proto.getCorrupt(),
++      lb = new LocatedBlock(PBHelperClient.convert(proto.getB()), targets,
++          storageIDs, storageTypes, proto.getOffset(), proto.getCorrupt(),
 +          cachedLocs.toArray(new DatanodeInfo[cachedLocs.size()]));
 +    } else {
-       lb = new LocatedStripedBlock(PBHelper.convert(proto.getB()), targets,
++      lb = new LocatedStripedBlock(PBHelperClient.convert(proto.getB()), targets,
 +          storageIDs, storageTypes, indices, proto.getOffset(),
 +          proto.getCorrupt(),
 +          cachedLocs.toArray(new DatanodeInfo[cachedLocs.size()]));
 +      List<TokenProto> tokenProtos = proto.getBlockTokensList();
 +      Token<BlockTokenIdentifier>[] blockTokens = new Token[indices.length];
 +      for (int i = 0; i < indices.length; i++) {
 +        blockTokens[i] = PBHelper.convert(tokenProtos.get(i));
 +      }
 +      ((LocatedStripedBlock) lb).setBlockTokens(blockTokens);
 +    }
      lb.setBlockToken(PBHelper.convert(proto.getBlockToken()));
  
      return lb;
@@@ -2954,192 -2860,4 +2935,192 @@@
          setLeaseId(context.getLeaseId()).
          build();
    }
 +
 +  public static ECSchema convertECSchema(ECSchemaProto schema) {
 +    List<ECSchemaOptionEntryProto> optionsList = schema.getOptionsList();
 +    Map<String, String> options = new HashMap<>(optionsList.size());
 +    for (ECSchemaOptionEntryProto option : optionsList) {
 +      options.put(option.getKey(), option.getValue());
 +    }
 +    return new ECSchema(schema.getCodecName(), schema.getDataUnits(),
 +        schema.getParityUnits(), options);
 +  }
 +
 +  public static ECSchemaProto convertECSchema(ECSchema schema) {
 +    ECSchemaProto.Builder builder = ECSchemaProto.newBuilder()
 +        .setCodecName(schema.getCodecName())
 +        .setDataUnits(schema.getNumDataUnits())
 +        .setParityUnits(schema.getNumParityUnits());
 +    Set<Entry<String, String>> entrySet = schema.getExtraOptions().entrySet();
 +    for (Entry<String, String> entry : entrySet) {
 +      builder.addOptions(ECSchemaOptionEntryProto.newBuilder()
 +          .setKey(entry.getKey()).setValue(entry.getValue()).build());
 +    }
 +    return builder.build();
 +  }
 +
 +  public static ErasureCodingPolicy convertErasureCodingPolicy(
 +      ErasureCodingPolicyProto policy) {
 +    return new ErasureCodingPolicy(policy.getName(),
 +        convertECSchema(policy.getSchema()),
 +        policy.getCellSize());
 +  }
 +
 +  public static ErasureCodingPolicyProto convertErasureCodingPolicy(
 +      ErasureCodingPolicy policy) {
 +    ErasureCodingPolicyProto.Builder builder = ErasureCodingPolicyProto
 +        .newBuilder()
 +        .setName(policy.getName())
 +        .setSchema(convertECSchema(policy.getSchema()))
 +        .setCellSize(policy.getCellSize());
 +    return builder.build();
 +  }
 +
 +  public static ErasureCodingZoneProto convertErasureCodingZone(
 +      ErasureCodingZone ecZone) {
 +    return ErasureCodingZoneProto.newBuilder().setDir(ecZone.getDir())
 +        .setEcPolicy(convertErasureCodingPolicy(ecZone.getErasureCodingPolicy()))
 +        .build();
 +  }
 +
 +  public static ErasureCodingZone convertErasureCodingZone(
 +      ErasureCodingZoneProto ecZoneProto) {
 +    return new ErasureCodingZone(ecZoneProto.getDir(),
 +        convertErasureCodingPolicy(ecZoneProto.getEcPolicy()));
 +  }
 +  
 +  public static BlockECRecoveryInfo convertBlockECRecoveryInfo(
 +      BlockECRecoveryInfoProto blockEcRecoveryInfoProto) {
 +    ExtendedBlockProto blockProto = blockEcRecoveryInfoProto.getBlock();
-     ExtendedBlock block = convert(blockProto);
++    ExtendedBlock block = PBHelperClient.convert(blockProto);
 +
 +    DatanodeInfosProto sourceDnInfosProto = blockEcRecoveryInfoProto
 +        .getSourceDnInfos();
 +    DatanodeInfo[] sourceDnInfos = convert(sourceDnInfosProto);
 +
 +    DatanodeInfosProto targetDnInfosProto = blockEcRecoveryInfoProto
 +        .getTargetDnInfos();
 +    DatanodeInfo[] targetDnInfos = convert(targetDnInfosProto);
 +
 +    StorageUuidsProto targetStorageUuidsProto = blockEcRecoveryInfoProto
 +        .getTargetStorageUuids();
 +    String[] targetStorageUuids = convert(targetStorageUuidsProto);
 +
 +    StorageTypesProto targetStorageTypesProto = blockEcRecoveryInfoProto
 +        .getTargetStorageTypes();
 +    StorageType[] convertStorageTypes = convertStorageTypes(
 +        targetStorageTypesProto.getStorageTypesList(), targetStorageTypesProto
 +            .getStorageTypesList().size());
 +
 +    List<Integer> liveBlockIndicesList = blockEcRecoveryInfoProto
 +        .getLiveBlockIndicesList();
 +    short[] liveBlkIndices = new short[liveBlockIndicesList.size()];
 +    for (int i = 0; i < liveBlockIndicesList.size(); i++) {
 +      liveBlkIndices[i] = liveBlockIndicesList.get(i).shortValue();
 +    }
 +
 +    ErasureCodingPolicy ecPolicy =
 +        convertErasureCodingPolicy(blockEcRecoveryInfoProto.getEcPolicy());
 +
 +    return new BlockECRecoveryInfo(block, sourceDnInfos, targetDnInfos,
 +        targetStorageUuids, convertStorageTypes, liveBlkIndices, ecPolicy);
 +  }
 +
 +  public static BlockECRecoveryInfoProto convertBlockECRecoveryInfo(
 +      BlockECRecoveryInfo blockEcRecoveryInfo) {
 +    BlockECRecoveryInfoProto.Builder builder = BlockECRecoveryInfoProto
 +        .newBuilder();
 +    builder.setBlock(PBHelperClient.convert(
 +        blockEcRecoveryInfo.getExtendedBlock()));
 +
 +    DatanodeInfo[] sourceDnInfos = blockEcRecoveryInfo.getSourceDnInfos();
 +    builder.setSourceDnInfos(convertToDnInfosProto(sourceDnInfos));
 +
 +    DatanodeInfo[] targetDnInfos = blockEcRecoveryInfo.getTargetDnInfos();
 +    builder.setTargetDnInfos(convertToDnInfosProto(targetDnInfos));
 +
 +    String[] targetStorageIDs = blockEcRecoveryInfo.getTargetStorageIDs();
 +    builder.setTargetStorageUuids(convertStorageIDs(targetStorageIDs));
 +
 +    StorageType[] targetStorageTypes = blockEcRecoveryInfo
 +        .getTargetStorageTypes();
 +    builder.setTargetStorageTypes(convertStorageTypesProto(targetStorageTypes));
 +
 +    short[] liveBlockIndices = blockEcRecoveryInfo.getLiveBlockIndices();
 +    builder.addAllLiveBlockIndices(convertIntArray(liveBlockIndices));
 +
 +    builder.setEcPolicy(convertErasureCodingPolicy(blockEcRecoveryInfo
 +        .getErasureCodingPolicy()));
 +
 +    return builder.build();
 +  }
 +
 +  private static List<Integer> convertIntArray(short[] liveBlockIndices) {
 +    List<Integer> liveBlockIndicesList = new ArrayList<Integer>();
 +    for (short s : liveBlockIndices) {
 +      liveBlockIndicesList.add((int) s);
 +    }
 +    return liveBlockIndicesList;
 +  }
 +
 +  private static StorageTypesProto convertStorageTypesProto(
 +      StorageType[] targetStorageTypes) {
 +    StorageTypesProto.Builder builder = StorageTypesProto.newBuilder();
 +    for (StorageType storageType : targetStorageTypes) {
 +      builder.addStorageTypes(PBHelperClient.convertStorageType(storageType));
 +    }
 +    return builder.build();
 +  }
 +
 +  private static StorageUuidsProto convertStorageIDs(String[] targetStorageIDs) {
 +    StorageUuidsProto.Builder builder = StorageUuidsProto.newBuilder();
 +    for (String storageUuid : targetStorageIDs) {
 +      builder.addStorageUuids(storageUuid);
 +    }
 +    return builder.build();
 +  }
 +
 +  private static DatanodeInfosProto convertToDnInfosProto(DatanodeInfo[] dnInfos) {
 +    DatanodeInfosProto.Builder builder = DatanodeInfosProto.newBuilder();
 +    for (DatanodeInfo datanodeInfo : dnInfos) {
 +      builder.addDatanodes(PBHelperClient.convert(datanodeInfo));
 +    }
 +    return builder.build();
 +  }
 +
 +  private static String[] convert(StorageUuidsProto targetStorageUuidsProto) {
 +    List<String> storageUuidsList = targetStorageUuidsProto
 +        .getStorageUuidsList();
 +    String[] storageUuids = new String[storageUuidsList.size()];
 +    for (int i = 0; i < storageUuidsList.size(); i++) {
 +      storageUuids[i] = storageUuidsList.get(i);
 +    }
 +    return storageUuids;
 +  }
 +  
 +  public static BlockECRecoveryCommandProto convert(
 +      BlockECRecoveryCommand blkECRecoveryCmd) {
 +    BlockECRecoveryCommandProto.Builder builder = BlockECRecoveryCommandProto
 +        .newBuilder();
 +    Collection<BlockECRecoveryInfo> blockECRecoveryInfos = blkECRecoveryCmd
 +        .getECTasks();
 +    for (BlockECRecoveryInfo blkECRecoveryInfo : blockECRecoveryInfos) {
 +      builder
 +          .addBlockECRecoveryinfo(convertBlockECRecoveryInfo(blkECRecoveryInfo));
 +    }
 +    return builder.build();
 +  }
 +  
 +  public static BlockECRecoveryCommand convert(
 +      BlockECRecoveryCommandProto blkECRecoveryCmdProto) {
 +    Collection<BlockECRecoveryInfo> blkECRecoveryInfos = new ArrayList<BlockECRecoveryInfo>();
 +    List<BlockECRecoveryInfoProto> blockECRecoveryinfoList = blkECRecoveryCmdProto
 +        .getBlockECRecoveryinfoList();
 +    for (BlockECRecoveryInfoProto blockECRecoveryInfoProto : blockECRecoveryinfoList) {
 +      blkECRecoveryInfos
 +          .add(convertBlockECRecoveryInfo(blockECRecoveryInfoProto));
 +    }
 +    return new BlockECRecoveryCommand(DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY,
 +        blkECRecoveryInfos);
 +  }
  }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
index dc296ac,810784d..92a1135
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
@@@ -30,11 -31,11 +31,12 @@@ import org.apache.hadoop.util.LightWeig
  import static org.apache.hadoop.hdfs.server.namenode.INodeId.INVALID_INODE_ID;
  
  /**
 - * BlockInfo class maintains for a given block
 - * the {@link BlockCollection} it is part of and datanodes where the replicas of
 - * the block are stored.
 + * For a given block (or an erasure coding block group), BlockInfo class
 + * maintains 1) the {@link BlockCollection} it is part of, and 2) datanodes
 + * where the replicas of the block, or blocks belonging to the erasure coding
 + * block group, are stored.
   */
+ @InterfaceAudience.Private
  public abstract class BlockInfo extends Block
      implements LightWeightGSet.LinkedElement {
  
@@@ -203,17 -206,6 +205,11 @@@
     */
    abstract boolean removeStorage(DatanodeStorageInfo storage);
  
-   /**
-    * Replace the current BlockInfo with the new one in corresponding
-    * DatanodeStorageInfo's linked list
-    */
-   abstract void replaceBlock(BlockInfo newBlock);
- 
 +  public abstract boolean isStriped();
 +
 +  /** @return true if there is no datanode storage associated with the block */
 +  abstract boolean hasNoStorage();
 +
    /**
     * Find specified DatanodeStorageInfo.
     * @return DatanodeStorageInfo or null if not found.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
index b9d8486,94fb222..746e298
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
@@@ -95,29 -104,4 +95,14 @@@ public class BlockInfoContiguous extend
      }
      return 0;
    }
 +
 +  @Override
-   void replaceBlock(BlockInfo newBlock) {
-     assert newBlock instanceof BlockInfoContiguous;
-     for (int i = this.numNodes() - 1; i >= 0; i--) {
-       final DatanodeStorageInfo storage = this.getStorageInfo(i);
-       final boolean removed = storage.removeBlock(this);
-       assert removed : "currentBlock not found.";
- 
-       final DatanodeStorageInfo.AddBlockResult result = storage.addBlock(
-           newBlock, newBlock);
-       assert result == DatanodeStorageInfo.AddBlockResult.ADDED :
-           "newBlock already exists.";
-     }
-   }
- 
-   @Override
 +  public final boolean isStriped() {
 +    return false;
 +  }
 +
 +  @Override
 +  final boolean hasNoStorage() {
 +    return getStorageInfo(0) == null;
 +  }
  }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
index 7b21cbe,0000000..df48655
mode 100644,000000..100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
@@@ -1,253 -1,0 +1,234 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hdfs.server.blockmanagement;
 +
 +import org.apache.hadoop.hdfs.protocol.Block;
 +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 +import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 +
 +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
 +
 +/**
 + * Subclass of {@link BlockInfo}, presenting a block group in erasure coding.
 + *
 + * We still use triplets to store DatanodeStorageInfo for each block in the
 + * block group, as well as the previous/next block in the corresponding
 + * DatanodeStorageInfo. For a (m+k) block group, the first (m+k) triplet units
 + * are sorted and strictly mapped to the corresponding block.
 + *
 + * Normally each block belonging to group is stored in only one DataNode.
 + * However, it is possible that some block is over-replicated. Thus the triplet
 + * array's size can be larger than (m+k). Thus currently we use an extra byte
 + * array to record the block index for each triplet.
 + */
 +public class BlockInfoStriped extends BlockInfo {
 +  private final ErasureCodingPolicy ecPolicy;
 +  /**
 +   * Always the same size with triplets. Record the block index for each triplet
 +   * TODO: actually this is only necessary for over-replicated block. Thus can
 +   * be further optimized to save memory usage.
 +   */
 +  private byte[] indices;
 +
 +  public BlockInfoStriped(Block blk, ErasureCodingPolicy ecPolicy) {
 +    super(blk, (short) (ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits()));
 +    indices = new byte[ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits()];
 +    initIndices();
 +    this.ecPolicy = ecPolicy;
 +  }
 +
 +  public short getTotalBlockNum() {
 +    return (short) (ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits());
 +  }
 +
 +  public short getDataBlockNum() {
 +    return (short) ecPolicy.getNumDataUnits();
 +  }
 +
 +  public short getParityBlockNum() {
 +    return (short) ecPolicy.getNumParityUnits();
 +  }
 +
 +  /**
 +   * If the block is committed/completed and its length is less than a full
 +   * stripe, it returns the the number of actual data blocks.
 +   * Otherwise it returns the number of data units specified by erasure coding policy.
 +   */
 +  public short getRealDataBlockNum() {
 +    if (isComplete() || getBlockUCState() == BlockUCState.COMMITTED) {
 +      return (short) Math.min(getDataBlockNum(),
 +          (getNumBytes() - 1) / BLOCK_STRIPED_CELL_SIZE + 1);
 +    } else {
 +      return getDataBlockNum();
 +    }
 +  }
 +
 +  public short getRealTotalBlockNum() {
 +    return (short) (getRealDataBlockNum() + getParityBlockNum());
 +  }
 +
 +  public ErasureCodingPolicy getErasureCodingPolicy() {
 +    return ecPolicy;
 +  }
 +
 +  private void initIndices() {
 +    for (int i = 0; i < indices.length; i++) {
 +      indices[i] = -1;
 +    }
 +  }
 +
 +  private int findSlot() {
 +    int i = getTotalBlockNum();
 +    for (; i < getCapacity(); i++) {
 +      if (getStorageInfo(i) == null) {
 +        return i;
 +      }
 +    }
 +    // need to expand the triplet size
 +    ensureCapacity(i + 1, true);
 +    return i;
 +  }
 +
 +  @Override
 +  boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
 +    int blockIndex = BlockIdManager.getBlockIndex(reportedBlock);
 +    int index = blockIndex;
 +    DatanodeStorageInfo old = getStorageInfo(index);
 +    if (old != null && !old.equals(storage)) { // over replicated
 +      // check if the storage has been stored
 +      int i = findStorageInfo(storage);
 +      if (i == -1) {
 +        index = findSlot();
 +      } else {
 +        return true;
 +      }
 +    }
 +    addStorage(storage, index, blockIndex);
 +    return true;
 +  }
 +
 +  private void addStorage(DatanodeStorageInfo storage, int index,
 +      int blockIndex) {
 +    setStorageInfo(index, storage);
 +    setNext(index, null);
 +    setPrevious(index, null);
 +    indices[index] = (byte) blockIndex;
 +  }
 +
 +  private int findStorageInfoFromEnd(DatanodeStorageInfo storage) {
 +    final int len = getCapacity();
 +    for(int idx = len - 1; idx >= 0; idx--) {
 +      DatanodeStorageInfo cur = getStorageInfo(idx);
 +      if (storage.equals(cur)) {
 +        return idx;
 +      }
 +    }
 +    return -1;
 +  }
 +
 +  int getStorageBlockIndex(DatanodeStorageInfo storage) {
 +    int i = this.findStorageInfo(storage);
 +    return i == -1 ? -1 : indices[i];
 +  }
 +
 +  /**
 +   * Identify the block stored in the given datanode storage. Note that
 +   * the returned block has the same block Id with the one seen/reported by the
 +   * DataNode.
 +   */
 +  Block getBlockOnStorage(DatanodeStorageInfo storage) {
 +    int index = getStorageBlockIndex(storage);
 +    if (index < 0) {
 +      return null;
 +    } else {
 +      Block block = new Block(this);
 +      block.setBlockId(this.getBlockId() + index);
 +      return block;
 +    }
 +  }
 +
 +  @Override
 +  boolean removeStorage(DatanodeStorageInfo storage) {
 +    int dnIndex = findStorageInfoFromEnd(storage);
 +    if (dnIndex < 0) { // the node is not found
 +      return false;
 +    }
 +    assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
 +        "Block is still in the list and must be removed first.";
 +    // set the triplet to null
 +    setStorageInfo(dnIndex, null);
 +    setNext(dnIndex, null);
 +    setPrevious(dnIndex, null);
 +    indices[dnIndex] = -1;
 +    return true;
 +  }
 +
 +  private void ensureCapacity(int totalSize, boolean keepOld) {
 +    if (getCapacity() < totalSize) {
 +      Object[] old = triplets;
 +      byte[] oldIndices = indices;
 +      triplets = new Object[totalSize * 3];
 +      indices = new byte[totalSize];
 +      initIndices();
 +
 +      if (keepOld) {
 +        System.arraycopy(old, 0, triplets, 0, old.length);
 +        System.arraycopy(oldIndices, 0, indices, 0, oldIndices.length);
 +      }
 +    }
 +  }
 +
-   @Override
-   void replaceBlock(BlockInfo newBlock) {
-     assert newBlock instanceof BlockInfoStriped;
-     BlockInfoStriped newBlockGroup = (BlockInfoStriped) newBlock;
-     final int size = getCapacity();
-     newBlockGroup.ensureCapacity(size, false);
-     for (int i = 0; i < size; i++) {
-       final DatanodeStorageInfo storage = this.getStorageInfo(i);
-       if (storage != null) {
-         final int blockIndex = indices[i];
-         final boolean removed = storage.removeBlock(this);
-         assert removed : "currentBlock not found.";
- 
-         newBlockGroup.addStorage(storage, i, blockIndex);
-         storage.insertToList(newBlockGroup);
-       }
-     }
-   }
- 
 +  public long spaceConsumed() {
 +    // In case striped blocks, total usage by this striped blocks should
 +    // be the total of data blocks and parity blocks because
 +    // `getNumBytes` is the total of actual data block size.
 +    return StripedBlockUtil.spaceConsumedByStripedBlock(getNumBytes(),
 +        ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits(),
 +        BLOCK_STRIPED_CELL_SIZE);
 +    }
 +
 +  @Override
 +  public final boolean isStriped() {
 +    return true;
 +  }
 +
 +  @Override
 +  public int numNodes() {
 +    assert this.triplets != null : "BlockInfo is not initialized";
 +    assert triplets.length % 3 == 0 : "Malformed BlockInfo";
 +    int num = 0;
 +    for (int idx = getCapacity()-1; idx >= 0; idx--) {
 +      if (getStorageInfo(idx) != null) {
 +        num++;
 +      }
 +    }
 +    return num;
 +  }
 +
 +  @Override
 +  final boolean hasNoStorage() {
 +    final int len = getCapacity();
 +    for(int idx = 0; idx < len; idx++) {
 +      if (getStorageInfo(idx) != null) {
 +        return false;
 +      }
 +    }
 +    return true;
 +  }
 +}