Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/03/16 21:18:21 UTC

[14/50] [abbrv] hadoop git commit: HDFS-7435. PB encoding of block reports is very inefficient. Contributed by Daryn Sharp.

HDFS-7435. PB encoding of block reports is very inefficient. Contributed by Daryn Sharp.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d324164a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d324164a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d324164a

Branch: refs/heads/HDFS-7285
Commit: d324164a51a43d72c02567248bd9f0f12b244a40
Parents: f446669
Author: Kihwal Lee <ki...@apache.org>
Authored: Fri Mar 13 14:13:55 2015 -0500
Committer: Kihwal Lee <ki...@apache.org>
Committed: Fri Mar 13 14:23:37 2015 -0500

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../hadoop/hdfs/protocol/BlockListAsLongs.java  | 660 +++++++++++--------
 .../DatanodeProtocolClientSideTranslatorPB.java |  22 +-
 .../DatanodeProtocolServerSideTranslatorPB.java |  14 +-
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java |   6 +-
 .../server/blockmanagement/BlockManager.java    |  16 +-
 .../hdfs/server/datanode/BPServiceActor.java    |  13 +-
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |  20 +-
 .../hdfs/server/namenode/NameNodeRpcServer.java |   2 +-
 .../server/protocol/DatanodeRegistration.java   |   9 +
 .../hdfs/server/protocol/NamespaceInfo.java     |  52 ++
 .../server/protocol/StorageBlockReport.java     |   8 +-
 .../src/main/proto/DatanodeProtocol.proto       |   2 +
 .../hadoop-hdfs/src/main/proto/hdfs.proto       |   1 +
 .../hdfs/protocol/TestBlockListAsLongs.java     | 237 +++++++
 .../blockmanagement/TestBlockManager.java       |   8 +-
 .../server/datanode/BlockReportTestBase.java    |  27 +-
 .../server/datanode/SimulatedFSDataset.java     |  11 +-
 .../TestBlockHasMultipleReplicasOnSameDN.java   |   9 +-
 .../datanode/TestDataNodeVolumeFailure.java     |   4 +-
 ...TestDnRespectsBlockReportSplitThreshold.java |   2 +-
 .../extdataset/ExternalDatasetImpl.java         |   2 +-
 .../server/namenode/NNThroughputBenchmark.java  |  23 +-
 .../hdfs/server/namenode/TestDeadDatanode.java  |   3 +-
 .../hdfs/server/namenode/TestFSImage.java       |   2 +
 .../TestOfflineEditsViewer.java                 |   9 +-
 26 files changed, 811 insertions(+), 354 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 909182b..ac7e096 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -743,6 +743,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7491. Add incremental blockreport latency to DN metrics.
     (Ming Ma via cnauroth)
 
+    HDFS-7435. PB encoding of block reports is very inefficient.
+    (Daryn Sharp via kihwal)
+
   OPTIMIZATIONS
 
     HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
index 4389714..1c89ee4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
@@ -17,342 +17,458 @@
  */
 package org.apache.hadoop.hdfs.protocol;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Random;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.Replica;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
 
-/**
- * This class provides an interface for accessing list of blocks that
- * has been implemented as long[].
- * This class is useful for block report. Rather than send block reports
- * as a Block[] we can send it as a long[].
- *
- * The structure of the array is as follows:
- * 0: the length of the finalized replica list;
- * 1: the length of the under-construction replica list;
- * - followed by finalized replica list where each replica is represented by
- *   3 longs: one for the blockId, one for the block length, and one for
- *   the generation stamp;
- * - followed by the invalid replica represented with three -1s;
- * - followed by the under-construction replica list where each replica is
- *   represented by 4 longs: three for the block id, length, generation 
- *   stamp, and the fourth for the replica state.
- */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class BlockListAsLongs implements Iterable<Block> {
+public abstract class BlockListAsLongs implements Iterable<BlockReportReplica> {
+  private final static int CHUNK_SIZE = 64*1024; // 64K
+  private static long[] EMPTY_LONGS = new long[]{0, 0};
+
+  public static BlockListAsLongs EMPTY = new BlockListAsLongs() {
+    @Override
+    public int getNumberOfBlocks() {
+      return 0;
+    }
+    @Override
+    public ByteString getBlocksBuffer() {
+      return ByteString.EMPTY;
+    }
+    @Override
+    public long[] getBlockListAsLongs() {
+      return EMPTY_LONGS;
+    }
+    @Override
+    public Iterator<BlockReportReplica> iterator() {
+      return Collections.emptyIterator();
+    }
+  };
+
   /**
-   * A finalized block as 3 longs
-   *   block-id and block length and generation stamp
+   * Prepare an instance to in-place decode the given ByteString buffer
+   * @param numBlocks - blocks in the buffer
+   * @param blocksBuf - ByteString encoded varints
+   * @return BlockListAsLongs
    */
-  private static final int LONGS_PER_FINALIZED_BLOCK = 3;
+  public static BlockListAsLongs decodeBuffer(final int numBlocks,
+      final ByteString blocksBuf) {
+    return new BufferDecoder(numBlocks, blocksBuf);
+  }
 
   /**
-   * An under-construction block as 4 longs
-   *   block-id and block length, generation stamp and replica state
+   * Prepare an instance to in-place decode the given ByteString buffers
+   * @param numBlocks - blocks in the buffers
+   * @param blocksBufs - list of ByteString encoded varints
+   * @return BlockListAsLongs
    */
-  private static final int LONGS_PER_UC_BLOCK = 4;
-
-  /** Number of longs in the header */
-  private static final int HEADER_SIZE = 2;
+  public static BlockListAsLongs decodeBuffers(final int numBlocks,
+      final List<ByteString> blocksBufs) {
+    // this doesn't actually copy the data
+    return decodeBuffer(numBlocks, ByteString.copyFrom(blocksBufs));
+  }
 
   /**
-   * Returns the index of the first long in blockList
-   * belonging to the specified block.
-   * The first long contains the block id.
+   * Prepare an instance to in-place decode the given list of Longs.  Note
+   * that decoding ByteString buffers is much more efficient; this method
+   * exists only for compatibility.
+   * @param blocksList - list of longs
+   * @return BlockListAsLongs
    */
-  private int index2BlockId(int blockIndex) {
-    if(blockIndex < 0 || blockIndex > getNumberOfBlocks())
-      return -1;
-    int finalizedSize = getNumberOfFinalizedReplicas();
-    if(blockIndex < finalizedSize)
-      return HEADER_SIZE + blockIndex * LONGS_PER_FINALIZED_BLOCK;
-    return HEADER_SIZE + (finalizedSize + 1) * LONGS_PER_FINALIZED_BLOCK
-            + (blockIndex - finalizedSize) * LONGS_PER_UC_BLOCK;
+  public static BlockListAsLongs decodeLongs(List<Long> blocksList) {
+    return blocksList.isEmpty() ? EMPTY : new LongsDecoder(blocksList);
   }
 
-  private final long[] blockList;
-  
   /**
-   * Create block report from finalized and under construction lists of blocks.
-   * 
-   * @param finalized - list of finalized blocks
-   * @param uc - list of under construction blocks
+   * Prepare an instance to encode the collection of replicas into an
+   * efficient ByteString.
+   * @param replicas - replicas to encode
+   * @return BlockListAsLongs
    */
-  public BlockListAsLongs(final List<? extends Replica> finalized,
-                          final List<? extends Replica> uc) {
-    int finalizedSize = finalized == null ? 0 : finalized.size();
-    int ucSize = uc == null ? 0 : uc.size();
-    int len = HEADER_SIZE
-              + (finalizedSize + 1) * LONGS_PER_FINALIZED_BLOCK
-              + ucSize * LONGS_PER_UC_BLOCK;
-
-    blockList = new long[len];
+  public static BlockListAsLongs encode(
+      final Collection<? extends Replica> replicas) {
+    BlockListAsLongs.Builder builder = builder();
+    for (Replica replica : replicas) {
+      builder.add(replica);
+    }
+    return builder.build();
+  }
 
-    // set the header
-    blockList[0] = finalizedSize;
-    blockList[1] = ucSize;
+  public static Builder builder() {
+    return new BlockListAsLongs.Builder();
+  }
 
-    // set finalized blocks
-    for (int i = 0; i < finalizedSize; i++) {
-      setBlock(i, finalized.get(i));
-    }
+  /**
+   * The number of blocks
+   * @return - the number of blocks
+   */
+  abstract public int getNumberOfBlocks();
 
-    // set invalid delimiting block
-    setDelimitingBlock(finalizedSize);
+  /**
+   * Very efficient encoding of the block report into a ByteString to avoid
+   * the overhead of protobuf repeating fields.  Primitive repeating fields
+   * require re-allocs of an ArrayList<Long> and the associated (un)boxing
+   * overhead which puts pressure on GC.
+   * 
+   * The structure of the buffer is as follows:
+   * - each replica is represented by 4 longs:
+   *   blockId, block length, genstamp, replica state
+   *
+   * @return ByteString encoded block report
+   */
+  abstract public ByteString getBlocksBuffer();
 
-    // set under construction blocks
-    for (int i = 0; i < ucSize; i++) {
-      setBlock(finalizedSize + i, uc.get(i));
+  /**
+   * List of ByteStrings that encode this block report
+   *
+   * @return ByteStrings
+   */
+  public List<ByteString> getBlocksBuffers() {
+    final ByteString blocksBuf = getBlocksBuffer();
+    final List<ByteString> buffers;
+    final int size = blocksBuf.size();
+    if (size <= CHUNK_SIZE) {
+      buffers = Collections.singletonList(blocksBuf);
+    } else {
+      buffers = new ArrayList<ByteString>();
+      for (int pos=0; pos < size; pos += CHUNK_SIZE) {
+        // this doesn't actually copy the data
+        buffers.add(blocksBuf.substring(pos, Math.min(pos+CHUNK_SIZE, size)));
+      }
     }
+    return buffers;
   }
 
   /**
-   * Create block report from a list of finalized blocks.  Used by
-   * NNThroughputBenchmark.
-   *
-   * @param blocks - list of finalized blocks
+   * Convert block report to old-style list of longs.  Only used to
+   * re-encode the block report when the DN detects an older NN. This is
+   * inefficient, but in practice a DN is unlikely to be upgraded first.
+   * 
+   * The structure of the array is as follows:
+   * 0: the length of the finalized replica list;
+   * 1: the length of the under-construction replica list;
+   * - followed by finalized replica list where each replica is represented by
+   *   3 longs: one for the blockId, one for the block length, and one for
+   *   the generation stamp;
+   * - followed by the invalid replica represented with three -1s;
+   * - followed by the under-construction replica list where each replica is
+   *   represented by 4 longs: three for the block id, length, generation 
+   *   stamp, and the fourth for the replica state.
+   * @return list of longs
    */
-  public BlockListAsLongs(final List<? extends Block> blocks) {
-    int finalizedSize = blocks == null ? 0 : blocks.size();
-    int len = HEADER_SIZE
-              + (finalizedSize + 1) * LONGS_PER_FINALIZED_BLOCK;
+  abstract public long[] getBlockListAsLongs();
 
-    blockList = new long[len];
+  /**
+   * Returns a singleton iterator over blocks in the block report.  Do not
+   * add the returned blocks to a collection.
+   * @return Iterator
+   */
+  abstract public Iterator<BlockReportReplica> iterator();
 
-    // set the header
-    blockList[0] = finalizedSize;
-    blockList[1] = 0;
+  public static class Builder {
+    private final ByteString.Output out;
+    private final CodedOutputStream cos;
+    private int numBlocks = 0;
+    private int numFinalized = 0;
 
-    // set finalized blocks
-    for (int i = 0; i < finalizedSize; i++) {
-      setBlock(i, blocks.get(i));
+    Builder() {
+      out = ByteString.newOutput(64*1024);
+      cos = CodedOutputStream.newInstance(out);
     }
 
-    // set invalid delimiting block
-    setDelimitingBlock(finalizedSize);
-  }
-
-  public BlockListAsLongs() {
-    this((long[])null);
-  }
+    public void add(Replica replica) {
+      try {
+        // zig-zag to reduce size of legacy blocks
+        cos.writeSInt64NoTag(replica.getBlockId());
+        cos.writeRawVarint64(replica.getBytesOnDisk());
+        cos.writeRawVarint64(replica.getGenerationStamp());
+        ReplicaState state = replica.getState();
+        // although state is not a 64-bit value, using a long varint to
+        // allow for future use of the upper bits
+        cos.writeRawVarint64(state.getValue());
+        if (state == ReplicaState.FINALIZED) {
+          numFinalized++;
+        }
+        numBlocks++;
+      } catch (IOException ioe) {
+        // shouldn't happen, ByteString.Output doesn't throw IOE
+        throw new IllegalStateException(ioe);
+      }
+    }
 
-  /**
-   * Constructor
-   * @param iBlockList - BlockListALongs create from this long[] parameter
-   */
-  public BlockListAsLongs(final long[] iBlockList) {
-    if (iBlockList == null) {
-      blockList = new long[HEADER_SIZE];
-      return;
+    public int getNumberOfBlocks() {
+      return numBlocks;
+    }
+    
+    public BlockListAsLongs build() {
+      try {
+        cos.flush();
+      } catch (IOException ioe) {
+        // shouldn't happen, ByteString.Output doesn't throw IOE
+        throw new IllegalStateException(ioe);
+      }
+      return new BufferDecoder(numBlocks, numFinalized, out.toByteString());
     }
-    blockList = iBlockList;
   }
 
-  public long[] getBlockListAsLongs() {
-    return blockList;
-  }
+  // decode new-style ByteString buffer based block report
+  private static class BufferDecoder extends BlockListAsLongs {
+    // reserve upper bits for future use.  decoding masks off these bits so
+    // current releases remain compatible with future releases that may
+    // start using them
+    private static long NUM_BYTES_MASK = (-1L) >>> (64 - 48);
+    private static long REPLICA_STATE_MASK = (-1L) >>> (64 - 4);
 
-  /**
-   * Iterates over blocks in the block report.
-   * Avoids object allocation on each iteration.
-   */
-  @InterfaceAudience.Private
-  @InterfaceStability.Evolving
-  public class BlockReportIterator implements Iterator<Block> {
-    private int currentBlockIndex;
-    private final Block block;
-    private ReplicaState currentReplicaState;
-
-    BlockReportIterator() {
-      this.currentBlockIndex = 0;
-      this.block = new Block();
-      this.currentReplicaState = null;
+    private final ByteString buffer;
+    private final int numBlocks;
+    private int numFinalized;
+
+    BufferDecoder(final int numBlocks, final ByteString buf) {
+      this(numBlocks, -1, buf);
     }
 
-    @Override
-    public boolean hasNext() {
-      return currentBlockIndex < getNumberOfBlocks();
+    BufferDecoder(final int numBlocks, final int numFinalized,
+        final ByteString buf) {
+      this.numBlocks = numBlocks;
+      this.numFinalized = numFinalized;
+      this.buffer = buf;
     }
 
     @Override
-    public Block next() {
-      block.set(blockId(currentBlockIndex),
-                blockLength(currentBlockIndex),
-                blockGenerationStamp(currentBlockIndex));
-      currentReplicaState = blockReplicaState(currentBlockIndex);
-      currentBlockIndex++;
-      return block;
+    public int getNumberOfBlocks() {
+      return numBlocks;
     }
 
     @Override
-    public void remove() {
-      throw new UnsupportedOperationException("Sorry. can't remove.");
+    public ByteString getBlocksBuffer() {
+      return buffer;
     }
 
-    /**
-     * Get the state of the current replica.
-     * The state corresponds to the replica returned
-     * by the latest {@link #next()}. 
-     */
-    public ReplicaState getCurrentReplicaState() {
-      return currentReplicaState;
+    @Override
+    public long[] getBlockListAsLongs() {
+      // terribly inefficient but only occurs if server tries to transcode
+      // an undecoded buffer into longs - ie. it will never happen but let's
+      // handle it anyway
+      if (numFinalized == -1) {
+        int n = 0;
+        for (Replica replica : this) {
+          if (replica.getState() == ReplicaState.FINALIZED) {
+            n++;
+          }
+        }
+        numFinalized = n;
+      }
+      int numUc = numBlocks - numFinalized;
+      int size = 2 + 3*(numFinalized+1) + 4*(numUc);
+      long[] longs = new long[size];
+      longs[0] = numFinalized;
+      longs[1] = numUc;
+
+      int idx = 2;
+      int ucIdx = idx + 3*numFinalized;
+      // delimiter block
+      longs[ucIdx++] = -1;
+      longs[ucIdx++] = -1;
+      longs[ucIdx++] = -1;
+
+      for (BlockReportReplica block : this) {
+        switch (block.getState()) {
+          case FINALIZED: {
+            longs[idx++] = block.getBlockId();
+            longs[idx++] = block.getNumBytes();
+            longs[idx++] = block.getGenerationStamp();
+            break;
+          }
+          default: {
+            longs[ucIdx++] = block.getBlockId();
+            longs[ucIdx++] = block.getNumBytes();
+            longs[ucIdx++] = block.getGenerationStamp();
+            longs[ucIdx++] = block.getState().getValue();
+            break;
+          }
+        }
+      }
+      return longs;
     }
-  }
 
-  /**
-   * Returns an iterator over blocks in the block report. 
-   */
-  @Override
-  public Iterator<Block> iterator() {
-    return getBlockReportIterator();
-  }
-
-  /**
-   * Returns {@link BlockReportIterator}. 
-   */
-  public BlockReportIterator getBlockReportIterator() {
-    return new BlockReportIterator();
-  }
-
-  /**
-   * The number of blocks
-   * @return - the number of blocks
-   */
-  public int getNumberOfBlocks() {
-    assert blockList.length == HEADER_SIZE + 
-            (blockList[0] + 1) * LONGS_PER_FINALIZED_BLOCK +
-            blockList[1] * LONGS_PER_UC_BLOCK :
-              "Number of blocks is inconcistent with the array length";
-    return getNumberOfFinalizedReplicas() + getNumberOfUCReplicas();
-  }
-
-  /**
-   * Returns the number of finalized replicas in the block report.
-   */
-  private int getNumberOfFinalizedReplicas() {
-    return (int)blockList[0];
-  }
-
-  /**
-   * Returns the number of under construction replicas in the block report.
-   */
-  private int getNumberOfUCReplicas() {
-    return (int)blockList[1];
+    @Override
+    public Iterator<BlockReportReplica> iterator() {
+      return new Iterator<BlockReportReplica>() {
+        final BlockReportReplica block = new BlockReportReplica();
+        final CodedInputStream cis = buffer.newCodedInput();
+        private int currentBlockIndex = 0;
+
+        @Override
+        public boolean hasNext() {
+          return currentBlockIndex < numBlocks;
+        }
+
+        @Override
+        public BlockReportReplica next() {
+          currentBlockIndex++;
+          try {
+            // zig-zag to reduce size of legacy blocks and mask off bits
+            // we don't (yet) understand
+            block.setBlockId(cis.readSInt64());
+            block.setNumBytes(cis.readRawVarint64() & NUM_BYTES_MASK);
+            block.setGenerationStamp(cis.readRawVarint64());
+            long state = cis.readRawVarint64() & REPLICA_STATE_MASK;
+            block.setState(ReplicaState.getState((int)state));
+          } catch (IOException e) {
+            throw new IllegalStateException(e);
+          }
+          return block;
+        }
+
+        @Override
+        public void remove() {
+          throw new UnsupportedOperationException();
+        }
+      };
+    }
   }
 
-  /**
-   * Returns the id of the specified replica of the block report.
-   */
-  private long blockId(int index) {
-    return blockList[index2BlockId(index)];
-  }
+  // decode old style block report of longs
+  private static class LongsDecoder extends BlockListAsLongs {
+    private final List<Long> values;
+    private final int finalizedBlocks;
+    private final int numBlocks;
 
-  /**
-   * Returns the length of the specified replica of the block report.
-   */
-  private long blockLength(int index) {
-    return blockList[index2BlockId(index) + 1];
-  }
+    // set the header
+    LongsDecoder(List<Long> values) {
+      this.values = values.subList(2, values.size());
+      this.finalizedBlocks = values.get(0).intValue();
+      this.numBlocks = finalizedBlocks + values.get(1).intValue();
+    }
 
-  /**
-   * Returns the generation stamp of the specified replica of the block report.
-   */
-  private long blockGenerationStamp(int index) {
-    return blockList[index2BlockId(index) + 2];
-  }
+    @Override
+    public int getNumberOfBlocks() {
+      return numBlocks;
+    }
 
-  /**
-   * Returns the state of the specified replica of the block report.
-   */
-  private ReplicaState blockReplicaState(int index) {
-    if(index < getNumberOfFinalizedReplicas())
-      return ReplicaState.FINALIZED;
-    return ReplicaState.getState((int)blockList[index2BlockId(index) + 3]);
-  }
+    @Override
+    public ByteString getBlocksBuffer() {
+      Builder builder = builder();
+      for (Replica replica : this) {
+        builder.add(replica);
+      }
+      return builder.build().getBlocksBuffer();
+    }
 
-  /**
-   * Corrupt the generation stamp of the block with the given index.
-   * Not meant to be used outside of tests.
-   */
-  @VisibleForTesting
-  public long corruptBlockGSForTesting(final int blockIndex, Random rand) {
-    long oldGS = blockList[index2BlockId(blockIndex) + 2];
-    while (blockList[index2BlockId(blockIndex) + 2] == oldGS) {
-      blockList[index2BlockId(blockIndex) + 2] = rand.nextInt();
+    @Override
+    public long[] getBlockListAsLongs() {
+      long[] longs = new long[2+values.size()];
+      longs[0] = finalizedBlocks;
+      longs[1] = numBlocks - finalizedBlocks;
+      for (int i=0; i < values.size(); i++) {
+        longs[2+i] = values.get(i);
+      }
+      return longs;
     }
-    return oldGS;
-  }
 
-  /**
-   * Corrupt the length of the block with the given index by truncation.
-   * Not meant to be used outside of tests.
-   */
-  @VisibleForTesting
-  public long corruptBlockLengthForTesting(final int blockIndex, Random rand) {
-    long oldLength = blockList[index2BlockId(blockIndex) + 1];
-    blockList[index2BlockId(blockIndex) + 1] =
-        rand.nextInt((int) oldLength - 1);
-    return oldLength;
+    @Override
+    public Iterator<BlockReportReplica> iterator() {
+      return new Iterator<BlockReportReplica>() {
+        private final BlockReportReplica block = new BlockReportReplica();
+        final Iterator<Long> iter = values.iterator();
+        private int currentBlockIndex = 0;
+
+        @Override
+        public boolean hasNext() {
+          return currentBlockIndex < numBlocks;
+        }
+
+        @Override
+        public BlockReportReplica next() {
+          if (currentBlockIndex == finalizedBlocks) {
+            // verify the presence of the delimiter block
+            readBlock();
+            Preconditions.checkArgument(block.getBlockId() == -1 &&
+                                        block.getNumBytes() == -1 &&
+                                        block.getGenerationStamp() == -1,
+                                        "Invalid delimiter block");
+          }
+
+          readBlock();
+          if (currentBlockIndex++ < finalizedBlocks) {
+            block.setState(ReplicaState.FINALIZED);
+          } else {
+            block.setState(ReplicaState.getState(iter.next().intValue()));
+          }
+          return block;
+        }
+
+        private void readBlock() {
+          block.setBlockId(iter.next());
+          block.setNumBytes(iter.next());
+          block.setGenerationStamp(iter.next());
+        }
+
+        @Override
+        public void remove() {
+          throw new UnsupportedOperationException();
+        }
+      };
+    }
   }
   
-  /**
-   * Set the indexTh block
-   * @param index - the index of the block to set
-   * @param r - the block is set to the value of the this Replica
-   */
-  private void setBlock(final int index, final Replica r) {
-    int pos = index2BlockId(index);
-    blockList[pos] = r.getBlockId();
-    blockList[pos + 1] = r.getNumBytes();
-    blockList[pos + 2] = r.getGenerationStamp();
-    if(index < getNumberOfFinalizedReplicas())
-      return;
-    assert r.getState() != ReplicaState.FINALIZED :
-      "Must be under-construction replica.";
-    blockList[pos + 3] = r.getState().getValue();
-  }
-
-  /**
-   * Set the indexTh block
-   * @param index - the index of the block to set
-   * @param b - the block is set to the value of the this Block
-   */
-  private void setBlock(final int index, final Block b) {
-    int pos = index2BlockId(index);
-    blockList[pos] = b.getBlockId();
-    blockList[pos + 1] = b.getNumBytes();
-    blockList[pos + 2] = b.getGenerationStamp();
-  }
-
-  /**
-   * Set the invalid delimiting block between the finalized and
-   * the under-construction lists.
-   * The invalid block has all three fields set to -1.
-   * @param finalizedSzie - the size of the finalized list
-   */
-  private void setDelimitingBlock(final int finalizedSzie) {
-    int idx = HEADER_SIZE + finalizedSzie * LONGS_PER_FINALIZED_BLOCK;
-    blockList[idx] = -1;
-    blockList[idx+1] = -1;
-    blockList[idx+2] = -1;
-  }
-
-  public long getMaxGsInBlockList() {
-    long maxGs = -1;
-    Iterator<Block> iter = getBlockReportIterator();
-    while (iter.hasNext()) {
-      Block b = iter.next();
-      if (b.getGenerationStamp() > maxGs) {
-        maxGs = b.getGenerationStamp();
+  @InterfaceAudience.Private
+  public static class BlockReportReplica extends Block implements Replica {
+    private ReplicaState state;
+    private BlockReportReplica() {
+    }
+    public BlockReportReplica(Block block) {
+      super(block);
+      if (block instanceof BlockReportReplica) {
+        this.state = ((BlockReportReplica)block).getState();
+      } else {
+        this.state = ReplicaState.FINALIZED;
       }
     }
-    return maxGs;
+    public void setState(ReplicaState state) {
+      this.state = state;
+    }
+    @Override
+    public ReplicaState getState() {
+      return state;
+    }
+    @Override
+    public long getBytesOnDisk() {
+      return getNumBytes();
+    }
+    @Override
+    public long getVisibleLength() {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public String getStorageUuid() {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public boolean isOnTransientStorage() {
+      throw new UnsupportedOperationException();
+    }
+    @Override
+    public boolean equals(Object o) {
+      return super.equals(o);
+    }
+    @Override
+    public int hashCode() {
+      return super.hashCode();
+    }
   }
 }
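
For orientation, a minimal round-trip sketch using only the API added above (builder()/add()/build() on the DN side, decodeBuffer() on the NN side). FinalizedReplica stands in for any Replica implementation and the literals are arbitrary:

    // DN side: pack replicas into a single varint-encoded buffer
    BlockListAsLongs.Builder builder = BlockListAsLongs.builder();
    builder.add(new FinalizedReplica(new Block(1, 11, 111), null, null));
    BlockListAsLongs report = builder.build();
    ByteString buf = report.getBlocksBuffer();

    // NN side: decode in place; the iterator reuses one
    // BlockReportReplica instance, so never retain the returned object
    BlockListAsLongs decoded = BlockListAsLongs.decodeBuffer(1, buf);
    for (BlockReportReplica replica : decoded) {
      System.out.println(replica.getBlockId() + " " + replica.getState());
    }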

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
index 192916f..c4003f1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
@@ -26,6 +26,7 @@ import java.util.List;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -51,6 +52,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo.Capability;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
@@ -64,6 +66,7 @@ import org.apache.hadoop.ipc.RpcClientUtil;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 
@@ -83,6 +86,11 @@ public class DatanodeProtocolClientSideTranslatorPB implements
       VersionRequestProto.newBuilder().build();
   private final static RpcController NULL_CONTROLLER = null;
   
+  @VisibleForTesting
+  public DatanodeProtocolClientSideTranslatorPB(DatanodeProtocolPB rpcProxy) {
+    this.rpcProxy = rpcProxy;
+  }
+
   public DatanodeProtocolClientSideTranslatorPB(InetSocketAddress nameNodeAddr,
       Configuration conf) throws IOException {
     RPC.setProtocolEngine(conf, DatanodeProtocolPB.class,
@@ -166,12 +174,20 @@ public class DatanodeProtocolClientSideTranslatorPB implements
         .newBuilder().setRegistration(PBHelper.convert(registration))
         .setBlockPoolId(poolId);
     
+    boolean useBlocksBuffer = registration.getNamespaceInfo()
+        .isCapabilitySupported(Capability.STORAGE_BLOCK_REPORT_BUFFERS);
+
     for (StorageBlockReport r : reports) {
       StorageBlockReportProto.Builder reportBuilder = StorageBlockReportProto
           .newBuilder().setStorage(PBHelper.convert(r.getStorage()));
-      long[] blocks = r.getBlocks();
-      for (int i = 0; i < blocks.length; i++) {
-        reportBuilder.addBlocks(blocks[i]);
+      BlockListAsLongs blocks = r.getBlocks();
+      if (useBlocksBuffer) {
+        reportBuilder.setNumberOfBlocks(blocks.getNumberOfBlocks());
+        reportBuilder.addAllBlocksBuffers(blocks.getBlocksBuffers());
+      } else {
+        for (long value : blocks.getBlockListAsLongs()) {
+          reportBuilder.addBlocks(value);
+        }
       }
       builder.addReports(reportBuilder.build());
     }
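
The buffers passed to addAllBlocksBuffers() come from getBlocksBuffers() above, which splits the encoded report into 64K pieces. A small sketch of that behavior (ByteString.substring() shares the backing bytes, so the split allocates no new arrays):

    ByteString buf = ByteString.copyFrom(new byte[200 * 1024]);
    List<ByteString> chunks = new ArrayList<ByteString>();
    for (int pos = 0; pos < buf.size(); pos += 64 * 1024) {
      chunks.add(buf.substring(pos, Math.min(pos + 64 * 1024, buf.size())));
    }
    // a 200K report yields 4 chunks: 64K + 64K + 64K + 8K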

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
index 1a89090..e18081f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.protocolPB;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
@@ -58,6 +59,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 
+import com.google.common.base.Preconditions;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 
@@ -145,10 +147,14 @@ public class DatanodeProtocolServerSideTranslatorPB implements
     
     int index = 0;
     for (StorageBlockReportProto s : request.getReportsList()) {
-      List<Long> blockIds = s.getBlocksList();
-      long[] blocks = new long[blockIds.size()];
-      for (int i = 0; i < blockIds.size(); i++) {
-        blocks[i] = blockIds.get(i);
+      final BlockListAsLongs blocks;
+      if (s.hasNumberOfBlocks()) { // new style buffer based reports
+        int num = (int)s.getNumberOfBlocks();
+        Preconditions.checkState(s.getBlocksCount() == 0,
+            "cannot send both blocks list and buffers");
+        blocks = BlockListAsLongs.decodeBuffers(num, s.getBlocksBuffersList());
+      } else {
+        blocks = BlockListAsLongs.decodeLongs(s.getBlocksList());
       }
       report[index++] = new StorageBlockReport(PBHelper.convert(s.getStorage()),
           blocks);
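
On the legacy branch, decodeLongs() wraps the flat long[] layout documented in BlockListAsLongs (header, finalized triplets, a -1/-1/-1 delimiter, then under-construction quadruplets). A hand-built illustration of what an old DN would send for one finalized replica:

    List<Long> longs = Arrays.asList(
        1L, 0L,          // header: 1 finalized, 0 under construction
        1L, 11L, 111L,   // blockId, length, generation stamp
        -1L, -1L, -1L);  // delimiter block
    BlockListAsLongs blocks = BlockListAsLongs.decodeLongs(longs);
    assert blocks.getNumberOfBlocks() == 1;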

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index ee1603c..c428c2b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -573,7 +573,7 @@ public class PBHelper {
     StorageInfoProto storage = info.getStorageInfo();
     return new NamespaceInfo(storage.getNamespceID(), storage.getClusterID(),
         info.getBlockPoolID(), storage.getCTime(), info.getBuildVersion(),
-        info.getSoftwareVersion());
+        info.getSoftwareVersion(), info.getCapabilities());
   }
 
   public static NamenodeCommand convert(NamenodeCommandProto cmd) {
@@ -1233,7 +1233,9 @@ public class PBHelper {
         .setBuildVersion(info.getBuildVersion())
         .setUnused(0)
         .setStorageInfo(PBHelper.convert((StorageInfo)info))
-        .setSoftwareVersion(info.getSoftwareVersion()).build();
+        .setSoftwareVersion(info.getSoftwareVersion())
+        .setCapabilities(info.getCapabilities())
+        .build();
   }
   
   // Located Block Arrays and Lists

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index c1a3e05..f155375 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -47,7 +47,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
-import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -1968,11 +1968,9 @@ public class BlockManager {
     if (report == null) return;
     assert (namesystem.hasWriteLock());
     assert (storageInfo.numBlocks() == 0);
-    BlockReportIterator itBR = report.getBlockReportIterator();
 
-    while(itBR.hasNext()) {
-      Block iblk = itBR.next();
-      ReplicaState reportedState = itBR.getCurrentReplicaState();
+    for (BlockReportReplica iblk : report) {
+      ReplicaState reportedState = iblk.getState();
       
       if (shouldPostponeBlocksFromFuture &&
           namesystem.isGenStampInFuture(iblk)) {
@@ -2042,13 +2040,11 @@ public class BlockManager {
     int curIndex;
 
     if (newReport == null) {
-      newReport = new BlockListAsLongs();
+      newReport = BlockListAsLongs.EMPTY;
     }
     // scan the report and process newly reported blocks
-    BlockReportIterator itBR = newReport.getBlockReportIterator();
-    while(itBR.hasNext()) {
-      Block iblk = itBR.next();
-      ReplicaState iState = itBR.getCurrentReplicaState();
+    for (BlockReportReplica iblk : newReport) {
+      ReplicaState iState = iblk.getState();
       BlockInfoContiguous storedBlock = processReportedBlock(storageInfo,
           iblk, iState, toAdd, toInvalidate, toCorrupt, toUC);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 3ba2f54..3c20f6b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -228,7 +228,7 @@ class BPServiceActor implements Runnable {
     bpos.verifyAndSetNamespaceInfo(nsInfo);
     
     // Second phase of the handshake with the NN.
-    register();
+    register(nsInfo);
   }
 
   // This is useful to make sure NN gets Heartbeat before Blockreport
@@ -468,8 +468,7 @@ class BPServiceActor implements Runnable {
 
     for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) {
       BlockListAsLongs blockList = kvPair.getValue();
-      reports[i++] = new StorageBlockReport(
-          kvPair.getKey(), blockList.getBlockListAsLongs());
+      reports[i++] = new StorageBlockReport(kvPair.getKey(), blockList);
       totalBlockCount += blockList.getNumberOfBlocks();
     }
 
@@ -774,10 +773,11 @@ class BPServiceActor implements Runnable {
    *  
    * issued by the namenode to recognize registered datanodes.
    * 
+   * @param nsInfo current NamespaceInfo
    * @see FSNamesystem#registerDatanode(DatanodeRegistration)
    * @throws IOException
    */
-  void register() throws IOException {
+  void register(NamespaceInfo nsInfo) throws IOException {
     // The handshake() phase loaded the block pool storage
     // off disk - so update the bpRegistration object from that info
     bpRegistration = bpos.createRegistration();
@@ -788,6 +788,7 @@ class BPServiceActor implements Runnable {
       try {
         // Use returned registration from namenode with updated fields
         bpRegistration = bpNamenode.registerDatanode(bpRegistration);
+        bpRegistration.setNamespaceInfo(nsInfo);
         break;
       } catch(EOFException e) {  // namenode might have just restarted
         LOG.info("Problem connecting to server: " + nnAddr + " :"
@@ -915,9 +916,9 @@ class BPServiceActor implements Runnable {
     if (shouldRun()) {
       // re-retrieve namespace info to make sure that, if the NN
       // was restarted, we still match its version (HDFS-2120)
-      retrieveNamespaceInfo();
+      NamespaceInfo nsInfo = retrieveNamespaceInfo();
       // and re-register
-      register();
+      register(nsInfo);
       scheduleHeartbeat();
     }
   }
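
Threading nsInfo through register() is what lets the client-side translator (above) choose an encoding: the registration now remembers the NN's advertised capabilities. Roughly:

    boolean useBuffers = bpRegistration.getNamespaceInfo()
        .isCapabilitySupported(Capability.STORAGE_BLOCK_REPORT_BUFFERS);
    // true:  setNumberOfBlocks() + addAllBlocksBuffers()   (new NN)
    // false: repeated addBlocks(long) legacy encoding      (old NN)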

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 486acbc..d42c00c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1575,30 +1575,26 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     Map<DatanodeStorage, BlockListAsLongs> blockReportsMap =
         new HashMap<DatanodeStorage, BlockListAsLongs>();
 
-    Map<String, ArrayList<ReplicaInfo>> finalized =
-        new HashMap<String, ArrayList<ReplicaInfo>>();
-    Map<String, ArrayList<ReplicaInfo>> uc =
-        new HashMap<String, ArrayList<ReplicaInfo>>();
+    Map<String, BlockListAsLongs.Builder> builders =
+        new HashMap<String, BlockListAsLongs.Builder>();
 
     List<FsVolumeImpl> curVolumes = getVolumes();
     for (FsVolumeSpi v : curVolumes) {
-      finalized.put(v.getStorageID(), new ArrayList<ReplicaInfo>());
-      uc.put(v.getStorageID(), new ArrayList<ReplicaInfo>());
+      builders.put(v.getStorageID(), BlockListAsLongs.builder());
     }
 
     synchronized(this) {
       for (ReplicaInfo b : volumeMap.replicas(bpid)) {
         switch(b.getState()) {
           case FINALIZED:
-            finalized.get(b.getVolume().getStorageID()).add(b);
-            break;
           case RBW:
           case RWR:
-            uc.get(b.getVolume().getStorageID()).add(b);
+            builders.get(b.getVolume().getStorageID()).add(b);
             break;
           case RUR:
             ReplicaUnderRecovery rur = (ReplicaUnderRecovery)b;
-            uc.get(rur.getVolume().getStorageID()).add(rur.getOriginalReplica());
+            builders.get(rur.getVolume().getStorageID())
+                .add(rur.getOriginalReplica());
             break;
           case TEMPORARY:
             break;
@@ -1609,10 +1605,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
 
     for (FsVolumeImpl v : curVolumes) {
-      ArrayList<ReplicaInfo> finalizedList = finalized.get(v.getStorageID());
-      ArrayList<ReplicaInfo> ucList = uc.get(v.getStorageID());
       blockReportsMap.put(v.toDatanodeStorage(),
-                          new BlockListAsLongs(finalizedList, ucList));
+                          builders.get(v.getStorageID()).build());
     }
 
     return blockReportsMap;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index f20fb35..059bd28 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -1302,7 +1302,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
     final BlockManager bm = namesystem.getBlockManager(); 
     boolean noStaleStorages = false;
     for(StorageBlockReport r : reports) {
-      final BlockListAsLongs blocks = new BlockListAsLongs(r.getBlocks());
+      final BlockListAsLongs blocks = r.getBlocks();
       //
       // BlockManager.processReport accumulates information of prior calls
       // for the same node and storage, so the value returned by the last

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
index 9db2fca..e788137 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
@@ -40,6 +40,7 @@ public class DatanodeRegistration extends DatanodeID
   private final StorageInfo storageInfo;
   private ExportedBlockKeys exportedKeys;
   private final String softwareVersion;
+  private NamespaceInfo nsInfo;
 
   @VisibleForTesting
   public DatanodeRegistration(String uuid, DatanodeRegistration dnr) {
@@ -77,6 +78,14 @@ public class DatanodeRegistration extends DatanodeID
   public int getVersion() {
     return storageInfo.getLayoutVersion();
   }
+
+  public void setNamespaceInfo(NamespaceInfo nsInfo) {
+    this.nsInfo = nsInfo;
+  }
+
+  public NamespaceInfo getNamespaceInfo() {
+    return nsInfo;
+  }
   
   @Override // NodeRegistration
   public String getRegistrationID() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java
index 0733743..a7439a0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java
@@ -29,6 +29,9 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage;
 import org.apache.hadoop.util.VersionInfo;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
 /**
  * NamespaceInfo is returned by the name-node in reply 
  * to a data-node handshake.
@@ -40,19 +43,52 @@ public class NamespaceInfo extends StorageInfo {
   final String  buildVersion;
   String blockPoolID = "";    // id of the block pool
   String softwareVersion;
+  long capabilities;
+
+  // only authoritative on the server-side to determine advertisement to
+  // clients.  enum will update the supported values
+  private static long CAPABILITIES_SUPPORTED = 0;
+
+  public enum Capability {
+    UNKNOWN(false),
+    STORAGE_BLOCK_REPORT_BUFFERS(true); // use optimized ByteString buffers
+    private final long mask;
+    Capability(boolean isSupported) {
+      int bits = ordinal() - 1;
+      mask = (bits < 0) ? 0 : (1L << bits);
+      if (isSupported) {
+        CAPABILITIES_SUPPORTED |= mask;
+      }
+    }
+    public long getMask() {
+      return mask;
+    }
+  }
 
+  // defaults to enabled capabilities since this ctor is for server
   public NamespaceInfo() {
     super(NodeType.NAME_NODE);
     buildVersion = null;
+    capabilities = CAPABILITIES_SUPPORTED;
   }
 
+  // defaults to enabled capabilities since this ctor is for server
   public NamespaceInfo(int nsID, String clusterID, String bpID,
       long cT, String buildVersion, String softwareVersion) {
+    this(nsID, clusterID, bpID, cT, buildVersion, softwareVersion,
+        CAPABILITIES_SUPPORTED);
+  }
+
+  // for use by server and/or client
+  public NamespaceInfo(int nsID, String clusterID, String bpID,
+      long cT, String buildVersion, String softwareVersion,
+      long capabilities) {
     super(HdfsConstants.NAMENODE_LAYOUT_VERSION, nsID, clusterID, cT,
         NodeType.NAME_NODE);
     blockPoolID = bpID;
     this.buildVersion = buildVersion;
     this.softwareVersion = softwareVersion;
+    this.capabilities = capabilities;
   }
 
   public NamespaceInfo(int nsID, String clusterID, String bpID, 
@@ -61,6 +97,22 @@ public class NamespaceInfo extends StorageInfo {
         VersionInfo.getVersion());
   }
   
+  public long getCapabilities() {
+    return capabilities;
+  }
+
+  @VisibleForTesting
+  public void setCapabilities(long capabilities) {
+    this.capabilities = capabilities;
+  }
+
+  public boolean isCapabilitySupported(Capability capability) {
+    Preconditions.checkArgument(capability != Capability.UNKNOWN,
+        "cannot test for unknown capability");
+    long mask = capability.getMask();
+    return (capabilities & mask) == mask;
+  }
+
   public String getBuildVersion() {
     return buildVersion;
   }
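
The bit assignment falls out of the enum constructor: ordinal 0 is reserved, and each later constant claims the next bit. Worked out for the current values (nsInfo here is just an assumed in-scope NamespaceInfo):

    // UNKNOWN:                      ordinal 0 -> bits = -1 -> mask = 0
    // STORAGE_BLOCK_REPORT_BUFFERS: ordinal 1 -> bits =  0 -> mask = 1L << 0 = 0x1
    // (a hypothetical third constant would claim 1L << 1 = 0x2)
    long capabilities = nsInfo.getCapabilities();
    long mask = Capability.STORAGE_BLOCK_REPORT_BUFFERS.getMask();  // 0x1
    boolean supported = (capabilities & mask) == mask;  // what isCapabilitySupported() tests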

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageBlockReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageBlockReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageBlockReport.java
index 1521693..4ef5ebc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageBlockReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageBlockReport.java
@@ -18,14 +18,16 @@
 
 package org.apache.hadoop.hdfs.server.protocol;
 
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+
 /**
  * Block report for a Datanode storage
  */
 public class StorageBlockReport {
   private final DatanodeStorage storage;
-  private final long[] blocks;
+  private final BlockListAsLongs blocks;
   
-  public StorageBlockReport(DatanodeStorage storage, long[] blocks) {
+  public StorageBlockReport(DatanodeStorage storage, BlockListAsLongs blocks) {
     this.storage = storage;
     this.blocks = blocks;
   }
@@ -34,7 +36,7 @@ public class StorageBlockReport {
     return storage;
   }
 
-  public long[] getBlocks() {
+  public BlockListAsLongs getBlocks() {
     return blocks;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
index 348f346..7b3a4a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
@@ -237,6 +237,8 @@ message BlockReportRequestProto {
 message StorageBlockReportProto {
   required DatanodeStorageProto storage = 1;    // Storage
   repeated uint64 blocks = 2 [packed=true];
+  optional uint64 numberOfBlocks = 3;
+  repeated bytes blocksBuffers = 4;
 }
 
 /**
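
Wire-format note: with [packed=true], the legacy repeated uint64 field is already varint-encoded, so the two encodings are similar in byte size; the gain (per the BlockListAsLongs javadoc above) is on decode. Rough, illustrative arithmetic, not a measurement:

    old: 1,000,000 mostly-finalized replicas -> ~3,000,000 boxed Longs
         at ~16-24 bytes each, tens of MB of transient heap per parse
    new: the same varints arrive as 64K ByteString chunks that are
         retained as-is and decoded lazily, with no per-block objects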

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
index 97906b1..31e5585 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
@@ -517,6 +517,7 @@ message NamespaceInfoProto {
   required string blockPoolID = 3;          // block pool used by the namespace
   required StorageInfoProto storageInfo = 4;// Node information
   required string softwareVersion = 5;      // Software version number (e.g. 2.0.0)
+  optional uint64 capabilities = 6 [default = 0]; // feature flags
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java
new file mode 100644
index 0000000..bebde18
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.protocol;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportResponseProto;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
+import org.apache.hadoop.hdfs.server.datanode.Replica;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo.Capability;
+import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+public class TestBlockListAsLongs {
+  static Block b1 = new Block(1, 11, 111);
+  static Block b2 = new Block(2, 22, 222);
+  static Block b3 = new Block(3, 33, 333);
+  static Block b4 = new Block(4, 44, 444);
+
+  @Test
+  public void testEmptyReport() {
+    BlockListAsLongs blocks = checkReport();
+    assertArrayEquals(
+        new long[] {
+            0, 0,
+            -1, -1, -1 }, 
+        blocks.getBlockListAsLongs());
+  }
+
+  @Test
+  public void testFinalized() {
+    BlockListAsLongs blocks = checkReport(
+        new FinalizedReplica(b1, null, null));
+    assertArrayEquals(
+        new long[] {
+            1, 0,
+            1, 11, 111,
+            -1, -1, -1 }, 
+        blocks.getBlockListAsLongs());
+  }
+
+  @Test
+  public void testUc() {
+    BlockListAsLongs blocks = checkReport(
+      new ReplicaBeingWritten(b1, null, null, null));
+    assertArrayEquals(
+        new long[] {
+            0, 1,
+            -1, -1, -1,
+            1, 11, 111, ReplicaState.RBW.getValue() }, 
+        blocks.getBlockListAsLongs());
+  }
+  
+  @Test
+  public void testMix() {
+    BlockListAsLongs blocks = checkReport(
+        new FinalizedReplica(b1, null, null),
+        new FinalizedReplica(b2, null, null),
+        new ReplicaBeingWritten(b3, null, null, null),
+        new ReplicaWaitingToBeRecovered(b4, null, null));
+    assertArrayEquals(
+        new long[] {
+            2, 2,
+            1, 11, 111,
+            2, 22, 222,
+            -1, -1, -1,
+            3, 33, 333, ReplicaState.RBW.getValue(),
+            4, 44, 444, ReplicaState.RWR.getValue() },
+        blocks.getBlockListAsLongs());
+  }
+
+  @Test
+  public void testFuzz() throws InterruptedException {
+    Replica[] replicas = new Replica[100000];
+    Random rand = new Random(0);
+    for (int i=0; i<replicas.length; i++) {
+      Block b = new Block(rand.nextLong(), i, i<<4);
+      switch (rand.nextInt(3)) {
+        case 0:
+          replicas[i] = new FinalizedReplica(b, null, null);
+          break;
+        case 1:
+          replicas[i] = new ReplicaBeingWritten(b, null, null, null);
+          break;
+        case 2:
+          replicas[i] = new ReplicaWaitingToBeRecovered(b, null, null);
+          break;
+      }
+    }
+    checkReport(replicas);
+  }
+
+  private BlockListAsLongs checkReport(Replica... replicas) {
+    Map<Long, Replica> expectedReplicas = new HashMap<>();
+    for (Replica replica : replicas) {
+      expectedReplicas.put(replica.getBlockId(), replica);
+    }
+    expectedReplicas = Collections.unmodifiableMap(expectedReplicas);
+    
+    // encode the blocks and extract the buffers
+    BlockListAsLongs blocks =
+        BlockListAsLongs.encode(expectedReplicas.values());
+    List<ByteString> buffers = blocks.getBlocksBuffers();
+    
+    // convert to old-style list of longs
+    List<Long> longs = new ArrayList<Long>();
+    for (long value : blocks.getBlockListAsLongs()) {
+      longs.add(value);
+    }
+
+    // decode the buffers and verify their contents
+    BlockListAsLongs decodedBlocks =
+        BlockListAsLongs.decodeBuffers(expectedReplicas.size(), buffers);
+    checkReplicas(expectedReplicas, decodedBlocks);
+
+    // decode the longs and verify their contents
+    BlockListAsLongs decodedList = BlockListAsLongs.decodeLongs(longs);
+    checkReplicas(expectedReplicas, decodedList);
+    return blocks;
+  }
+  
+  private void checkReplicas(Map<Long,Replica> expectedReplicas,
+                             BlockListAsLongs decodedBlocks) {
+    assertEquals(expectedReplicas.size(), decodedBlocks.getNumberOfBlocks());
+
+    Map<Long, Replica> reportReplicas = new HashMap<>(expectedReplicas);
+    for (BlockReportReplica replica : decodedBlocks) {
+      assertNotNull(replica);
+      Replica expected = reportReplicas.remove(replica.getBlockId());
+      assertNotNull(expected);
+      assertEquals("wrong bytes",
+          expected.getNumBytes(), replica.getNumBytes());
+      assertEquals("wrong genstamp",
+          expected.getGenerationStamp(), replica.getGenerationStamp());
+      assertEquals("wrong replica state",
+          expected.getState(), replica.getState());
+    }
+    assertTrue(reportReplicas.isEmpty());
+  }
+  
+  @Test
+  public void testDatanodeDetect() throws ServiceException, IOException {
+    final AtomicReference<BlockReportRequestProto> request =
+        new AtomicReference<>();
+
+    // just capture the outgoing PB
+    DatanodeProtocolPB mockProxy = mock(DatanodeProtocolPB.class);
+    doAnswer(new Answer<BlockReportResponseProto>() {
+      public BlockReportResponseProto answer(InvocationOnMock invocation) {
+        Object[] args = invocation.getArguments();
+        request.set((BlockReportRequestProto) args[1]);
+        return BlockReportResponseProto.newBuilder().build();
+      }
+    }).when(mockProxy).blockReport(any(RpcController.class),
+                                   any(BlockReportRequestProto.class));
+    
+    @SuppressWarnings("resource")
+    DatanodeProtocolClientSideTranslatorPB nn =
+        new DatanodeProtocolClientSideTranslatorPB(mockProxy);
+
+    DatanodeRegistration reg = DFSTestUtil.getLocalDatanodeRegistration();
+    NamespaceInfo nsInfo = new NamespaceInfo(1, "cluster", "bp", 1);
+    reg.setNamespaceInfo(nsInfo);
+
+    Replica r = new FinalizedReplica(new Block(1, 2, 3), null, null);
+    BlockListAsLongs bbl = BlockListAsLongs.encode(Collections.singleton(r));
+    DatanodeStorage storage = new DatanodeStorage("s1");
+    StorageBlockReport[] sbr = { new StorageBlockReport(storage, bbl) };    
+
+    // check DN sends new-style BR
+    request.set(null);
+    nsInfo.setCapabilities(Capability.STORAGE_BLOCK_REPORT_BUFFERS.getMask());
+    nn.blockReport(reg, "pool", sbr);
+    BlockReportRequestProto proto = request.get();
+    assertNotNull(proto);
+    assertTrue(proto.getReports(0).getBlocksList().isEmpty());
+    assertFalse(proto.getReports(0).getBlocksBuffersList().isEmpty());
+    
+    // back up to prior version and check DN sends old-style BR
+    request.set(null);
+    nsInfo.setCapabilities(Capability.UNKNOWN.getMask());
+    nn.blockReport(reg, "pool", sbr);
+    proto = request.get();
+    assertNotNull(proto);
+    assertFalse(proto.getReports(0).getBlocksList().isEmpty());
+    assertTrue(proto.getReports(0).getBlocksBuffersList().isEmpty());
+  }
+}

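Taken together, the expected arrays above pin down the legacy long[]
encoding: a two-long header (finalized count, under-construction count), one
(blockId, numBytes, genStamp) triplet per finalized replica, a (-1, -1, -1)
delimiter, and one (blockId, numBytes, genStamp, replicaState) quadruplet per
under-construction replica. A minimal illustrative walker over that layout --
not the patch's decodeLongs(), just the same framing:

  // Walks the legacy layout verified by testEmptyReport/testUc/testMix.
  static void walkLegacyLayout(long[] longs) {
    int i = 0;
    long finalized = longs[i++];             // header: finalized count
    long underConstruction = longs[i++];     // header: UC count
    for (long n = 0; n < finalized; n++) {
      System.out.printf("finalized id=%d len=%d gs=%d%n",
          longs[i++], longs[i++], longs[i++]);
    }
    i += 3;                                  // skip the (-1, -1, -1) delimiter
    for (long n = 0; n < underConstruction; n++) {
      System.out.printf("uc id=%d len=%d gs=%d state=%d%n",
          longs[i++], longs[i++], longs[i++], longs[i++]);
    }
  }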
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 6d67c7d..d9ac9e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -555,12 +555,12 @@ public class TestBlockManager {
     reset(node);
     
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        new BlockListAsLongs(null, null));
+        BlockListAsLongs.EMPTY);
     assertEquals(1, ds.getBlockReportCount());
     // send block report again, should NOT be processed
     reset(node);
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        new BlockListAsLongs(null, null));
+        BlockListAsLongs.EMPTY);
     assertEquals(1, ds.getBlockReportCount());
 
     // re-register as if node restarted, should update existing node
@@ -571,7 +571,7 @@ public class TestBlockManager {
     // send block report, should be processed after restart
     reset(node);
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-                     new BlockListAsLongs(null, null));
+                     BlockListAsLongs.EMPTY);
     // Reinitialize as registration with empty storage list pruned
     // node.storageMap.
     ds = node.getStorageInfos()[0];
@@ -600,7 +600,7 @@ public class TestBlockManager {
     reset(node);
     doReturn(1).when(node).numBlocks();
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        new BlockListAsLongs(null, null));
+        BlockListAsLongs.EMPTY);
     assertEquals(1, ds.getBlockReportCount());
   }
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
index 8d9de7b..37c503c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
@@ -146,22 +147,32 @@ public abstract class BlockReportTestBase {
 
       // Walk the list of blocks until we find one each to corrupt the
       // generation stamp and length, if so requested.
-      for (int i = 0; i < blockList.getNumberOfBlocks(); ++i) {
+      BlockListAsLongs.Builder builder = BlockListAsLongs.builder();
+      for (BlockReportReplica block : blockList) {
         if (corruptOneBlockGs && !corruptedGs) {
-          blockList.corruptBlockGSForTesting(i, rand);
-          LOG.info("Corrupted the GS for block ID " + i);
+          long gsOld = block.getGenerationStamp();
+          long gsNew;
+          do {
+            gsNew = rand.nextInt();
+          } while (gsNew == gsOld);
+          block.setGenerationStamp(gsNew);
+          LOG.info("Corrupted the GS for block ID " + block);
           corruptedGs = true;
         } else if (corruptOneBlockLen && !corruptedLen) {
-          blockList.corruptBlockLengthForTesting(i, rand);
-          LOG.info("Corrupted the length for block ID " + i);
+          long lenOld = block.getNumBytes();
+          long lenNew;
+          do {
+            lenNew = rand.nextInt((int)lenOld - 1);
+          } while (lenNew == lenOld);
+          block.setNumBytes(lenNew);
+          LOG.info("Corrupted the length for block ID " + block);
           corruptedLen = true;
-        } else {
-          break;
         }
+        builder.add(new BlockReportReplica(block));
       }
 
       reports[reportIndex++] =
-          new StorageBlockReport(dnStorage, blockList.getBlockListAsLongs());
+          new StorageBlockReport(dnStorage, builder.build());
     }
 
     return reports;

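Rather than corrupting the report in place by index, the rewritten loop above
re-adds each (possibly corrupted) replica to a fresh builder. In isolation,
assembling a report with the new API looks roughly like this; the null
volume/directory arguments follow the test-style replica constructors used
elsewhere in this patch:

  // Usage sketch of the builder API introduced by this patch.
  BlockListAsLongs.Builder builder = BlockListAsLongs.builder();
  builder.add(new FinalizedReplica(new Block(1, 11, 111), null, null));
  builder.add(new ReplicaBeingWritten(new Block(2, 22, 222), null, null, null));
  BlockListAsLongs report = builder.build();

  // StorageBlockReport now carries the BlockListAsLongs directly instead
  // of a long[], so the report is handed over unencoded:
  StorageBlockReport sbr =
      new StorageBlockReport(new DatanodeStorage("s1"), report);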
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index a4ec8d5..5c7b4ac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -52,7 +52,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -271,7 +270,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
 
     @Override
     public ReplicaState getState() {
-      return null;
+      return finalized ? ReplicaState.FINALIZED : ReplicaState.RBW;
     }
 
     @Override
@@ -529,7 +528,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   }
 
   public synchronized void injectBlocks(String bpid,
-      Iterable<Block> injectBlocks) throws IOException {
+      Iterable<? extends Block> injectBlocks) throws IOException {
     ExtendedBlock blk = new ExtendedBlock();
     if (injectBlocks != null) {
       for (Block b: injectBlocks) { // if any blocks in list is bad, reject list
@@ -582,16 +581,16 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   }
 
   synchronized BlockListAsLongs getBlockReport(String bpid) {
-    final List<Replica> blocks = new ArrayList<Replica>();
+    BlockListAsLongs.Builder report = BlockListAsLongs.builder();
     final Map<Block, BInfo> map = blockMap.get(bpid);
     if (map != null) {
       for (BInfo b : map.values()) {
         if (b.isFinalized()) {
-          blocks.add(b);
+          report.add(b);
         }
       }
     }
-    return new BlockListAsLongs(blocks, null);
+    return report.build();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java
index 1152c74..3238d6a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java
@@ -107,17 +107,18 @@ public class TestBlockHasMultipleReplicasOnSameDN {
     StorageBlockReport reports[] =
         new StorageBlockReport[cluster.getStoragesPerDatanode()];
 
-    ArrayList<Block> blocks = new ArrayList<Block>();
+    ArrayList<Replica> blocks = new ArrayList<Replica>();
 
     for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
-      blocks.add(locatedBlock.getBlock().getLocalBlock());
+      Block localBlock = locatedBlock.getBlock().getLocalBlock();
+      blocks.add(new FinalizedReplica(localBlock, null, null));
     }
 
+    BlockListAsLongs bll = BlockListAsLongs.encode(blocks);
     for (int i = 0; i < cluster.getStoragesPerDatanode(); ++i) {
-      BlockListAsLongs bll = new BlockListAsLongs(blocks);
       FsVolumeSpi v = dn.getFSDataset().getVolumes().get(i);
       DatanodeStorage dns = new DatanodeStorage(v.getStorageID());
-      reports[i] = new StorageBlockReport(dns, bll.getBlockListAsLongs());
+      reports[i] = new StorageBlockReport(dns, bll);
     }
 
     // Should not assert!

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
index ba786d1..9cbad6d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
@@ -25,11 +25,9 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assume.assumeTrue;
 
 import java.io.File;
-import java.io.FilenameFilter;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -188,7 +186,7 @@ public class TestDataNodeVolumeFailure {
         DatanodeStorage dnStorage = kvPair.getKey();
         BlockListAsLongs blockList = kvPair.getValue();
         reports[reportIndex++] =
-            new StorageBlockReport(dnStorage, blockList.getBlockListAsLongs());
+            new StorageBlockReport(dnStorage, blockList);
     }
     
     cluster.getNameNodeRpc().blockReport(dnR, bpid, reports);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
index 7058d71..a5e4d4e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
@@ -98,7 +98,7 @@ public class TestDnRespectsBlockReportSplitThreshold {
       assertThat(reports.length, is(expectedReportsPerCall));
 
       for (StorageBlockReport report : reports) {
-        BlockListAsLongs blockList = new BlockListAsLongs(report.getBlocks());
+        BlockListAsLongs blockList = report.getBlocks();
         numBlocksReported += blockList.getNumberOfBlocks();
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index 5a440c4..0865e11 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -195,7 +195,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
     final Map<DatanodeStorage, BlockListAsLongs> result =
 	new HashMap<DatanodeStorage, BlockListAsLongs>();
 
-    result.put(storage, new BlockListAsLongs(null, null));
+    result.put(storage, BlockListAsLongs.EMPTY);
     return result;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
index c11abfc..bc3c6b5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
@@ -26,6 +26,7 @@ import java.util.EnumSet;
 import java.util.List;
 
 import com.google.common.base.Preconditions;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -891,9 +893,9 @@ public class NNThroughputBenchmark implements Tool {
     NamespaceInfo nsInfo;
     DatanodeRegistration dnRegistration;
     DatanodeStorage storage; //only one storage 
-    final ArrayList<Block> blocks;
+    final ArrayList<BlockReportReplica> blocks;
     int nrBlocks; // actual number of blocks
-    long[] blockReportList;
+    BlockListAsLongs blockReportList;
     final int dnIdx;
 
     private static int getNodePort(int num) throws IOException {
@@ -904,7 +906,7 @@ public class NNThroughputBenchmark implements Tool {
 
     TinyDatanode(int dnIdx, int blockCapacity) throws IOException {
       this.dnIdx = dnIdx;
-      this.blocks = new ArrayList<Block>(blockCapacity);
+      this.blocks = new ArrayList<BlockReportReplica>(blockCapacity);
       this.nrBlocks = 0;
     }
 
@@ -934,8 +936,7 @@ public class NNThroughputBenchmark implements Tool {
       //first block reports
       storage = new DatanodeStorage(DatanodeStorage.generateUuid());
       final StorageBlockReport[] reports = {
-          new StorageBlockReport(storage,
-              new BlockListAsLongs(null, null).getBlockListAsLongs())
+          new StorageBlockReport(storage, BlockListAsLongs.EMPTY)
       };
       nameNodeProto.blockReport(dnRegistration, 
           nameNode.getNamesystem().getBlockPoolId(), reports);
@@ -968,19 +969,21 @@ public class NNThroughputBenchmark implements Tool {
         }
         return false;
       }
-      blocks.set(nrBlocks, blk);
+      blocks.set(nrBlocks, new BlockReportReplica(blk));
       nrBlocks++;
       return true;
     }
 
     void formBlockReport() {
       // fill remaining slots with blocks that do not exist
-      for(int idx = blocks.size()-1; idx >= nrBlocks; idx--)
-        blocks.set(idx, new Block(blocks.size() - idx, 0, 0));
-      blockReportList = new BlockListAsLongs(blocks).getBlockListAsLongs();
+      for (int idx = blocks.size()-1; idx >= nrBlocks; idx--) {
+        Block block = new Block(blocks.size() - idx, 0, 0);
+        blocks.set(idx, new BlockReportReplica(block));
+      }
+      blockReportList = BlockListAsLongs.encode(blocks);
     }
 
-    long[] getBlockReportList() {
+    BlockListAsLongs getBlockReportList() {
       return blockReportList;
     }
 

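With formBlockReport() corrected above to encode the padded list instead of
discarding it, TinyDatanode's fill-then-encode flow reduces to roughly the
sketch below; the capacity, counts, and block ids are made up, while
BlockReportReplica(Block) and BlockListAsLongs.encode() come from the patch:

  // Hypothetical condensation of the benchmark's report construction.
  // (imports: java.util.ArrayList, org.apache.hadoop.hdfs.protocol.Block,
  //  BlockListAsLongs, BlockListAsLongs.BlockReportReplica)
  static BlockListAsLongs buildPaddedReport(int capacity, int nrBlocks) {
    ArrayList<BlockReportReplica> blocks = new ArrayList<>(capacity);
    for (int i = 0; i < nrBlocks; i++) {        // blocks actually "stored"
      blocks.add(new BlockReportReplica(new Block(1000 + i, 0, 0)));
    }
    for (int i = nrBlocks; i < capacity; i++) { // filler for unused slots
      blocks.add(new BlockReportReplica(new Block(capacity - i, 0, 0)));
    }
    return BlockListAsLongs.encode(blocks);
  }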
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
index fb1418a..ee80b33 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@@ -104,7 +105,7 @@ public class TestDeadDatanode {
     // Ensure blockReport from dead datanode is rejected with IOException
     StorageBlockReport[] report = { new StorageBlockReport(
         new DatanodeStorage(reg.getDatanodeUuid()),
-        new long[] { 0L, 0L, 0L }) };
+        BlockListAsLongs.EMPTY) };
     try {
       dnp.blockReport(reg, poolId, report);
       fail("Expected IOException is not thrown");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
index f7dad18..7b9ea93 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
 import org.apache.hadoop.hdfs.util.MD5FileUtils;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -219,6 +220,7 @@ public class TestFSImage {
         .manageDataDfsDirs(false)
         .manageNameDfsDirs(false)
         .waitSafeMode(false)
+        .startupOption(StartupOption.UPGRADE)
         .build();
     try {
       FileSystem fs = cluster.getFileSystem();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/TestOfflineEditsViewer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/TestOfflineEditsViewer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/TestOfflineEditsViewer.java
index 0e605ac..2ad7b60 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/TestOfflineEditsViewer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/TestOfflineEditsViewer.java
@@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
 import org.apache.hadoop.hdfs.server.namenode.OfflineEditsViewerHelper;
 import org.apache.hadoop.hdfs.tools.offlineEditsViewer.OfflineEditsViewer.Flags;
 import org.apache.hadoop.test.PathUtils;
@@ -140,8 +141,8 @@ public class TestOfflineEditsViewer {
     assertEquals(0, runOev(editsReparsed, editsParsedXml2, "xml", false));
 
     // judgment time
-    assertTrue("Test round trip",
-        filesEqualIgnoreTrailingZeros(editsParsedXml, editsParsedXml2));
+    assertTrue("Test round trip", FileUtils.contentEqualsIgnoreEOL(
+        new File(editsParsedXml), new File(editsParsedXml2), "UTF-8"));
 
     os.close();
   }
@@ -238,6 +239,10 @@ public class TestOfflineEditsViewer {
 
     ByteBuffer small = ByteBuffer.wrap(DFSTestUtil.loadFile(filenameSmall));
     ByteBuffer large = ByteBuffer.wrap(DFSTestUtil.loadFile(filenameLarge));
+    // OEV outputs with the latest layout version, so tweak the old file's
+    // contents to use the latest version so that the checked-in binary
+    // files don't require frequent updates.
+    small.put(3, (byte)NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
 
     // now correct if it's otherwise
     if (small.capacity() > large.capacity()) {