You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2015/03/17 21:25:27 UTC
[10/50] [abbrv] hadoop git commit: HDFS-7435. PB encoding of block
reports is very inefficient. Contributed by Daryn Sharp.
HDFS-7435. PB encoding of block reports is very inefficient. Contributed by Daryn Sharp.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d324164a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d324164a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d324164a
Branch: refs/heads/HDFS-7836
Commit: d324164a51a43d72c02567248bd9f0f12b244a40
Parents: f446669
Author: Kihwal Lee <ki...@apache.org>
Authored: Fri Mar 13 14:13:55 2015 -0500
Committer: Kihwal Lee <ki...@apache.org>
Committed: Fri Mar 13 14:23:37 2015 -0500
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../hadoop/hdfs/protocol/BlockListAsLongs.java | 660 +++++++++++--------
.../DatanodeProtocolClientSideTranslatorPB.java | 22 +-
.../DatanodeProtocolServerSideTranslatorPB.java | 14 +-
.../apache/hadoop/hdfs/protocolPB/PBHelper.java | 6 +-
.../server/blockmanagement/BlockManager.java | 16 +-
.../hdfs/server/datanode/BPServiceActor.java | 13 +-
.../datanode/fsdataset/impl/FsDatasetImpl.java | 20 +-
.../hdfs/server/namenode/NameNodeRpcServer.java | 2 +-
.../server/protocol/DatanodeRegistration.java | 9 +
.../hdfs/server/protocol/NamespaceInfo.java | 52 ++
.../server/protocol/StorageBlockReport.java | 8 +-
.../src/main/proto/DatanodeProtocol.proto | 2 +
.../hadoop-hdfs/src/main/proto/hdfs.proto | 1 +
.../hdfs/protocol/TestBlockListAsLongs.java | 237 +++++++
.../blockmanagement/TestBlockManager.java | 8 +-
.../server/datanode/BlockReportTestBase.java | 27 +-
.../server/datanode/SimulatedFSDataset.java | 11 +-
.../TestBlockHasMultipleReplicasOnSameDN.java | 9 +-
.../datanode/TestDataNodeVolumeFailure.java | 4 +-
...TestDnRespectsBlockReportSplitThreshold.java | 2 +-
.../extdataset/ExternalDatasetImpl.java | 2 +-
.../server/namenode/NNThroughputBenchmark.java | 23 +-
.../hdfs/server/namenode/TestDeadDatanode.java | 3 +-
.../hdfs/server/namenode/TestFSImage.java | 2 +
.../TestOfflineEditsViewer.java | 9 +-
26 files changed, 811 insertions(+), 354 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 909182b..ac7e096 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -743,6 +743,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7491. Add incremental blockreport latency to DN metrics.
(Ming Ma via cnauroth)
+ HDFS-7435. PB encoding of block reports is very inefficient.
+ (Daryn Sharp via kihwal)
+
OPTIMIZATIONS
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
index 4389714..1c89ee4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
@@ -17,342 +17,458 @@
*/
package org.apache.hadoop.hdfs.protocol;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
-import java.util.Random;
-import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.Replica;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
-/**
- * This class provides an interface for accessing list of blocks that
- * has been implemented as long[].
- * This class is useful for block report. Rather than send block reports
- * as a Block[] we can send it as a long[].
- *
- * The structure of the array is as follows:
- * 0: the length of the finalized replica list;
- * 1: the length of the under-construction replica list;
- * - followed by finalized replica list where each replica is represented by
- * 3 longs: one for the blockId, one for the block length, and one for
- * the generation stamp;
- * - followed by the invalid replica represented with three -1s;
- * - followed by the under-construction replica list where each replica is
- * represented by 4 longs: three for the block id, length, generation
- * stamp, and the fourth for the replica state.
- */
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public class BlockListAsLongs implements Iterable<Block> {
+public abstract class BlockListAsLongs implements Iterable<BlockReportReplica> {
+ private final static int CHUNK_SIZE = 64*1024; // 64K
+ private static long[] EMPTY_LONGS = new long[]{0, 0};
+
+ public static BlockListAsLongs EMPTY = new BlockListAsLongs() {
+ @Override
+ public int getNumberOfBlocks() {
+ return 0;
+ }
+ @Override
+ public ByteString getBlocksBuffer() {
+ return ByteString.EMPTY;
+ }
+ @Override
+ public long[] getBlockListAsLongs() {
+ return EMPTY_LONGS;
+ }
+ @Override
+ public Iterator<BlockReportReplica> iterator() {
+ return Collections.emptyIterator();
+ }
+ };
+
/**
- * A finalized block as 3 longs
- * block-id and block length and generation stamp
+ * Prepare an instance to in-place decode the given ByteString buffer
+ * @param numBlocks - blocks in the buffer
+ * @param blocksBuf - ByteString encoded varints
+ * @return BlockListAsLongs
*/
- private static final int LONGS_PER_FINALIZED_BLOCK = 3;
+ public static BlockListAsLongs decodeBuffer(final int numBlocks,
+ final ByteString blocksBuf) {
+ return new BufferDecoder(numBlocks, blocksBuf);
+ }
/**
- * An under-construction block as 4 longs
- * block-id and block length, generation stamp and replica state
+ * Prepare an instance to in-place decode the given ByteString buffers
+ * @param numBlocks - blocks in the buffers
+ * @param blocksBufs - list of ByteString encoded varints
+ * @return BlockListAsLongs
*/
- private static final int LONGS_PER_UC_BLOCK = 4;
-
- /** Number of longs in the header */
- private static final int HEADER_SIZE = 2;
+ public static BlockListAsLongs decodeBuffers(final int numBlocks,
+ final List<ByteString> blocksBufs) {
+ // this doesn't actually copy the data
+ return decodeBuffer(numBlocks, ByteString.copyFrom(blocksBufs));
+ }
/**
- * Returns the index of the first long in blockList
- * belonging to the specified block.
- * The first long contains the block id.
+ * Prepare an instance to in-place decode the given list of Longs. Note
+ * it's much more efficient to decode ByteString buffers and only exists
+ * for compatibility.
+ * @param blocksList - list of longs
+ * @return BlockListAsLongs
*/
- private int index2BlockId(int blockIndex) {
- if(blockIndex < 0 || blockIndex > getNumberOfBlocks())
- return -1;
- int finalizedSize = getNumberOfFinalizedReplicas();
- if(blockIndex < finalizedSize)
- return HEADER_SIZE + blockIndex * LONGS_PER_FINALIZED_BLOCK;
- return HEADER_SIZE + (finalizedSize + 1) * LONGS_PER_FINALIZED_BLOCK
- + (blockIndex - finalizedSize) * LONGS_PER_UC_BLOCK;
+ public static BlockListAsLongs decodeLongs(List<Long> blocksList) {
+ return blocksList.isEmpty() ? EMPTY : new LongsDecoder(blocksList);
}
- private final long[] blockList;
-
/**
- * Create block report from finalized and under construction lists of blocks.
- *
- * @param finalized - list of finalized blocks
- * @param uc - list of under construction blocks
+ * Prepare an instance to encode the collection of replicas into an
+ * efficient ByteString.
+ * @param replicas - replicas to encode
+ * @return BlockListAsLongs
*/
- public BlockListAsLongs(final List<? extends Replica> finalized,
- final List<? extends Replica> uc) {
- int finalizedSize = finalized == null ? 0 : finalized.size();
- int ucSize = uc == null ? 0 : uc.size();
- int len = HEADER_SIZE
- + (finalizedSize + 1) * LONGS_PER_FINALIZED_BLOCK
- + ucSize * LONGS_PER_UC_BLOCK;
-
- blockList = new long[len];
+ public static BlockListAsLongs encode(
+ final Collection<? extends Replica> replicas) {
+ BlockListAsLongs.Builder builder = builder();
+ for (Replica replica : replicas) {
+ builder.add(replica);
+ }
+ return builder.build();
+ }
- // set the header
- blockList[0] = finalizedSize;
- blockList[1] = ucSize;
+ public static Builder builder() {
+ return new BlockListAsLongs.Builder();
+ }
- // set finalized blocks
- for (int i = 0; i < finalizedSize; i++) {
- setBlock(i, finalized.get(i));
- }
+ /**
+ * The number of blocks
+ * @return - the number of blocks
+ */
+ abstract public int getNumberOfBlocks();
- // set invalid delimiting block
- setDelimitingBlock(finalizedSize);
+ /**
+ * Very efficient encoding of the block report into a ByteString to avoid
+ * the overhead of protobuf repeating fields. Primitive repeating fields
+ * require re-allocs of an ArrayList<Long> and the associated (un)boxing
+ * overhead which puts pressure on GC.
+ *
+ * The structure of the buffer is as follows:
+ * - each replica is represented by 4 longs:
+ * blockId, block length, genstamp, replica state
+ *
+ * @return ByteString encoded block report
+ */
+ abstract public ByteString getBlocksBuffer();
- // set under construction blocks
- for (int i = 0; i < ucSize; i++) {
- setBlock(finalizedSize + i, uc.get(i));
+ /**
+ * List of ByteStrings that encode this block report
+ *
+ * @return ByteStrings
+ */
+ public List<ByteString> getBlocksBuffers() {
+ final ByteString blocksBuf = getBlocksBuffer();
+ final List<ByteString> buffers;
+ final int size = blocksBuf.size();
+ if (size <= CHUNK_SIZE) {
+ buffers = Collections.singletonList(blocksBuf);
+ } else {
+ buffers = new ArrayList<ByteString>();
+ for (int pos=0; pos < size; pos += CHUNK_SIZE) {
+ // this doesn't actually copy the data
+ buffers.add(blocksBuf.substring(pos, Math.min(pos+CHUNK_SIZE, size)));
+ }
}
+ return buffers;
}
/**
- * Create block report from a list of finalized blocks. Used by
- * NNThroughputBenchmark.
- *
- * @param blocks - list of finalized blocks
+ * Convert block report to old-style list of longs. Only used to
+ * re-encode the block report when the DN detects an older NN. This is
+ * inefficient, but in practice a DN is unlikely to be upgraded first
+ *
+ * The structure of the array is as follows:
+ * 0: the length of the finalized replica list;
+ * 1: the length of the under-construction replica list;
+ * - followed by finalized replica list where each replica is represented by
+ * 3 longs: one for the blockId, one for the block length, and one for
+ * the generation stamp;
+ * - followed by the invalid replica represented with three -1s;
+ * - followed by the under-construction replica list where each replica is
+ * represented by 4 longs: three for the block id, length, generation
+ * stamp, and the fourth for the replica state.
+ * @return list of longs
*/
- public BlockListAsLongs(final List<? extends Block> blocks) {
- int finalizedSize = blocks == null ? 0 : blocks.size();
- int len = HEADER_SIZE
- + (finalizedSize + 1) * LONGS_PER_FINALIZED_BLOCK;
+ abstract public long[] getBlockListAsLongs();
- blockList = new long[len];
+ /**
+ * Returns a singleton iterator over blocks in the block report. Do not
+ * add the returned blocks to a collection.
+ * @return Iterator
+ */
+ abstract public Iterator<BlockReportReplica> iterator();
- // set the header
- blockList[0] = finalizedSize;
- blockList[1] = 0;
+ public static class Builder {
+ private final ByteString.Output out;
+ private final CodedOutputStream cos;
+ private int numBlocks = 0;
+ private int numFinalized = 0;
- // set finalized blocks
- for (int i = 0; i < finalizedSize; i++) {
- setBlock(i, blocks.get(i));
+ Builder() {
+ out = ByteString.newOutput(64*1024);
+ cos = CodedOutputStream.newInstance(out);
}
- // set invalid delimiting block
- setDelimitingBlock(finalizedSize);
- }
-
- public BlockListAsLongs() {
- this((long[])null);
- }
+ public void add(Replica replica) {
+ try {
+ // zig-zag to reduce size of legacy blocks
+ cos.writeSInt64NoTag(replica.getBlockId());
+ cos.writeRawVarint64(replica.getBytesOnDisk());
+ cos.writeRawVarint64(replica.getGenerationStamp());
+ ReplicaState state = replica.getState();
+ // although state is not a 64-bit value, using a long varint to
+ // allow for future use of the upper bits
+ cos.writeRawVarint64(state.getValue());
+ if (state == ReplicaState.FINALIZED) {
+ numFinalized++;
+ }
+ numBlocks++;
+ } catch (IOException ioe) {
+ // shouldn't happen, ByteString.Output doesn't throw IOE
+ throw new IllegalStateException(ioe);
+ }
+ }
- /**
- * Constructor
- * @param iBlockList - BlockListALongs create from this long[] parameter
- */
- public BlockListAsLongs(final long[] iBlockList) {
- if (iBlockList == null) {
- blockList = new long[HEADER_SIZE];
- return;
+ public int getNumberOfBlocks() {
+ return numBlocks;
+ }
+
+ public BlockListAsLongs build() {
+ try {
+ cos.flush();
+ } catch (IOException ioe) {
+ // shouldn't happen, ByteString.Output doesn't throw IOE
+ throw new IllegalStateException(ioe);
+ }
+ return new BufferDecoder(numBlocks, numFinalized, out.toByteString());
}
- blockList = iBlockList;
}
- public long[] getBlockListAsLongs() {
- return blockList;
- }
+ // decode new-style ByteString buffer based block report
+ private static class BufferDecoder extends BlockListAsLongs {
+ // reserve upper bits for future use. decoding masks off these bits to
+ // allow compatibility for the current through future release that may
+ // start using the bits
+ private static long NUM_BYTES_MASK = (-1L) >>> (64 - 48);
+ private static long REPLICA_STATE_MASK = (-1L) >>> (64 - 4);
- /**
- * Iterates over blocks in the block report.
- * Avoids object allocation on each iteration.
- */
- @InterfaceAudience.Private
- @InterfaceStability.Evolving
- public class BlockReportIterator implements Iterator<Block> {
- private int currentBlockIndex;
- private final Block block;
- private ReplicaState currentReplicaState;
-
- BlockReportIterator() {
- this.currentBlockIndex = 0;
- this.block = new Block();
- this.currentReplicaState = null;
+ private final ByteString buffer;
+ private final int numBlocks;
+ private int numFinalized;
+
+ BufferDecoder(final int numBlocks, final ByteString buf) {
+ this(numBlocks, -1, buf);
}
- @Override
- public boolean hasNext() {
- return currentBlockIndex < getNumberOfBlocks();
+ BufferDecoder(final int numBlocks, final int numFinalized,
+ final ByteString buf) {
+ this.numBlocks = numBlocks;
+ this.numFinalized = numFinalized;
+ this.buffer = buf;
}
@Override
- public Block next() {
- block.set(blockId(currentBlockIndex),
- blockLength(currentBlockIndex),
- blockGenerationStamp(currentBlockIndex));
- currentReplicaState = blockReplicaState(currentBlockIndex);
- currentBlockIndex++;
- return block;
+ public int getNumberOfBlocks() {
+ return numBlocks;
}
@Override
- public void remove() {
- throw new UnsupportedOperationException("Sorry. can't remove.");
+ public ByteString getBlocksBuffer() {
+ return buffer;
}
- /**
- * Get the state of the current replica.
- * The state corresponds to the replica returned
- * by the latest {@link #next()}.
- */
- public ReplicaState getCurrentReplicaState() {
- return currentReplicaState;
+ @Override
+ public long[] getBlockListAsLongs() {
+ // terribly inefficient but only occurs if server tries to transcode
+ // an undecoded buffer into longs - ie. it will never happen but let's
+ // handle it anyway
+ if (numFinalized == -1) {
+ int n = 0;
+ for (Replica replica : this) {
+ if (replica.getState() == ReplicaState.FINALIZED) {
+ n++;
+ }
+ }
+ numFinalized = n;
+ }
+ int numUc = numBlocks - numFinalized;
+ int size = 2 + 3*(numFinalized+1) + 4*(numUc);
+ long[] longs = new long[size];
+ longs[0] = numFinalized;
+ longs[1] = numUc;
+
+ int idx = 2;
+ int ucIdx = idx + 3*numFinalized;
+ // delimiter block
+ longs[ucIdx++] = -1;
+ longs[ucIdx++] = -1;
+ longs[ucIdx++] = -1;
+
+ for (BlockReportReplica block : this) {
+ switch (block.getState()) {
+ case FINALIZED: {
+ longs[idx++] = block.getBlockId();
+ longs[idx++] = block.getNumBytes();
+ longs[idx++] = block.getGenerationStamp();
+ break;
+ }
+ default: {
+ longs[ucIdx++] = block.getBlockId();
+ longs[ucIdx++] = block.getNumBytes();
+ longs[ucIdx++] = block.getGenerationStamp();
+ longs[ucIdx++] = block.getState().getValue();
+ break;
+ }
+ }
+ }
+ return longs;
}
- }
- /**
- * Returns an iterator over blocks in the block report.
- */
- @Override
- public Iterator<Block> iterator() {
- return getBlockReportIterator();
- }
-
- /**
- * Returns {@link BlockReportIterator}.
- */
- public BlockReportIterator getBlockReportIterator() {
- return new BlockReportIterator();
- }
-
- /**
- * The number of blocks
- * @return - the number of blocks
- */
- public int getNumberOfBlocks() {
- assert blockList.length == HEADER_SIZE +
- (blockList[0] + 1) * LONGS_PER_FINALIZED_BLOCK +
- blockList[1] * LONGS_PER_UC_BLOCK :
- "Number of blocks is inconcistent with the array length";
- return getNumberOfFinalizedReplicas() + getNumberOfUCReplicas();
- }
-
- /**
- * Returns the number of finalized replicas in the block report.
- */
- private int getNumberOfFinalizedReplicas() {
- return (int)blockList[0];
- }
-
- /**
- * Returns the number of under construction replicas in the block report.
- */
- private int getNumberOfUCReplicas() {
- return (int)blockList[1];
+ @Override
+ public Iterator<BlockReportReplica> iterator() {
+ return new Iterator<BlockReportReplica>() {
+ final BlockReportReplica block = new BlockReportReplica();
+ final CodedInputStream cis = buffer.newCodedInput();
+ private int currentBlockIndex = 0;
+
+ @Override
+ public boolean hasNext() {
+ return currentBlockIndex < numBlocks;
+ }
+
+ @Override
+ public BlockReportReplica next() {
+ currentBlockIndex++;
+ try {
+ // zig-zag to reduce size of legacy blocks and mask off bits
+ // we don't (yet) understand
+ block.setBlockId(cis.readSInt64());
+ block.setNumBytes(cis.readRawVarint64() & NUM_BYTES_MASK);
+ block.setGenerationStamp(cis.readRawVarint64());
+ long state = cis.readRawVarint64() & REPLICA_STATE_MASK;
+ block.setState(ReplicaState.getState((int)state));
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ return block;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
}
- /**
- * Returns the id of the specified replica of the block report.
- */
- private long blockId(int index) {
- return blockList[index2BlockId(index)];
- }
+ // decode old style block report of longs
+ private static class LongsDecoder extends BlockListAsLongs {
+ private final List<Long> values;
+ private final int finalizedBlocks;
+ private final int numBlocks;
- /**
- * Returns the length of the specified replica of the block report.
- */
- private long blockLength(int index) {
- return blockList[index2BlockId(index) + 1];
- }
+ // set the header
+ LongsDecoder(List<Long> values) {
+ this.values = values.subList(2, values.size());
+ this.finalizedBlocks = values.get(0).intValue();
+ this.numBlocks = finalizedBlocks + values.get(1).intValue();
+ }
- /**
- * Returns the generation stamp of the specified replica of the block report.
- */
- private long blockGenerationStamp(int index) {
- return blockList[index2BlockId(index) + 2];
- }
+ @Override
+ public int getNumberOfBlocks() {
+ return numBlocks;
+ }
- /**
- * Returns the state of the specified replica of the block report.
- */
- private ReplicaState blockReplicaState(int index) {
- if(index < getNumberOfFinalizedReplicas())
- return ReplicaState.FINALIZED;
- return ReplicaState.getState((int)blockList[index2BlockId(index) + 3]);
- }
+ @Override
+ public ByteString getBlocksBuffer() {
+ Builder builder = builder();
+ for (Replica replica : this) {
+ builder.add(replica);
+ }
+ return builder.build().getBlocksBuffer();
+ }
- /**
- * Corrupt the generation stamp of the block with the given index.
- * Not meant to be used outside of tests.
- */
- @VisibleForTesting
- public long corruptBlockGSForTesting(final int blockIndex, Random rand) {
- long oldGS = blockList[index2BlockId(blockIndex) + 2];
- while (blockList[index2BlockId(blockIndex) + 2] == oldGS) {
- blockList[index2BlockId(blockIndex) + 2] = rand.nextInt();
+ @Override
+ public long[] getBlockListAsLongs() {
+ long[] longs = new long[2+values.size()];
+ longs[0] = finalizedBlocks;
+ longs[1] = numBlocks - finalizedBlocks;
+ for (int i=0; i < longs.length; i++) {
+ longs[i] = values.get(i);
+ }
+ return longs;
}
- return oldGS;
- }
- /**
- * Corrupt the length of the block with the given index by truncation.
- * Not meant to be used outside of tests.
- */
- @VisibleForTesting
- public long corruptBlockLengthForTesting(final int blockIndex, Random rand) {
- long oldLength = blockList[index2BlockId(blockIndex) + 1];
- blockList[index2BlockId(blockIndex) + 1] =
- rand.nextInt((int) oldLength - 1);
- return oldLength;
+ @Override
+ public Iterator<BlockReportReplica> iterator() {
+ return new Iterator<BlockReportReplica>() {
+ private final BlockReportReplica block = new BlockReportReplica();
+ final Iterator<Long> iter = values.iterator();
+ private int currentBlockIndex = 0;
+
+ @Override
+ public boolean hasNext() {
+ return currentBlockIndex < numBlocks;
+ }
+
+ @Override
+ public BlockReportReplica next() {
+ if (currentBlockIndex == finalizedBlocks) {
+ // verify the presence of the delimiter block
+ readBlock();
+ Preconditions.checkArgument(block.getBlockId() == -1 &&
+ block.getNumBytes() == -1 &&
+ block.getGenerationStamp() == -1,
+ "Invalid delimiter block");
+ }
+
+ readBlock();
+ if (currentBlockIndex++ < finalizedBlocks) {
+ block.setState(ReplicaState.FINALIZED);
+ } else {
+ block.setState(ReplicaState.getState(iter.next().intValue()));
+ }
+ return block;
+ }
+
+ private void readBlock() {
+ block.setBlockId(iter.next());
+ block.setNumBytes(iter.next());
+ block.setGenerationStamp(iter.next());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
}
- /**
- * Set the indexTh block
- * @param index - the index of the block to set
- * @param r - the block is set to the value of the this Replica
- */
- private void setBlock(final int index, final Replica r) {
- int pos = index2BlockId(index);
- blockList[pos] = r.getBlockId();
- blockList[pos + 1] = r.getNumBytes();
- blockList[pos + 2] = r.getGenerationStamp();
- if(index < getNumberOfFinalizedReplicas())
- return;
- assert r.getState() != ReplicaState.FINALIZED :
- "Must be under-construction replica.";
- blockList[pos + 3] = r.getState().getValue();
- }
-
- /**
- * Set the indexTh block
- * @param index - the index of the block to set
- * @param b - the block is set to the value of the this Block
- */
- private void setBlock(final int index, final Block b) {
- int pos = index2BlockId(index);
- blockList[pos] = b.getBlockId();
- blockList[pos + 1] = b.getNumBytes();
- blockList[pos + 2] = b.getGenerationStamp();
- }
-
- /**
- * Set the invalid delimiting block between the finalized and
- * the under-construction lists.
- * The invalid block has all three fields set to -1.
- * @param finalizedSzie - the size of the finalized list
- */
- private void setDelimitingBlock(final int finalizedSzie) {
- int idx = HEADER_SIZE + finalizedSzie * LONGS_PER_FINALIZED_BLOCK;
- blockList[idx] = -1;
- blockList[idx+1] = -1;
- blockList[idx+2] = -1;
- }
-
- public long getMaxGsInBlockList() {
- long maxGs = -1;
- Iterator<Block> iter = getBlockReportIterator();
- while (iter.hasNext()) {
- Block b = iter.next();
- if (b.getGenerationStamp() > maxGs) {
- maxGs = b.getGenerationStamp();
+ @InterfaceAudience.Private
+ public static class BlockReportReplica extends Block implements Replica {
+ private ReplicaState state;
+ private BlockReportReplica() {
+ }
+ public BlockReportReplica(Block block) {
+ super(block);
+ if (block instanceof BlockReportReplica) {
+ this.state = ((BlockReportReplica)block).getState();
+ } else {
+ this.state = ReplicaState.FINALIZED;
}
}
- return maxGs;
+ public void setState(ReplicaState state) {
+ this.state = state;
+ }
+ @Override
+ public ReplicaState getState() {
+ return state;
+ }
+ @Override
+ public long getBytesOnDisk() {
+ return getNumBytes();
+ }
+ @Override
+ public long getVisibleLength() {
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public String getStorageUuid() {
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public boolean isOnTransientStorage() {
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public boolean equals(Object o) {
+ return super.equals(o);
+ }
+ @Override
+ public int hashCode() {
+ return super.hashCode();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
index 192916f..c4003f1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
@@ -26,6 +26,7 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -51,6 +52,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo.Capability;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
@@ -64,6 +66,7 @@ import org.apache.hadoop.ipc.RpcClientUtil;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
+import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
@@ -83,6 +86,11 @@ public class DatanodeProtocolClientSideTranslatorPB implements
VersionRequestProto.newBuilder().build();
private final static RpcController NULL_CONTROLLER = null;
+ @VisibleForTesting
+ public DatanodeProtocolClientSideTranslatorPB(DatanodeProtocolPB rpcProxy) {
+ this.rpcProxy = rpcProxy;
+ }
+
public DatanodeProtocolClientSideTranslatorPB(InetSocketAddress nameNodeAddr,
Configuration conf) throws IOException {
RPC.setProtocolEngine(conf, DatanodeProtocolPB.class,
@@ -166,12 +174,20 @@ public class DatanodeProtocolClientSideTranslatorPB implements
.newBuilder().setRegistration(PBHelper.convert(registration))
.setBlockPoolId(poolId);
+ boolean useBlocksBuffer = registration.getNamespaceInfo()
+ .isCapabilitySupported(Capability.STORAGE_BLOCK_REPORT_BUFFERS);
+
for (StorageBlockReport r : reports) {
StorageBlockReportProto.Builder reportBuilder = StorageBlockReportProto
.newBuilder().setStorage(PBHelper.convert(r.getStorage()));
- long[] blocks = r.getBlocks();
- for (int i = 0; i < blocks.length; i++) {
- reportBuilder.addBlocks(blocks[i]);
+ BlockListAsLongs blocks = r.getBlocks();
+ if (useBlocksBuffer) {
+ reportBuilder.setNumberOfBlocks(blocks.getNumberOfBlocks());
+ reportBuilder.addAllBlocksBuffers(blocks.getBlocksBuffers());
+ } else {
+ for (long value : blocks.getBlockListAsLongs()) {
+ reportBuilder.addBlocks(value);
+ }
}
builder.addReports(reportBuilder.build());
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
index 1a89090..e18081f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.protocolPB;
import java.io.IOException;
import java.util.List;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
@@ -58,6 +59,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
+import com.google.common.base.Preconditions;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
@@ -145,10 +147,14 @@ public class DatanodeProtocolServerSideTranslatorPB implements
int index = 0;
for (StorageBlockReportProto s : request.getReportsList()) {
- List<Long> blockIds = s.getBlocksList();
- long[] blocks = new long[blockIds.size()];
- for (int i = 0; i < blockIds.size(); i++) {
- blocks[i] = blockIds.get(i);
+ final BlockListAsLongs blocks;
+ if (s.hasNumberOfBlocks()) { // new style buffer based reports
+ int num = (int)s.getNumberOfBlocks();
+ Preconditions.checkState(s.getBlocksCount() == 0,
+ "cannot send both blocks list and buffers");
+ blocks = BlockListAsLongs.decodeBuffers(num, s.getBlocksBuffersList());
+ } else {
+ blocks = BlockListAsLongs.decodeLongs(s.getBlocksList());
}
report[index++] = new StorageBlockReport(PBHelper.convert(s.getStorage()),
blocks);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index ee1603c..c428c2b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -573,7 +573,7 @@ public class PBHelper {
StorageInfoProto storage = info.getStorageInfo();
return new NamespaceInfo(storage.getNamespceID(), storage.getClusterID(),
info.getBlockPoolID(), storage.getCTime(), info.getBuildVersion(),
- info.getSoftwareVersion());
+ info.getSoftwareVersion(), info.getCapabilities());
}
public static NamenodeCommand convert(NamenodeCommandProto cmd) {
@@ -1233,7 +1233,9 @@ public class PBHelper {
.setBuildVersion(info.getBuildVersion())
.setUnused(0)
.setStorageInfo(PBHelper.convert((StorageInfo)info))
- .setSoftwareVersion(info.getSoftwareVersion()).build();
+ .setSoftwareVersion(info.getSoftwareVersion())
+ .setCapabilities(info.getCapabilities())
+ .build();
}
// Located Block Arrays and Lists
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index c1a3e05..f155375 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -47,7 +47,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
-import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -1968,11 +1968,9 @@ public class BlockManager {
if (report == null) return;
assert (namesystem.hasWriteLock());
assert (storageInfo.numBlocks() == 0);
- BlockReportIterator itBR = report.getBlockReportIterator();
- while(itBR.hasNext()) {
- Block iblk = itBR.next();
- ReplicaState reportedState = itBR.getCurrentReplicaState();
+ for (BlockReportReplica iblk : report) {
+ ReplicaState reportedState = iblk.getState();
if (shouldPostponeBlocksFromFuture &&
namesystem.isGenStampInFuture(iblk)) {
@@ -2042,13 +2040,11 @@ public class BlockManager {
int curIndex;
if (newReport == null) {
- newReport = new BlockListAsLongs();
+ newReport = BlockListAsLongs.EMPTY;
}
// scan the report and process newly reported blocks
- BlockReportIterator itBR = newReport.getBlockReportIterator();
- while(itBR.hasNext()) {
- Block iblk = itBR.next();
- ReplicaState iState = itBR.getCurrentReplicaState();
+ for (BlockReportReplica iblk : newReport) {
+ ReplicaState iState = iblk.getState();
BlockInfoContiguous storedBlock = processReportedBlock(storageInfo,
iblk, iState, toAdd, toInvalidate, toCorrupt, toUC);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 3ba2f54..3c20f6b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -228,7 +228,7 @@ class BPServiceActor implements Runnable {
bpos.verifyAndSetNamespaceInfo(nsInfo);
// Second phase of the handshake with the NN.
- register();
+ register(nsInfo);
}
// This is useful to make sure NN gets Heartbeat before Blockreport
@@ -468,8 +468,7 @@ class BPServiceActor implements Runnable {
for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) {
BlockListAsLongs blockList = kvPair.getValue();
- reports[i++] = new StorageBlockReport(
- kvPair.getKey(), blockList.getBlockListAsLongs());
+ reports[i++] = new StorageBlockReport(kvPair.getKey(), blockList);
totalBlockCount += blockList.getNumberOfBlocks();
}
@@ -774,10 +773,11 @@ class BPServiceActor implements Runnable {
*
* issued by the namenode to recognize registered datanodes.
*
+ * @param nsInfo current NamespaceInfo
* @see FSNamesystem#registerDatanode(DatanodeRegistration)
* @throws IOException
*/
- void register() throws IOException {
+ void register(NamespaceInfo nsInfo) throws IOException {
// The handshake() phase loaded the block pool storage
// off disk - so update the bpRegistration object from that info
bpRegistration = bpos.createRegistration();
@@ -788,6 +788,7 @@ class BPServiceActor implements Runnable {
try {
// Use returned registration from namenode with updated fields
bpRegistration = bpNamenode.registerDatanode(bpRegistration);
+ bpRegistration.setNamespaceInfo(nsInfo);
break;
} catch(EOFException e) { // namenode might have just restarted
LOG.info("Problem connecting to server: " + nnAddr + " :"
@@ -915,9 +916,9 @@ class BPServiceActor implements Runnable {
if (shouldRun()) {
// re-retrieve namespace info to make sure that, if the NN
// was restarted, we still match its version (HDFS-2120)
- retrieveNamespaceInfo();
+ NamespaceInfo nsInfo = retrieveNamespaceInfo();
// and re-register
- register();
+ register(nsInfo);
scheduleHeartbeat();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 486acbc..d42c00c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1575,30 +1575,26 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
Map<DatanodeStorage, BlockListAsLongs> blockReportsMap =
new HashMap<DatanodeStorage, BlockListAsLongs>();
- Map<String, ArrayList<ReplicaInfo>> finalized =
- new HashMap<String, ArrayList<ReplicaInfo>>();
- Map<String, ArrayList<ReplicaInfo>> uc =
- new HashMap<String, ArrayList<ReplicaInfo>>();
+ Map<String, BlockListAsLongs.Builder> builders =
+ new HashMap<String, BlockListAsLongs.Builder>();
List<FsVolumeImpl> curVolumes = getVolumes();
for (FsVolumeSpi v : curVolumes) {
- finalized.put(v.getStorageID(), new ArrayList<ReplicaInfo>());
- uc.put(v.getStorageID(), new ArrayList<ReplicaInfo>());
+ builders.put(v.getStorageID(), BlockListAsLongs.builder());
}
synchronized(this) {
for (ReplicaInfo b : volumeMap.replicas(bpid)) {
switch(b.getState()) {
case FINALIZED:
- finalized.get(b.getVolume().getStorageID()).add(b);
- break;
case RBW:
case RWR:
- uc.get(b.getVolume().getStorageID()).add(b);
+ builders.get(b.getVolume().getStorageID()).add(b);
break;
case RUR:
ReplicaUnderRecovery rur = (ReplicaUnderRecovery)b;
- uc.get(rur.getVolume().getStorageID()).add(rur.getOriginalReplica());
+ builders.get(rur.getVolume().getStorageID())
+ .add(rur.getOriginalReplica());
break;
case TEMPORARY:
break;
@@ -1609,10 +1605,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
for (FsVolumeImpl v : curVolumes) {
- ArrayList<ReplicaInfo> finalizedList = finalized.get(v.getStorageID());
- ArrayList<ReplicaInfo> ucList = uc.get(v.getStorageID());
blockReportsMap.put(v.toDatanodeStorage(),
- new BlockListAsLongs(finalizedList, ucList));
+ builders.get(v.getStorageID()).build());
}
return blockReportsMap;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index f20fb35..059bd28 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -1302,7 +1302,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
final BlockManager bm = namesystem.getBlockManager();
boolean noStaleStorages = false;
for(StorageBlockReport r : reports) {
- final BlockListAsLongs blocks = new BlockListAsLongs(r.getBlocks());
+ final BlockListAsLongs blocks = r.getBlocks();
//
// BlockManager.processReport accumulates information of prior calls
// for the same node and storage, so the value returned by the last
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
index 9db2fca..e788137 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
@@ -40,6 +40,7 @@ public class DatanodeRegistration extends DatanodeID
private final StorageInfo storageInfo;
private ExportedBlockKeys exportedKeys;
private final String softwareVersion;
+ private NamespaceInfo nsInfo;
@VisibleForTesting
public DatanodeRegistration(String uuid, DatanodeRegistration dnr) {
@@ -77,6 +78,14 @@ public class DatanodeRegistration extends DatanodeID
public int getVersion() {
return storageInfo.getLayoutVersion();
}
+
+ public void setNamespaceInfo(NamespaceInfo nsInfo) {
+ this.nsInfo = nsInfo;
+ }
+
+ public NamespaceInfo getNamespaceInfo() {
+ return nsInfo;
+ }
@Override // NodeRegistration
public String getRegistrationID() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java
index 0733743..a7439a0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java
@@ -29,6 +29,9 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.util.VersionInfo;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
/**
* NamespaceInfo is returned by the name-node in reply
* to a data-node handshake.
@@ -40,19 +43,52 @@ public class NamespaceInfo extends StorageInfo {
final String buildVersion;
String blockPoolID = ""; // id of the block pool
String softwareVersion;
+ long capabilities;
+
+ // only authoritative on the server-side to determine advertisement to
+ // clients. enum will update the supported values
+ private static long CAPABILITIES_SUPPORTED = 0;
+
+ public enum Capability {
+ UNKNOWN(false),
+ STORAGE_BLOCK_REPORT_BUFFERS(true); // use optimized ByteString buffers
+ private final long mask;
+ Capability(boolean isSupported) {
+ int bits = ordinal() - 1;
+ mask = (bits < 0) ? 0 : (1L << bits);
+ if (isSupported) {
+ CAPABILITIES_SUPPORTED |= mask;
+ }
+ }
+ public long getMask() {
+ return mask;
+ }
+ }
+ // defaults to enabled capabilites since this ctor is for server
public NamespaceInfo() {
super(NodeType.NAME_NODE);
buildVersion = null;
+ capabilities = CAPABILITIES_SUPPORTED;
}
+ // defaults to enabled capabilites since this ctor is for server
public NamespaceInfo(int nsID, String clusterID, String bpID,
long cT, String buildVersion, String softwareVersion) {
+ this(nsID, clusterID, bpID, cT, buildVersion, softwareVersion,
+ CAPABILITIES_SUPPORTED);
+ }
+
+ // for use by server and/or client
+ public NamespaceInfo(int nsID, String clusterID, String bpID,
+ long cT, String buildVersion, String softwareVersion,
+ long capabilities) {
super(HdfsConstants.NAMENODE_LAYOUT_VERSION, nsID, clusterID, cT,
NodeType.NAME_NODE);
blockPoolID = bpID;
this.buildVersion = buildVersion;
this.softwareVersion = softwareVersion;
+ this.capabilities = capabilities;
}
public NamespaceInfo(int nsID, String clusterID, String bpID,
@@ -61,6 +97,22 @@ public class NamespaceInfo extends StorageInfo {
VersionInfo.getVersion());
}
+ public long getCapabilities() {
+ return capabilities;
+ }
+
+ @VisibleForTesting
+ public void setCapabilities(long capabilities) {
+ this.capabilities = capabilities;
+ }
+
+ public boolean isCapabilitySupported(Capability capability) {
+ Preconditions.checkArgument(capability != Capability.UNKNOWN,
+ "cannot test for unknown capability");
+ long mask = capability.getMask();
+ return (capabilities & mask) == mask;
+ }
+
public String getBuildVersion() {
return buildVersion;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageBlockReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageBlockReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageBlockReport.java
index 1521693..4ef5ebc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageBlockReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/StorageBlockReport.java
@@ -18,14 +18,16 @@
package org.apache.hadoop.hdfs.server.protocol;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+
/**
* Block report for a Datanode storage
*/
public class StorageBlockReport {
private final DatanodeStorage storage;
- private final long[] blocks;
+ private final BlockListAsLongs blocks;
- public StorageBlockReport(DatanodeStorage storage, long[] blocks) {
+ public StorageBlockReport(DatanodeStorage storage, BlockListAsLongs blocks) {
this.storage = storage;
this.blocks = blocks;
}
@@ -34,7 +36,7 @@ public class StorageBlockReport {
return storage;
}
- public long[] getBlocks() {
+ public BlockListAsLongs getBlocks() {
return blocks;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
index 348f346..7b3a4a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
@@ -237,6 +237,8 @@ message BlockReportRequestProto {
message StorageBlockReportProto {
required DatanodeStorageProto storage = 1; // Storage
repeated uint64 blocks = 2 [packed=true];
+ optional uint64 numberOfBlocks = 3;
+ repeated bytes blocksBuffers = 4;
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
index 97906b1..31e5585 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
@@ -517,6 +517,7 @@ message NamespaceInfoProto {
required string blockPoolID = 3; // block pool used by the namespace
required StorageInfoProto storageInfo = 4;// Node information
required string softwareVersion = 5; // Software version number (e.g. 2.0.0)
+ optional uint64 capabilities = 6 [default = 0]; // feature flags
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java
new file mode 100644
index 0000000..bebde18
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.protocol;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportResponseProto;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
+import org.apache.hadoop.hdfs.server.datanode.Replica;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo.Capability;
+import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+public class TestBlockListAsLongs {
+ static Block b1 = new Block(1, 11, 111);
+ static Block b2 = new Block(2, 22, 222);
+ static Block b3 = new Block(3, 33, 333);
+ static Block b4 = new Block(4, 44, 444);
+
+ @Test
+ public void testEmptyReport() {
+ BlockListAsLongs blocks = checkReport();
+ assertArrayEquals(
+ new long[] {
+ 0, 0,
+ -1, -1, -1 },
+ blocks.getBlockListAsLongs());
+ }
+
+ @Test
+ public void testFinalized() {
+ BlockListAsLongs blocks = checkReport(
+ new FinalizedReplica(b1, null, null));
+ assertArrayEquals(
+ new long[] {
+ 1, 0,
+ 1, 11, 111,
+ -1, -1, -1 },
+ blocks.getBlockListAsLongs());
+ }
+
+ @Test
+ public void testUc() {
+ BlockListAsLongs blocks = checkReport(
+ new ReplicaBeingWritten(b1, null, null, null));
+ assertArrayEquals(
+ new long[] {
+ 0, 1,
+ -1, -1, -1,
+ 1, 11, 111, ReplicaState.RBW.getValue() },
+ blocks.getBlockListAsLongs());
+ }
+
+ @Test
+ public void testMix() {
+ BlockListAsLongs blocks = checkReport(
+ new FinalizedReplica(b1, null, null),
+ new FinalizedReplica(b2, null, null),
+ new ReplicaBeingWritten(b3, null, null, null),
+ new ReplicaWaitingToBeRecovered(b4, null, null));
+ assertArrayEquals(
+ new long[] {
+ 2, 2,
+ 1, 11, 111,
+ 2, 22, 222,
+ -1, -1, -1,
+ 3, 33, 333, ReplicaState.RBW.getValue(),
+ 4, 44, 444, ReplicaState.RWR.getValue() },
+ blocks.getBlockListAsLongs());
+ }
+
+ @Test
+ public void testFuzz() throws InterruptedException {
+ Replica[] replicas = new Replica[100000];
+ Random rand = new Random(0);
+ for (int i=0; i<replicas.length; i++) {
+ Block b = new Block(rand.nextLong(), i, i<<4);
+ switch (rand.nextInt(2)) {
+ case 0:
+ replicas[i] = new FinalizedReplica(b, null, null);
+ break;
+ case 1:
+ replicas[i] = new ReplicaBeingWritten(b, null, null, null);
+ break;
+ case 2:
+ replicas[i] = new ReplicaWaitingToBeRecovered(b, null, null);
+ break;
+ }
+ }
+ checkReport(replicas);
+ }
+
+ private BlockListAsLongs checkReport(Replica...replicas) {
+ Map<Long, Replica> expectedReplicas = new HashMap<>();
+ for (Replica replica : replicas) {
+ expectedReplicas.put(replica.getBlockId(), replica);
+ }
+ expectedReplicas = Collections.unmodifiableMap(expectedReplicas);
+
+ // encode the blocks and extract the buffers
+ BlockListAsLongs blocks =
+ BlockListAsLongs.encode(expectedReplicas.values());
+ List<ByteString> buffers = blocks.getBlocksBuffers();
+
+ // convert to old-style list of longs
+ List<Long> longs = new ArrayList<Long>();
+ for (long value : blocks.getBlockListAsLongs()) {
+ longs.add(value);
+ }
+
+ // decode the buffers and verify its contents
+ BlockListAsLongs decodedBlocks =
+ BlockListAsLongs.decodeBuffers(expectedReplicas.size(), buffers);
+ checkReplicas(expectedReplicas, decodedBlocks);
+
+ // decode the long and verify its contents
+ BlockListAsLongs decodedList = BlockListAsLongs.decodeLongs(longs);
+ checkReplicas(expectedReplicas, decodedList);
+ return blocks;
+ }
+
+ private void checkReplicas(Map<Long,Replica> expectedReplicas,
+ BlockListAsLongs decodedBlocks) {
+ assertEquals(expectedReplicas.size(), decodedBlocks.getNumberOfBlocks());
+
+ Map<Long, Replica> reportReplicas = new HashMap<>(expectedReplicas);
+ for (BlockReportReplica replica : decodedBlocks) {
+ assertNotNull(replica);
+ Replica expected = reportReplicas.remove(replica.getBlockId());
+ assertNotNull(expected);
+ assertEquals("wrong bytes",
+ expected.getNumBytes(), replica.getNumBytes());
+ assertEquals("wrong genstamp",
+ expected.getGenerationStamp(), replica.getGenerationStamp());
+ assertEquals("wrong replica state",
+ expected.getState(), replica.getState());
+ }
+ assertTrue(reportReplicas.isEmpty());
+ }
+
+ @Test
+ public void testDatanodeDetect() throws ServiceException, IOException {
+ final AtomicReference<BlockReportRequestProto> request =
+ new AtomicReference<>();
+
+ // just capture the outgoing PB
+ DatanodeProtocolPB mockProxy = mock(DatanodeProtocolPB.class);
+ doAnswer(new Answer<BlockReportResponseProto>() {
+ public BlockReportResponseProto answer(InvocationOnMock invocation) {
+ Object[] args = invocation.getArguments();
+ request.set((BlockReportRequestProto) args[1]);
+ return BlockReportResponseProto.newBuilder().build();
+ }
+ }).when(mockProxy).blockReport(any(RpcController.class),
+ any(BlockReportRequestProto.class));
+
+ @SuppressWarnings("resource")
+ DatanodeProtocolClientSideTranslatorPB nn =
+ new DatanodeProtocolClientSideTranslatorPB(mockProxy);
+
+ DatanodeRegistration reg = DFSTestUtil.getLocalDatanodeRegistration();
+ NamespaceInfo nsInfo = new NamespaceInfo(1, "cluster", "bp", 1);
+ reg.setNamespaceInfo(nsInfo);
+
+ Replica r = new FinalizedReplica(new Block(1, 2, 3), null, null);
+ BlockListAsLongs bbl = BlockListAsLongs.encode(Collections.singleton(r));
+ DatanodeStorage storage = new DatanodeStorage("s1");
+ StorageBlockReport[] sbr = { new StorageBlockReport(storage, bbl) };
+
+ // check DN sends new-style BR
+ request.set(null);
+ nsInfo.setCapabilities(Capability.STORAGE_BLOCK_REPORT_BUFFERS.getMask());
+ nn.blockReport(reg, "pool", sbr);
+ BlockReportRequestProto proto = request.get();
+ assertNotNull(proto);
+ assertTrue(proto.getReports(0).getBlocksList().isEmpty());
+ assertFalse(proto.getReports(0).getBlocksBuffersList().isEmpty());
+
+ // back up to prior version and check DN sends old-style BR
+ request.set(null);
+ nsInfo.setCapabilities(Capability.UNKNOWN.getMask());
+ nn.blockReport(reg, "pool", sbr);
+ proto = request.get();
+ assertNotNull(proto);
+ assertFalse(proto.getReports(0).getBlocksList().isEmpty());
+ assertTrue(proto.getReports(0).getBlocksBuffersList().isEmpty());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 6d67c7d..d9ac9e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -555,12 +555,12 @@ public class TestBlockManager {
reset(node);
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
- new BlockListAsLongs(null, null));
+ BlockListAsLongs.EMPTY);
assertEquals(1, ds.getBlockReportCount());
// send block report again, should NOT be processed
reset(node);
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
- new BlockListAsLongs(null, null));
+ BlockListAsLongs.EMPTY);
assertEquals(1, ds.getBlockReportCount());
// re-register as if node restarted, should update existing node
@@ -571,7 +571,7 @@ public class TestBlockManager {
// send block report, should be processed after restart
reset(node);
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
- new BlockListAsLongs(null, null));
+ BlockListAsLongs.EMPTY);
// Reinitialize as registration with empty storage list pruned
// node.storageMap.
ds = node.getStorageInfos()[0];
@@ -600,7 +600,7 @@ public class TestBlockManager {
reset(node);
doReturn(1).when(node).numBlocks();
bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
- new BlockListAsLongs(null, null));
+ BlockListAsLongs.EMPTY);
assertEquals(1, ds.getBlockReportCount());
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
index 8d9de7b..37c503c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
@@ -146,22 +147,32 @@ public abstract class BlockReportTestBase {
// Walk the list of blocks until we find one each to corrupt the
// generation stamp and length, if so requested.
- for (int i = 0; i < blockList.getNumberOfBlocks(); ++i) {
+ BlockListAsLongs.Builder builder = BlockListAsLongs.builder();
+ for (BlockReportReplica block : blockList) {
if (corruptOneBlockGs && !corruptedGs) {
- blockList.corruptBlockGSForTesting(i, rand);
- LOG.info("Corrupted the GS for block ID " + i);
+ long gsOld = block.getGenerationStamp();
+ long gsNew;
+ do {
+ gsNew = rand.nextInt();
+ } while (gsNew == gsOld);
+ block.setGenerationStamp(gsNew);
+ LOG.info("Corrupted the GS for block ID " + block);
corruptedGs = true;
} else if (corruptOneBlockLen && !corruptedLen) {
- blockList.corruptBlockLengthForTesting(i, rand);
- LOG.info("Corrupted the length for block ID " + i);
+ long lenOld = block.getNumBytes();
+ long lenNew;
+ do {
+ lenNew = rand.nextInt((int)lenOld - 1);
+ } while (lenNew == lenOld);
+ block.setNumBytes(lenNew);
+ LOG.info("Corrupted the length for block ID " + block);
corruptedLen = true;
- } else {
- break;
}
+ builder.add(new BlockReportReplica(block));
}
reports[reportIndex++] =
- new StorageBlockReport(dnStorage, blockList.getBlockListAsLongs());
+ new StorageBlockReport(dnStorage, builder.build());
}
return reports;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index a4ec8d5..5c7b4ac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -52,7 +52,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -271,7 +270,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override
public ReplicaState getState() {
- return null;
+ return finalized ? ReplicaState.FINALIZED : ReplicaState.RBW;
}
@Override
@@ -529,7 +528,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
public synchronized void injectBlocks(String bpid,
- Iterable<Block> injectBlocks) throws IOException {
+ Iterable<? extends Block> injectBlocks) throws IOException {
ExtendedBlock blk = new ExtendedBlock();
if (injectBlocks != null) {
for (Block b: injectBlocks) { // if any blocks in list is bad, reject list
@@ -582,16 +581,16 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
synchronized BlockListAsLongs getBlockReport(String bpid) {
- final List<Replica> blocks = new ArrayList<Replica>();
+ BlockListAsLongs.Builder report = BlockListAsLongs.builder();
final Map<Block, BInfo> map = blockMap.get(bpid);
if (map != null) {
for (BInfo b : map.values()) {
if (b.isFinalized()) {
- blocks.add(b);
+ report.add(b);
}
}
}
- return new BlockListAsLongs(blocks, null);
+ return report.build();
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java
index 1152c74..3238d6a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java
@@ -107,17 +107,18 @@ public class TestBlockHasMultipleReplicasOnSameDN {
StorageBlockReport reports[] =
new StorageBlockReport[cluster.getStoragesPerDatanode()];
- ArrayList<Block> blocks = new ArrayList<Block>();
+ ArrayList<Replica> blocks = new ArrayList<Replica>();
for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
- blocks.add(locatedBlock.getBlock().getLocalBlock());
+ Block localBlock = locatedBlock.getBlock().getLocalBlock();
+ blocks.add(new FinalizedReplica(localBlock, null, null));
}
+ BlockListAsLongs bll = BlockListAsLongs.encode(blocks);
for (int i = 0; i < cluster.getStoragesPerDatanode(); ++i) {
- BlockListAsLongs bll = new BlockListAsLongs(blocks);
FsVolumeSpi v = dn.getFSDataset().getVolumes().get(i);
DatanodeStorage dns = new DatanodeStorage(v.getStorageID());
- reports[i] = new StorageBlockReport(dns, bll.getBlockListAsLongs());
+ reports[i] = new StorageBlockReport(dns, bll);
}
// Should not assert!
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
index ba786d1..9cbad6d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
@@ -25,11 +25,9 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
import java.io.File;
-import java.io.FilenameFilter;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -188,7 +186,7 @@ public class TestDataNodeVolumeFailure {
DatanodeStorage dnStorage = kvPair.getKey();
BlockListAsLongs blockList = kvPair.getValue();
reports[reportIndex++] =
- new StorageBlockReport(dnStorage, blockList.getBlockListAsLongs());
+ new StorageBlockReport(dnStorage, blockList);
}
cluster.getNameNodeRpc().blockReport(dnR, bpid, reports);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
index 7058d71..a5e4d4e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
@@ -98,7 +98,7 @@ public class TestDnRespectsBlockReportSplitThreshold {
assertThat(reports.length, is(expectedReportsPerCall));
for (StorageBlockReport report : reports) {
- BlockListAsLongs blockList = new BlockListAsLongs(report.getBlocks());
+ BlockListAsLongs blockList = report.getBlocks();
numBlocksReported += blockList.getNumberOfBlocks();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index 5a440c4..0865e11 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -195,7 +195,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
final Map<DatanodeStorage, BlockListAsLongs> result =
new HashMap<DatanodeStorage, BlockListAsLongs>();
- result.put(storage, new BlockListAsLongs(null, null));
+ result.put(storage, BlockListAsLongs.EMPTY);
return result;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
index c11abfc..bc3c6b5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
@@ -26,6 +26,7 @@ import java.util.EnumSet;
import java.util.List;
import com.google.common.base.Preconditions;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -891,9 +893,9 @@ public class NNThroughputBenchmark implements Tool {
NamespaceInfo nsInfo;
DatanodeRegistration dnRegistration;
DatanodeStorage storage; //only one storage
- final ArrayList<Block> blocks;
+ final ArrayList<BlockReportReplica> blocks;
int nrBlocks; // actual number of blocks
- long[] blockReportList;
+ BlockListAsLongs blockReportList;
final int dnIdx;
private static int getNodePort(int num) throws IOException {
@@ -904,7 +906,7 @@ public class NNThroughputBenchmark implements Tool {
TinyDatanode(int dnIdx, int blockCapacity) throws IOException {
this.dnIdx = dnIdx;
- this.blocks = new ArrayList<Block>(blockCapacity);
+ this.blocks = new ArrayList<BlockReportReplica>(blockCapacity);
this.nrBlocks = 0;
}
@@ -934,8 +936,7 @@ public class NNThroughputBenchmark implements Tool {
//first block reports
storage = new DatanodeStorage(DatanodeStorage.generateUuid());
final StorageBlockReport[] reports = {
- new StorageBlockReport(storage,
- new BlockListAsLongs(null, null).getBlockListAsLongs())
+ new StorageBlockReport(storage, BlockListAsLongs.EMPTY)
};
nameNodeProto.blockReport(dnRegistration,
nameNode.getNamesystem().getBlockPoolId(), reports);
@@ -968,19 +969,21 @@ public class NNThroughputBenchmark implements Tool {
}
return false;
}
- blocks.set(nrBlocks, blk);
+ blocks.set(nrBlocks, new BlockReportReplica(blk));
nrBlocks++;
return true;
}
void formBlockReport() {
// fill remaining slots with blocks that do not exist
- for(int idx = blocks.size()-1; idx >= nrBlocks; idx--)
- blocks.set(idx, new Block(blocks.size() - idx, 0, 0));
- blockReportList = new BlockListAsLongs(blocks).getBlockListAsLongs();
+ for (int idx = blocks.size()-1; idx >= nrBlocks; idx--) {
+ Block block = new Block(blocks.size() - idx, 0, 0);
+ blocks.set(idx, new BlockReportReplica(block));
+ }
+ blockReportList = BlockListAsLongs.EMPTY;
}
- long[] getBlockReportList() {
+ BlockListAsLongs getBlockReportList() {
return blockReportList;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
index fb1418a..ee80b33 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@@ -104,7 +105,7 @@ public class TestDeadDatanode {
// Ensure blockReport from dead datanode is rejected with IOException
StorageBlockReport[] report = { new StorageBlockReport(
new DatanodeStorage(reg.getDatanodeUuid()),
- new long[] { 0L, 0L, 0L }) };
+ BlockListAsLongs.EMPTY) };
try {
dnp.blockReport(reg, poolId, report);
fail("Expected IOException is not thrown");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
index f7dad18..7b9ea93 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.test.GenericTestUtils;
@@ -219,6 +220,7 @@ public class TestFSImage {
.manageDataDfsDirs(false)
.manageNameDfsDirs(false)
.waitSafeMode(false)
+ .startupOption(StartupOption.UPGRADE)
.build();
try {
FileSystem fs = cluster.getFileSystem();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d324164a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/TestOfflineEditsViewer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/TestOfflineEditsViewer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/TestOfflineEditsViewer.java
index 0e605ac..2ad7b60 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/TestOfflineEditsViewer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/TestOfflineEditsViewer.java
@@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.namenode.OfflineEditsViewerHelper;
import org.apache.hadoop.hdfs.tools.offlineEditsViewer.OfflineEditsViewer.Flags;
import org.apache.hadoop.test.PathUtils;
@@ -140,8 +141,8 @@ public class TestOfflineEditsViewer {
assertEquals(0, runOev(editsReparsed, editsParsedXml2, "xml", false));
// judgment time
- assertTrue("Test round trip",
- filesEqualIgnoreTrailingZeros(editsParsedXml, editsParsedXml2));
+ assertTrue("Test round trip", FileUtils.contentEqualsIgnoreEOL(
+ new File(editsParsedXml), new File(editsParsedXml2), "UTF-8"));
os.close();
}
@@ -238,6 +239,10 @@ public class TestOfflineEditsViewer {
ByteBuffer small = ByteBuffer.wrap(DFSTestUtil.loadFile(filenameSmall));
ByteBuffer large = ByteBuffer.wrap(DFSTestUtil.loadFile(filenameLarge));
+ // OEV outputs with the latest layout version, so tweak the old file's
+ // contents to have latest version so checkedin binary files don't
+ // require frequent updates
+ small.put(3, (byte)NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
// now correct if it's otherwise
if (small.capacity() > large.capacity()) {