Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2016/02/02 20:28:55 UTC
[2/2] hadoop git commit: HDFS-9260. Improve the performance and GC
friendliness of NameNode startup and full block reports (Staffan Friberg via
cmccabe)
HDFS-9260. Improve the performance and GC friendliness of NameNode startup and full block reports (Staffan Friberg via cmccabe)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/dd9ebf6e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/dd9ebf6e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/dd9ebf6e
Branch: refs/heads/trunk
Commit: dd9ebf6eedfd4ff8b3486eae2a446de6b0c7fa8a
Parents: 2da03b4
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Tue Feb 2 11:23:00 2016 -0800
Committer: Colin Patrick Mccabe <cm...@cloudera.com>
Committed: Tue Feb 2 11:23:00 2016 -0800
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 12 +
.../DatanodeProtocolClientSideTranslatorPB.java | 5 +-
.../apache/hadoop/hdfs/protocolPB/PBHelper.java | 5 +-
.../hdfs/server/blockmanagement/BlockInfo.java | 192 +--
.../blockmanagement/BlockInfoContiguous.java | 29 +-
.../blockmanagement/BlockInfoStriped.java | 30 +-
.../server/blockmanagement/BlockManager.java | 458 +++++--
.../hdfs/server/blockmanagement/BlocksMap.java | 66 +-
.../blockmanagement/DatanodeStorageInfo.java | 123 +-
.../hdfs/server/datanode/BPServiceActor.java | 4 +-
.../datanode/fsdataset/impl/ReplicaMap.java | 71 +-
.../server/protocol/BlockReportContext.java | 10 +-
.../hdfs/server/protocol/DatanodeProtocol.java | 1 -
.../apache/hadoop/hdfs/util/FoldedTreeSet.java | 1285 ++++++++++++++++++
.../src/main/proto/DatanodeProtocol.proto | 3 +
.../hdfs/protocol/TestBlockListAsLongs.java | 4 +-
.../server/blockmanagement/TestBlockInfo.java | 88 --
.../blockmanagement/TestBlockManager.java | 71 +-
.../server/datanode/SimulatedFSDataset.java | 5 +-
.../TestBlockHasMultipleReplicasOnSameDN.java | 20 +-
.../datanode/TestDataNodeHotSwapVolumes.java | 1 +
.../datanode/TestDataNodeVolumeFailure.java | 4 +-
...TestDnRespectsBlockReportSplitThreshold.java | 4 +-
.../TestNNHandlesBlockReportPerStorage.java | 3 +-
.../TestNNHandlesCombinedBlockReport.java | 2 +-
.../server/datanode/TestTriggerBlockReport.java | 1 +
.../server/namenode/NNThroughputBenchmark.java | 4 +-
.../server/namenode/TestAddStripedBlocks.java | 4 +-
.../hdfs/server/namenode/TestDeadDatanode.java | 2 +-
.../hadoop/hdfs/util/FoldedTreeSetTest.java | 644 +++++++++
31 files changed, 2565 insertions(+), 589 deletions(-)
----------------------------------------------------------------------
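At the heart of the patch, the per-storage block list changes data structure: the old "triplets" scheme threaded every BlockInfo through prev/next references for each storage it lived on, while the new code keeps each storage's blocks in a sorted FoldedTreeSet ordered by block ID. A minimal sketch of the folded-node idea, assuming (as the name and the fillRatio/compact calls later in the diff suggest) a red-black tree whose nodes each hold a small array of elements rather than one element per node; the node size here is illustrative:

    // Illustrative only: "folding" many elements into each tree node
    // trades one linked-entry object per block for a slot in a shared,
    // long-lived array, which is what makes the set GC-friendly.
    final class FoldedNode<E extends Comparable<E>> {
      static final int NODE_CAPACITY = 64; // assumed per-node element count
      final Object[] elements = new Object[NODE_CAPACITY];
      int size;                            // used slots, kept sorted
      FoldedNode<E> left, right, parent;   // ordinary red-black links
      boolean red;
    }

This also explains the new fillRatio()/compact() hooks further down: deletions leave partially empty nodes, and compaction repacks them.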
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 2eac881..38cb3df 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -971,6 +971,9 @@ Release 2.9.0 - UNRELEASED
HDFS-7764. DirectoryScanner shouldn't abort the scan if one directory had
an error (Rakesh R via cmccabe)
+ HDFS-9260. Improve the performance and GC friendliness of NameNode startup
+ and full block reports (Staffan Friberg via cmccabe)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 5217740..76915cb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -219,6 +219,18 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT = 2;
public static final String DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY = "dfs.namenode.replication.max-streams-hard-limit";
public static final int DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT = 4;
+ public static final String DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_INTERVAL_MS_KEY
+ = "dfs.namenode.storageinfo.defragment.interval.ms";
+ public static final int
+ DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_INTERVAL_MS_DEFAULT = 10 * 60 * 1000;
+ public static final String DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_TIMEOUT_MS_KEY
+ = "dfs.namenode.storageinfo.defragment.timeout.ms";
+ public static final int
+ DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_TIMEOUT_MS_DEFAULT = 4;
+ public static final String DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_KEY
+ = "dfs.namenode.storageinfo.defragment.ratio";
+ public static final double
+ DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_DEFAULT = 0.75;
public static final String DFS_WEBHDFS_AUTHENTICATION_FILTER_KEY = "dfs.web.authentication.filter";
/* Phrased as below to avoid javac inlining as a constant, to match the behavior when
this was AuthFilter.class.getName(). Note that if you change the import for AuthFilter, you
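The three keys above control the StorageInfoDefragmenter thread added to BlockManager later in this patch. A minimal sketch of overriding them in code, assuming only the key names and defaults visible in this hunk (the values below are illustrative, not recommendations):

    import org.apache.hadoop.conf.Configuration;

    public class DefragTuningExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Scan storages every 5 minutes instead of the 10-minute default.
        conf.setLong("dfs.namenode.storageinfo.defragment.interval.ms",
            5 * 60 * 1000L);
        // Abort a compaction that holds the write lock longer than 4 ms.
        conf.setLong("dfs.namenode.storageinfo.defragment.timeout.ms", 4);
        // Queue a storage for compaction when its tree is under 75% full.
        conf.setDouble("dfs.namenode.storageinfo.defragment.ratio", 0.75);
      }
    }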
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
index 81c23e1..79113dd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
@@ -174,12 +174,13 @@ public class DatanodeProtocolClientSideTranslatorPB implements
@Override
public DatanodeCommand blockReport(DatanodeRegistration registration,
- String poolId, StorageBlockReport[] reports, BlockReportContext context)
+ String poolId, StorageBlockReport[] reports,
+ BlockReportContext context)
throws IOException {
BlockReportRequestProto.Builder builder = BlockReportRequestProto
.newBuilder().setRegistration(PBHelper.convert(registration))
.setBlockPoolId(poolId);
-
+
boolean useBlocksBuffer = registration.getNamespaceInfo()
.isCapabilitySupported(Capability.STORAGE_BLOCK_REPORT_BUFFERS);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 4b6baf2..e70cdf0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -824,8 +824,8 @@ public class PBHelper {
public static BlockReportContext convert(BlockReportContextProto proto) {
- return new BlockReportContext(proto.getTotalRpcs(),
- proto.getCurRpc(), proto.getId(), proto.getLeaseId());
+ return new BlockReportContext(proto.getTotalRpcs(), proto.getCurRpc(),
+ proto.getId(), proto.getLeaseId(), proto.getSorted());
}
public static BlockReportContextProto convert(BlockReportContext context) {
@@ -834,6 +834,7 @@ public class PBHelper {
setCurRpc(context.getCurRpc()).
setId(context.getReportId()).
setLeaseId(context.getLeaseId()).
+ setSorted(context.isSorted()).
build();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
index e9fa123..5da2140 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
@@ -18,8 +18,9 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.io.IOException;
-import java.util.LinkedList;
+import java.util.Iterator;
import java.util.List;
+import java.util.NoSuchElementException;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -55,19 +56,9 @@ public abstract class BlockInfo extends Block
/** For implementing {@link LightWeightGSet.LinkedElement} interface. */
private LightWeightGSet.LinkedElement nextLinkedElement;
- /**
- * This array contains triplets of references. For each i-th storage, the
- * block belongs to triplets[3*i] is the reference to the
- * {@link DatanodeStorageInfo} and triplets[3*i+1] and triplets[3*i+2] are
- * references to the previous and the next blocks, respectively, in the list
- * of blocks belonging to this storage.
- *
- * Using previous and next in Object triplets is done instead of a
- * {@link LinkedList} list to efficiently use memory. With LinkedList the cost
- * per replica is 42 bytes (LinkedList#Entry object per replica) versus 16
- * bytes using the triplets.
- */
- protected Object[] triplets;
+
+ // Storages this block is replicated on
+ protected DatanodeStorageInfo[] storages;
private BlockUnderConstructionFeature uc;
@@ -77,14 +68,14 @@ public abstract class BlockInfo extends Block
* in the block group
*/
public BlockInfo(short size) {
- this.triplets = new Object[3 * size];
+ this.storages = new DatanodeStorageInfo[size];
this.bcId = INVALID_INODE_ID;
this.replication = isStriped() ? 0 : size;
}
public BlockInfo(Block blk, short size) {
super(blk);
- this.triplets = new Object[3*size];
+ this.storages = new DatanodeStorageInfo[size];
this.bcId = INVALID_INODE_ID;
this.replication = isStriped() ? 0 : size;
}
@@ -109,79 +100,52 @@ public abstract class BlockInfo extends Block
return bcId == INVALID_INODE_ID;
}
- public DatanodeDescriptor getDatanode(int index) {
- DatanodeStorageInfo storage = getStorageInfo(index);
- return storage == null ? null : storage.getDatanodeDescriptor();
- }
+ public Iterator<DatanodeStorageInfo> getStorageInfos() {
+ return new Iterator<DatanodeStorageInfo>() {
- DatanodeStorageInfo getStorageInfo(int index) {
- assert this.triplets != null : "BlockInfo is not initialized";
- assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
- return (DatanodeStorageInfo)triplets[index*3];
- }
+ private int index = 0;
- BlockInfo getPrevious(int index) {
- assert this.triplets != null : "BlockInfo is not initialized";
- assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound";
- BlockInfo info = (BlockInfo)triplets[index*3+1];
- assert info == null ||
- info.getClass().getName().startsWith(BlockInfo.class.getName()) :
- "BlockInfo is expected at " + index*3;
- return info;
- }
+ @Override
+ public boolean hasNext() {
+ while (index < storages.length && storages[index] == null) {
+ index++;
+ }
+ return index < storages.length;
+ }
- BlockInfo getNext(int index) {
- assert this.triplets != null : "BlockInfo is not initialized";
- assert index >= 0 && index*3+2 < triplets.length : "Index is out of bound";
- BlockInfo info = (BlockInfo)triplets[index*3+2];
- assert info == null || info.getClass().getName().startsWith(
- BlockInfo.class.getName()) :
- "BlockInfo is expected at " + index*3;
- return info;
+ @Override
+ public DatanodeStorageInfo next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ return storages[index++];
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Sorry. can't remove.");
+ }
+ };
}
- void setStorageInfo(int index, DatanodeStorageInfo storage) {
- assert this.triplets != null : "BlockInfo is not initialized";
- assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
- triplets[index*3] = storage;
+ public DatanodeDescriptor getDatanode(int index) {
+ DatanodeStorageInfo storage = getStorageInfo(index);
+ return storage == null ? null : storage.getDatanodeDescriptor();
}
- /**
- * Return the previous block on the block list for the datanode at
- * position index. Set the previous block on the list to "to".
- *
- * @param index - the datanode index
- * @param to - block to be set to previous on the list of blocks
- * @return current previous block on the list of blocks
- */
- BlockInfo setPrevious(int index, BlockInfo to) {
- assert this.triplets != null : "BlockInfo is not initialized";
- assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound";
- BlockInfo info = (BlockInfo) triplets[index*3+1];
- triplets[index*3+1] = to;
- return info;
+ DatanodeStorageInfo getStorageInfo(int index) {
+ assert this.storages != null : "BlockInfo is not initialized";
+ return storages[index];
}
- /**
- * Return the next block on the block list for the datanode at
- * position index. Set the next block on the list to "to".
- *
- * @param index - the datanode index
- * @param to - block to be set to next on the list of blocks
- * @return current next block on the list of blocks
- */
- BlockInfo setNext(int index, BlockInfo to) {
- assert this.triplets != null : "BlockInfo is not initialized";
- assert index >= 0 && index*3+2 < triplets.length : "Index is out of bound";
- BlockInfo info = (BlockInfo) triplets[index*3+2];
- triplets[index*3+2] = to;
- return info;
+ void setStorageInfo(int index, DatanodeStorageInfo storage) {
+ assert this.storages != null : "BlockInfo is not initialized";
+ this.storages[index] = storage;
}
public int getCapacity() {
- assert this.triplets != null : "BlockInfo is not initialized";
- assert triplets.length % 3 == 0 : "Malformed BlockInfo";
- return triplets.length / 3;
+ assert this.storages != null : "BlockInfo is not initialized";
+ return storages.length;
}
/**
@@ -240,80 +204,6 @@ public abstract class BlockInfo extends Block
return -1;
}
- /**
- * Insert this block into the head of the list of blocks
- * related to the specified DatanodeStorageInfo.
- * If the head is null then form a new list.
- * @return current block as the new head of the list.
- */
- BlockInfo listInsert(BlockInfo head, DatanodeStorageInfo storage) {
- int dnIndex = this.findStorageInfo(storage);
- assert dnIndex >= 0 : "Data node is not found: current";
- assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
- "Block is already in the list and cannot be inserted.";
- this.setPrevious(dnIndex, null);
- this.setNext(dnIndex, head);
- if (head != null) {
- head.setPrevious(head.findStorageInfo(storage), this);
- }
- return this;
- }
-
- /**
- * Remove this block from the list of blocks
- * related to the specified DatanodeStorageInfo.
- * If this block is the head of the list then return the next block as
- * the new head.
- * @return the new head of the list or null if the list becomes
- * empty after deletion.
- */
- BlockInfo listRemove(BlockInfo head, DatanodeStorageInfo storage) {
- if (head == null) {
- return null;
- }
- int dnIndex = this.findStorageInfo(storage);
- if (dnIndex < 0) { // this block is not on the data-node list
- return head;
- }
-
- BlockInfo next = this.getNext(dnIndex);
- BlockInfo prev = this.getPrevious(dnIndex);
- this.setNext(dnIndex, null);
- this.setPrevious(dnIndex, null);
- if (prev != null) {
- prev.setNext(prev.findStorageInfo(storage), next);
- }
- if (next != null) {
- next.setPrevious(next.findStorageInfo(storage), prev);
- }
- if (this == head) { // removing the head
- head = next;
- }
- return head;
- }
-
- /**
- * Remove this block from the list of blocks related to the specified
- * DatanodeDescriptor. Insert it into the head of the list of blocks.
- *
- * @return the new head of the list.
- */
- public BlockInfo moveBlockToHead(BlockInfo head, DatanodeStorageInfo storage,
- int curIndex, int headIndex) {
- if (head == this) {
- return this;
- }
- BlockInfo next = this.setNext(curIndex, head);
- BlockInfo prev = this.setPrevious(curIndex, null);
-
- head.setPrevious(headIndex, this);
- prev.setNext(prev.findStorageInfo(storage), next);
- if (next != null) {
- next.setPrevious(next.findStorageInfo(storage), prev);
- }
- return this;
- }
-
@Override
public int hashCode() {
// Super implementation is sufficient
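With the triplets gone, a block's replicas are enumerated by scanning the storages array and skipping null slots (nulls occur in striped groups and after removals, as the new iterator above shows). A small usage sketch; countReplicas is a hypothetical helper, not part of the patch:

    import java.util.Iterator;

    // Hypothetical helper built on the new iterator.
    static int countReplicas(BlockInfo block) {
      int n = 0;
      Iterator<DatanodeStorageInfo> it = block.getStorageInfos();
      while (it.hasNext()) {
        it.next(); // guaranteed non-null: the iterator skips empty slots
        n++;
      }
      return n;
    }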
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
index 746e298..f729c4f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
@@ -35,20 +35,20 @@ public class BlockInfoContiguous extends BlockInfo {
}
/**
- * Ensure that there is enough space to include num more triplets.
- * @return first free triplet index.
+ * Ensure that there is enough space to include num more storages.
+ * @return first free storage index.
*/
private int ensureCapacity(int num) {
- assert this.triplets != null : "BlockInfo is not initialized";
+ assert this.storages != null : "BlockInfo is not initialized";
int last = numNodes();
- if (triplets.length >= (last+num)*3) {
+ if (storages.length >= (last+num)) {
return last;
}
/* Not enough space left. Create a new array. Should normally
* happen only when replication is manually increased by the user. */
- Object[] old = triplets;
- triplets = new Object[(last+num)*3];
- System.arraycopy(old, 0, triplets, 0, last * 3);
+ DatanodeStorageInfo[] old = storages;
+ storages = new DatanodeStorageInfo[(last+num)];
+ System.arraycopy(old, 0, storages, 0, last);
return last;
}
@@ -57,8 +57,6 @@ public class BlockInfoContiguous extends BlockInfo {
// find the last null node
int lastNode = ensureCapacity(1);
setStorageInfo(lastNode, storage);
- setNext(lastNode, null);
- setPrevious(lastNode, null);
return true;
}
@@ -68,25 +66,18 @@ public class BlockInfoContiguous extends BlockInfo {
if (dnIndex < 0) { // the node is not found
return false;
}
- assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
- "Block is still in the list and must be removed first.";
// find the last not null node
int lastNode = numNodes()-1;
- // replace current node triplet by the lastNode one
+ // replace current node entry by the lastNode one
setStorageInfo(dnIndex, getStorageInfo(lastNode));
- setNext(dnIndex, getNext(lastNode));
- setPrevious(dnIndex, getPrevious(lastNode));
- // set the last triplet to null
+ // set the last entry to null
setStorageInfo(lastNode, null);
- setNext(lastNode, null);
- setPrevious(lastNode, null);
return true;
}
@Override
public int numNodes() {
- assert this.triplets != null : "BlockInfo is not initialized";
- assert triplets.length % 3 == 0 : "Malformed BlockInfo";
+ assert this.storages != null : "BlockInfo is not initialized";
for (int idx = getCapacity()-1; idx >= 0; idx--) {
if (getDatanode(idx) != null) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
index 20d5858..c6e26ec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
@@ -26,21 +26,20 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
/**
* Subclass of {@link BlockInfo}, presenting a block group in erasure coding.
*
- * We still use triplets to store DatanodeStorageInfo for each block in the
- * block group, as well as the previous/next block in the corresponding
- * DatanodeStorageInfo. For a (m+k) block group, the first (m+k) triplet units
+ * We still use a storage array to store DatanodeStorageInfo for each block in
+ * the block group. For a (m+k) block group, the first (m+k) storage units
* are sorted and strictly mapped to the corresponding block.
*
* Normally each block belonging to the group is stored in only one DataNode.
- * However, it is possible that some block is over-replicated. Thus the triplet
+ * However, it is possible that some block is over-replicated. Thus the storage
* array's size can be larger than (m+k). Thus currently we use an extra byte
- * array to record the block index for each triplet.
+ * array to record the block index for each entry.
*/
@InterfaceAudience.Private
public class BlockInfoStriped extends BlockInfo {
private final ErasureCodingPolicy ecPolicy;
/**
- * Always the same size with triplets. Record the block index for each triplet
+ * Always the same size as the storage array. Records the block index for each entry.
* TODO: actually this is only necessary for over-replicated block. Thus can
* be further optimized to save memory usage.
*/
@@ -104,7 +103,7 @@ public class BlockInfoStriped extends BlockInfo {
return i;
}
}
- // need to expand the triplet size
+ // need to expand the storage size
ensureCapacity(i + 1, true);
return i;
}
@@ -130,8 +129,6 @@ public class BlockInfoStriped extends BlockInfo {
private void addStorage(DatanodeStorageInfo storage, int index,
int blockIndex) {
setStorageInfo(index, storage);
- setNext(index, null);
- setPrevious(index, null);
indices[index] = (byte) blockIndex;
}
@@ -173,26 +170,22 @@ public class BlockInfoStriped extends BlockInfo {
if (dnIndex < 0) { // the node is not found
return false;
}
- assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
- "Block is still in the list and must be removed first.";
- // set the triplet to null
+ // set the entry to null
setStorageInfo(dnIndex, null);
- setNext(dnIndex, null);
- setPrevious(dnIndex, null);
indices[dnIndex] = -1;
return true;
}
private void ensureCapacity(int totalSize, boolean keepOld) {
if (getCapacity() < totalSize) {
- Object[] old = triplets;
+ DatanodeStorageInfo[] old = storages;
byte[] oldIndices = indices;
- triplets = new Object[totalSize * 3];
+ storages = new DatanodeStorageInfo[totalSize];
indices = new byte[totalSize];
initIndices();
if (keepOld) {
- System.arraycopy(old, 0, triplets, 0, old.length);
+ System.arraycopy(old, 0, storages, 0, old.length);
System.arraycopy(oldIndices, 0, indices, 0, oldIndices.length);
}
}
@@ -214,8 +207,7 @@ public class BlockInfoStriped extends BlockInfo {
@Override
public int numNodes() {
- assert this.triplets != null : "BlockInfo is not initialized";
- assert triplets.length % 3 == 0 : "Malformed BlockInfo";
+ assert this.storages != null : "BlockInfo is not initialized";
int num = 0;
for (int idx = getCapacity()-1; idx >= 0; idx--) {
if (getStorageInfo(idx) != null) {
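Because over-replication can make the storage array larger than the (m+k) group size, the indices byte array records which block of the group each slot holds. A hedged sketch of the lookup this mapping enables (a free-standing illustration, not a method of BlockInfoStriped):

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative: collect the storages that hold block #blockIndex
    // of a striped group, given the parallel arrays described above.
    static List<DatanodeStorageInfo> storagesForBlockIndex(
        DatanodeStorageInfo[] storages, byte[] indices, int blockIndex) {
      List<DatanodeStorageInfo> result = new ArrayList<>();
      for (int i = 0; i < storages.length; i++) {
        // indices[i] == -1 marks a cleared slot (see removeStorage above)
        if (storages[i] != null && indices[i] == blockIndex) {
          result.add(storages[i]);
        }
      }
      return result;
    }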
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 587e6b6..25cec8a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
@@ -93,6 +94,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
+import org.apache.hadoop.hdfs.util.FoldedTreeSet;
import org.apache.hadoop.hdfs.util.LightWeightHashSet;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
@@ -106,6 +108,7 @@ import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.LightWeightGSet;
import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.VersionInfo;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -195,7 +198,12 @@ public class BlockManager implements BlockStatsMXBean {
/**replicationRecheckInterval is how often namenode checks for new replication work*/
private final long replicationRecheckInterval;
-
+
+ /** How often to check storageinfo efficiency, and the timeout and ratio limits for defragmentation. */
+ private final long storageInfoDefragmentInterval;
+ private final long storageInfoDefragmentTimeout;
+ private final double storageInfoDefragmentRatio;
+
/**
* Mapping: Block -> { BlockCollection, datanodes, self ref }
* Updated only in response to client-sent information.
@@ -204,6 +212,10 @@ public class BlockManager implements BlockStatsMXBean {
/** Replication thread. */
final Daemon replicationThread = new Daemon(new ReplicationMonitor());
+
+ /** StorageInfoDefragmenter thread. */
+ private final Daemon storageInfoDefragmenterThread =
+ new Daemon(new StorageInfoDefragmenter());
/** Block report thread for handling async reports. */
private final BlockReportProcessingThread blockReportThread =
@@ -376,7 +388,20 @@ public class BlockManager implements BlockStatsMXBean {
this.replicationRecheckInterval =
conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L;
-
+
+ this.storageInfoDefragmentInterval =
+ conf.getLong(
+ DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_INTERVAL_MS_KEY,
+ DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_INTERVAL_MS_DEFAULT);
+ this.storageInfoDefragmentTimeout =
+ conf.getLong(
+ DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_TIMEOUT_MS_KEY,
+ DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_TIMEOUT_MS_DEFAULT);
+ this.storageInfoDefragmentRatio =
+ conf.getDouble(
+ DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_KEY,
+ DFSConfigKeys.DFS_NAMENODE_STORAGEINFO_DEFRAGMENT_RATIO_DEFAULT);
+
this.encryptDataTransfer =
conf.getBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY,
DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
@@ -508,6 +533,8 @@ public class BlockManager implements BlockStatsMXBean {
datanodeManager.activate(conf);
this.replicationThread.setName("ReplicationMonitor");
this.replicationThread.start();
+ storageInfoDefragmenterThread.setName("StorageInfoMonitor");
+ storageInfoDefragmenterThread.start();
this.blockReportThread.start();
mxBeanName = MBeans.register("NameNode", "BlockStats", this);
bmSafeMode.activate(blockTotal);
@@ -517,8 +544,10 @@ public class BlockManager implements BlockStatsMXBean {
bmSafeMode.close();
try {
replicationThread.interrupt();
+ storageInfoDefragmenterThread.interrupt();
blockReportThread.interrupt();
replicationThread.join(3000);
+ storageInfoDefragmenterThread.join(3000);
blockReportThread.join(3000);
} catch (InterruptedException ie) {
}
@@ -1165,9 +1194,15 @@ public class BlockManager implements BlockStatsMXBean {
/** Remove the blocks associated to the given datanode. */
void removeBlocksAssociatedTo(final DatanodeDescriptor node) {
- final Iterator<BlockInfo> it = node.getBlockIterator();
- while(it.hasNext()) {
- removeStoredBlock(it.next(), node);
+ for (DatanodeStorageInfo storage : node.getStorageInfos()) {
+ final Iterator<BlockInfo> it = storage.getBlockIterator();
+ while (it.hasNext()) {
+ BlockInfo block = it.next();
+ // DatanodeStorageInfo must be removed using the iterator to avoid
+ // ConcurrentModificationException in the underlying storage
+ it.remove();
+ removeStoredBlock(block, node);
+ }
}
// Remove all pending DN messages referencing this DN.
pendingDNMessages.removeAllMessagesForDatanode(node);
@@ -1183,6 +1218,9 @@ public class BlockManager implements BlockStatsMXBean {
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
while(it.hasNext()) {
BlockInfo block = it.next();
+ // DatanodeStorageInfo must be removed using the iterator to avoid
+ // ConcurrentModificationException in the underlying storage
+ it.remove();
removeStoredBlock(block, node);
final Block b = getBlockOnStorage(block, storageInfo);
if (b != null) {
@@ -2033,8 +2071,8 @@ public class BlockManager implements BlockStatsMXBean {
*/
public boolean processReport(final DatanodeID nodeID,
final DatanodeStorage storage,
- final BlockListAsLongs newReport, BlockReportContext context,
- boolean lastStorageInRpc) throws IOException {
+ final BlockListAsLongs newReport,
+ BlockReportContext context, boolean lastStorageInRpc) throws IOException {
namesystem.writeLock();
final long startTime = Time.monotonicNow(); //after acquiring write lock
final long endTime;
@@ -2079,7 +2117,8 @@ public class BlockManager implements BlockStatsMXBean {
nodeID.getDatanodeUuid());
processFirstBlockReport(storageInfo, newReport);
} else {
- invalidatedBlocks = processReport(storageInfo, newReport);
+ invalidatedBlocks = processReport(storageInfo, newReport,
+ context != null ? context.isSorted() : false);
}
storageInfo.receivedBlockReport();
@@ -2149,6 +2188,9 @@ public class BlockManager implements BlockStatsMXBean {
// TODO: remove this assumption in case we want to put a block on
// more than one storage on a datanode (and because it's a difficult
// assumption to really enforce)
+ // DatanodeStorageInfo must be removed using the iterator to avoid
+ // ConcurrentModificationException in the underlying storage
+ iter.remove();
removeStoredBlock(block, zombie.getDatanodeDescriptor());
Block b = getBlockOnStorage(block, zombie);
if (b != null) {
@@ -2238,7 +2280,7 @@ public class BlockManager implements BlockStatsMXBean {
private Collection<Block> processReport(
final DatanodeStorageInfo storageInfo,
- final BlockListAsLongs report) throws IOException {
+ final BlockListAsLongs report, final boolean sorted) throws IOException {
// Normal case:
// Modify the (block-->datanode) map, according to the difference
// between the old and new block report.
@@ -2248,9 +2290,29 @@ public class BlockManager implements BlockStatsMXBean {
Collection<Block> toInvalidate = new LinkedList<>();
Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<>();
Collection<StatefulBlockInfo> toUC = new LinkedList<>();
- reportDiff(storageInfo, report,
- toAdd, toRemove, toInvalidate, toCorrupt, toUC);
-
+
+ Iterable<BlockReportReplica> sortedReport;
+ if (!sorted) {
+ blockLog.warn("BLOCK* processReport: Report from the DataNode ({}) is "
+ + "unsorted. This will cause overhead on the NameNode "
+ + "which needs to sort the Full BR. Please update the "
+ + "DataNode to the same version of Hadoop HDFS as the "
+ + "NameNode ({}).",
+ storageInfo.getDatanodeDescriptor().getDatanodeUuid(),
+ VersionInfo.getVersion());
+ Set<BlockReportReplica> set = new FoldedTreeSet<>();
+ for (BlockReportReplica iblk : report) {
+ set.add(new BlockReportReplica(iblk));
+ }
+ sortedReport = set;
+ } else {
+ sortedReport = report;
+ }
+
+ reportDiffSorted(storageInfo, sortedReport,
+ toAdd, toRemove, toInvalidate, toCorrupt, toUC);
+
+
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
// Process the blocks on each queue
for (StatefulBlockInfo b : toUC) {
@@ -2399,126 +2461,111 @@ public class BlockManager implements BlockStatsMXBean {
}
}
- private void reportDiff(DatanodeStorageInfo storageInfo,
- BlockListAsLongs newReport,
+ private void reportDiffSorted(DatanodeStorageInfo storageInfo,
+ Iterable<BlockReportReplica> newReport,
Collection<BlockInfoToAdd> toAdd, // add to DatanodeDescriptor
Collection<BlockInfo> toRemove, // remove from DatanodeDescriptor
Collection<Block> toInvalidate, // should be removed from DN
Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
Collection<StatefulBlockInfo> toUC) { // add to under-construction list
- // place a delimiter in the list which separates blocks
- // that have been reported from those that have not
- Block delimiterBlock = new Block();
- BlockInfo delimiter = new BlockInfoContiguous(delimiterBlock,
- (short) 1);
- AddBlockResult result = storageInfo.addBlock(delimiter, delimiterBlock);
- assert result == AddBlockResult.ADDED
- : "Delimiting block cannot be present in the node";
- int headIndex = 0; //currently the delimiter is in the head of the list
- int curIndex;
-
- if (newReport == null) {
- newReport = BlockListAsLongs.EMPTY;
- }
- // scan the report and process newly reported blocks
- for (BlockReportReplica iblk : newReport) {
- ReplicaState iState = iblk.getState();
- BlockInfo storedBlock = processReportedBlock(storageInfo,
- iblk, iState, toAdd, toInvalidate, toCorrupt, toUC);
-
- // move block to the head of the list
- if (storedBlock != null &&
- (curIndex = storedBlock.findStorageInfo(storageInfo)) >= 0) {
- headIndex = storageInfo.moveBlockToHead(storedBlock, curIndex, headIndex);
+ // Both the reported blocks and the storage's blocks must be sorted
+ Iterator<BlockInfo> storageBlocksIterator = storageInfo.getBlockIterator();
+ DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor();
+ BlockInfo storageBlock = null;
+
+ for (BlockReportReplica replica : newReport) {
+
+ long replicaID = replica.getBlockId();
+ if (BlockIdManager.isStripedBlockID(replicaID)
+ && (!hasNonEcBlockUsingStripedID ||
+ !blocksMap.containsBlock(replica))) {
+ replicaID = BlockIdManager.convertToStripedID(replicaID);
+ }
+
+ ReplicaState reportedState = replica.getState();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Reported block " + replica
+ + " on " + dn + " size " + replica.getNumBytes()
+ + " replicaState = " + reportedState);
+ }
+
+ if (shouldPostponeBlocksFromFuture
+ && isGenStampInFuture(replica)) {
+ queueReportedBlock(storageInfo, replica, reportedState,
+ QUEUE_REASON_FUTURE_GENSTAMP);
+ continue;
+ }
+
+ if (storageBlock == null && storageBlocksIterator.hasNext()) {
+ storageBlock = storageBlocksIterator.next();
}
+
+ do {
+ int cmp;
+ if (storageBlock == null ||
+ (cmp = Long.compare(replicaID, storageBlock.getBlockId())) < 0) {
+ // Check if block is available in NN but not yet on this storage
+ BlockInfo nnBlock = blocksMap.getStoredBlock(new Block(replicaID));
+ if (nnBlock != null) {
+ reportDiffSortedInner(storageInfo, replica, reportedState,
+ nnBlock, toAdd, toCorrupt, toUC);
+ } else {
+ // Replica not found anywhere so it should be invalidated
+ toInvalidate.add(new Block(replica));
+ }
+ break;
+ } else if (cmp == 0) {
+ // Replica matched current storage block
+ reportDiffSortedInner(storageInfo, replica, reportedState,
+ storageBlock, toAdd, toCorrupt, toUC);
+ storageBlock = null;
+ } else {
+ // replica has higher ID than storedBlock
+ // Remove all stored blocks with IDs lower than replica
+ do {
+ toRemove.add(storageBlock);
+ storageBlock = storageBlocksIterator.hasNext()
+ ? storageBlocksIterator.next() : null;
+ } while (storageBlock != null &&
+ Long.compare(replicaID, storageBlock.getBlockId()) > 0);
+ }
+ } while (storageBlock != null);
}
- // collect blocks that have not been reported
- // all of them are next to the delimiter
- Iterator<BlockInfo> it =
- storageInfo.new BlockIterator(delimiter.getNext(0));
- while (it.hasNext()) {
- toRemove.add(it.next());
+ // Iterate any remaining blocks that have not been reported and remove them
+ while (storageBlocksIterator.hasNext()) {
+ toRemove.add(storageBlocksIterator.next());
}
- storageInfo.removeBlock(delimiter);
}
- /**
- * Process a block replica reported by the data-node.
- * No side effects except adding to the passed-in Collections.
- *
- * <ol>
- * <li>If the block is not known to the system (not in blocksMap) then the
- * data-node should be notified to invalidate this block.</li>
- * <li>If the reported replica is valid that is has the same generation stamp
- * and length as recorded on the name-node, then the replica location should
- * be added to the name-node.</li>
- * <li>If the reported replica is not valid, then it is marked as corrupt,
- * which triggers replication of the existing valid replicas.
- * Corrupt replicas are removed from the system when the block
- * is fully replicated.</li>
- * <li>If the reported replica is for a block currently marked "under
- * construction" in the NN, then it should be added to the
- * BlockUnderConstructionFeature's list of replicas.</li>
- * </ol>
- *
- * @param storageInfo DatanodeStorageInfo that sent the report.
- * @param block reported block replica
- * @param reportedState reported replica state
- * @param toAdd add to DatanodeDescriptor
- * @param toInvalidate missing blocks (not in the blocks map)
- * should be removed from the data-node
- * @param toCorrupt replicas with unexpected length or generation stamp;
- * add to corrupt replicas
- * @param toUC replicas of blocks currently under construction
- * @return the up-to-date stored block, if it should be kept.
- * Otherwise, null.
- */
- private BlockInfo processReportedBlock(
+ private void reportDiffSortedInner(
final DatanodeStorageInfo storageInfo,
- final Block block, final ReplicaState reportedState,
+ final BlockReportReplica replica, final ReplicaState reportedState,
+ final BlockInfo storedBlock,
final Collection<BlockInfoToAdd> toAdd,
- final Collection<Block> toInvalidate,
final Collection<BlockToMarkCorrupt> toCorrupt,
final Collection<StatefulBlockInfo> toUC) {
-
- DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor();
- if(LOG.isDebugEnabled()) {
- LOG.debug("Reported block " + block
- + " on " + dn + " size " + block.getNumBytes()
- + " replicaState = " + reportedState);
- }
-
- if (shouldPostponeBlocksFromFuture && isGenStampInFuture(block)) {
- queueReportedBlock(storageInfo, block, reportedState,
- QUEUE_REASON_FUTURE_GENSTAMP);
- return null;
- }
-
- // find block by blockId
- BlockInfo storedBlock = getStoredBlock(block);
- if(storedBlock == null) {
- // If blocksMap does not contain reported block id,
- // the replica should be removed from the data-node.
- toInvalidate.add(new Block(block));
- return null;
- }
+ assert replica != null;
+ assert storedBlock != null;
+
+ DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor();
BlockUCState ucState = storedBlock.getBlockUCState();
-
+
// Block is on the NN
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("In memory blockUCState = " + ucState);
}
// Ignore replicas already scheduled to be removed from the DN
- if(invalidateBlocks.contains(dn, block)) {
- return storedBlock;
+ if (invalidateBlocks.contains(dn, replica)) {
+ return;
}
- BlockToMarkCorrupt c = checkReplicaCorrupt(
- block, reportedState, storedBlock, ucState, dn);
+ BlockToMarkCorrupt c = checkReplicaCorrupt(replica, reportedState,
+ storedBlock, ucState, dn);
if (c != null) {
if (shouldPostponeBlocksFromFuture) {
// If the block is an out-of-date generation stamp or state,
@@ -2532,23 +2579,16 @@ public class BlockManager implements BlockStatsMXBean {
} else {
toCorrupt.add(c);
}
- return storedBlock;
- }
-
- if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
- toUC.add(new StatefulBlockInfo(storedBlock,
- new Block(block), reportedState));
- return storedBlock;
- }
-
- // Add replica if appropriate. If the replica was previously corrupt
- // but now okay, it might need to be updated.
- if (reportedState == ReplicaState.FINALIZED
- && (storedBlock.findStorageInfo(storageInfo) == -1 ||
- corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
- toAdd.add(new BlockInfoToAdd(storedBlock, block));
+ } else if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
+ toUC.add(new StatefulBlockInfo(storedBlock, new Block(replica),
+ reportedState));
+ } else if (reportedState == ReplicaState.FINALIZED &&
+ (storedBlock.findStorageInfo(storageInfo) == -1 ||
+ corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
+ // Add replica if appropriate. If the replica was previously corrupt
+ // but now okay, it might need to be updated.
+ toAdd.add(new BlockInfoToAdd(storedBlock, replica));
}
- return storedBlock;
}
/**
@@ -2774,7 +2814,7 @@ public class BlockManager implements BlockStatsMXBean {
}
// just add it
- AddBlockResult result = storageInfo.addBlock(storedBlock, reported);
+ AddBlockResult result = storageInfo.addBlockInitial(storedBlock, reported);
// Now check for completion of blocks and safe block count
int numCurrentReplica = countLiveNodes(storedBlock);
@@ -3497,40 +3537,75 @@ public class BlockManager implements BlockStatsMXBean {
DatanodeStorageInfo storageInfo, Block block,
ReplicaState reportedState, DatanodeDescriptor delHintNode)
throws IOException {
- // blockReceived reports a finalized block
- Collection<BlockInfoToAdd> toAdd = new LinkedList<>();
- Collection<Block> toInvalidate = new LinkedList<Block>();
- Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
- Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
+
final DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
- processReportedBlock(storageInfo, block, reportedState, toAdd, toInvalidate,
- toCorrupt, toUC);
- // the block is only in one of the to-do lists
- // if it is in none then data-node already has it
- assert toUC.size() + toAdd.size() + toInvalidate.size() + toCorrupt.size() <= 1
- : "The block should be only in one of the lists.";
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Reported block " + block
+ + " on " + node + " size " + block.getNumBytes()
+ + " replicaState = " + reportedState);
+ }
- for (StatefulBlockInfo b : toUC) {
- addStoredBlockUnderConstruction(b, storageInfo);
+ if (shouldPostponeBlocksFromFuture &&
+ isGenStampInFuture(block)) {
+ queueReportedBlock(storageInfo, block, reportedState,
+ QUEUE_REASON_FUTURE_GENSTAMP);
+ return;
}
- long numBlocksLogged = 0;
- for (BlockInfoToAdd b : toAdd) {
- addStoredBlock(b.stored, b.reported, storageInfo, delHintNode,
- numBlocksLogged < maxNumBlocksToLog);
- numBlocksLogged++;
+
+ // find block by blockId
+ BlockInfo storedBlock = getStoredBlock(block);
+ if(storedBlock == null) {
+ // If blocksMap does not contain reported block id,
+ // the replica should be removed from the data-node.
+ blockLog.debug("BLOCK* addBlock: block {} on node {} size {} does not " +
+ "belong to any file", block, node, block.getNumBytes());
+ addToInvalidates(new Block(block), node);
+ return;
}
- if (numBlocksLogged > maxNumBlocksToLog) {
- blockLog.debug("BLOCK* addBlock: logged info for {} of {} reported.",
- maxNumBlocksToLog, numBlocksLogged);
+
+ BlockUCState ucState = storedBlock.getBlockUCState();
+ // Block is on the NN
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("In memory blockUCState = " + ucState);
}
- for (Block b : toInvalidate) {
- blockLog.debug("BLOCK* addBlock: block {} on node {} size {} does not " +
- "belong to any file", b, node, b.getNumBytes());
- addToInvalidates(b, node);
+
+ // Ignore replicas already scheduled to be removed from the DN
+ if(invalidateBlocks.contains(node, block)) {
+ return;
}
- for (BlockToMarkCorrupt b : toCorrupt) {
- markBlockAsCorrupt(b, storageInfo, node);
+
+ BlockToMarkCorrupt c = checkReplicaCorrupt(
+ block, reportedState, storedBlock, ucState, node);
+ if (c != null) {
+ if (shouldPostponeBlocksFromFuture) {
+ // If the block is an out-of-date generation stamp or state,
+ // but we're the standby, we shouldn't treat it as corrupt,
+ // but instead just queue it for later processing.
+ // TODO: Pretty confident this should be s/storedBlock/block below,
+ // since we should be postponing the info of the reported block, not
+ // the stored block. See HDFS-6289 for more context.
+ queueReportedBlock(storageInfo, storedBlock, reportedState,
+ QUEUE_REASON_CORRUPT_STATE);
+ } else {
+ markBlockAsCorrupt(c, storageInfo, node);
+ }
+ return;
+ }
+
+ if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
+ addStoredBlockUnderConstruction(
+ new StatefulBlockInfo(storedBlock, new Block(block), reportedState),
+ storageInfo);
+ return;
+ }
+
+ // Add replica if appropriate. If the replica was previously corrupt
+ // but now okay, it might need to be updated.
+ if (reportedState == ReplicaState.FINALIZED
+ && (storedBlock.findStorageInfo(storageInfo) == -1 ||
+ corruptReplicas.isReplicaCorrupt(storedBlock, node))) {
+ addStoredBlock(storedBlock, block, storageInfo, delHintNode, true);
}
}
@@ -4060,6 +4135,87 @@ public class BlockManager implements BlockStatsMXBean {
}
}
+ /**
+ * Runnable that monitors the fragmentation of the StorageInfo TreeSet and
+ * compacts it when it falls under a certain threshold.
+ */
+ private class StorageInfoDefragmenter implements Runnable {
+
+ @Override
+ public void run() {
+ while (namesystem.isRunning()) {
+ try {
+ // Check storage efficiency only when active NN is out of safe mode.
+ if (isPopulatingReplQueues()) {
+ scanAndCompactStorages();
+ }
+ Thread.sleep(storageInfoDefragmentInterval);
+ } catch (Throwable t) {
+ if (!namesystem.isRunning()) {
+ LOG.info("Stopping thread.");
+ if (!(t instanceof InterruptedException)) {
+ LOG.info("Received an exception while shutting down.", t);
+ }
+ break;
+ } else if (!checkNSRunning && t instanceof InterruptedException) {
+ LOG.info("Stopping for testing.");
+ break;
+ }
+ LOG.error("Thread received Runtime exception.", t);
+ terminate(1, t);
+ }
+ }
+ }
+
+ private void scanAndCompactStorages() throws InterruptedException {
+ ArrayList<String> datanodesAndStorages = new ArrayList<>();
+ for (DatanodeDescriptor node
+ : datanodeManager.getDatanodeListForReport(DatanodeReportType.ALL)) {
+ for (DatanodeStorageInfo storage : node.getStorageInfos()) {
+ try {
+ namesystem.readLock();
+ double ratio = storage.treeSetFillRatio();
+ if (ratio < storageInfoDefragmentRatio) {
+ datanodesAndStorages.add(node.getDatanodeUuid());
+ datanodesAndStorages.add(storage.getStorageID());
+ }
+ LOG.info("StorageInfo TreeSet fill ratio {} : {}{}",
+ storage.getStorageID(), ratio,
+ (ratio < storageInfoDefragmentRatio)
+ ? " (queued for defragmentation)" : "");
+ } finally {
+ namesystem.readUnlock();
+ }
+ }
+ }
+ if (!datanodesAndStorages.isEmpty()) {
+ for (int i = 0; i < datanodesAndStorages.size(); i += 2) {
+ namesystem.writeLock();
+ try {
+ DatanodeStorageInfo storage = datanodeManager.
+ getDatanode(datanodesAndStorages.get(i)).
+ getStorageInfo(datanodesAndStorages.get(i + 1));
+ if (storage != null) {
+ boolean aborted =
+ !storage.treeSetCompact(storageInfoDefragmentTimeout);
+ if (aborted) {
+ // Compaction timed out, reset iterator to continue with
+ // the same storage next iteration.
+ i -= 2;
+ }
+ LOG.info("StorageInfo TreeSet defragmented {} : {}{}",
+ storage.getStorageID(), storage.treeSetFillRatio(),
+ aborted ? " (aborted)" : "");
+ }
+ } finally {
+ namesystem.writeUnlock();
+ }
+ // Wait between each iteration
+ Thread.sleep(1000);
+ }
+ }
+ }
+ }
/**
* Compute block replication and block invalidation work that can be scheduled
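reportDiffSorted above is the payoff of keeping both sides sorted: one merge pass over the reported replicas and the storage's stored blocks, comparing block IDs, instead of the old delimiter-and-move-to-head scan. A minimal sketch of that merge on bare IDs, assuming both iterators ascend by ID; the real method additionally consults the global blocksMap before invalidating, and classifies matches as corrupt, under-construction, or to-add:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    final class SortedReportDiff {
      final List<Long> matched = new ArrayList<>();      // on DN and NN
      final List<Long> toInvalidate = new ArrayList<>(); // on DN only
      final List<Long> toRemove = new ArrayList<>();     // on NN only

      void diff(Iterator<Long> reported, Iterator<Long> stored) {
        Long storedId = stored.hasNext() ? stored.next() : null;
        while (reported.hasNext()) {
          long replicaId = reported.next();
          // Drop stored blocks the DataNode no longer reports.
          while (storedId != null && storedId < replicaId) {
            toRemove.add(storedId);
            storedId = stored.hasNext() ? stored.next() : null;
          }
          if (storedId != null && storedId == replicaId) {
            matched.add(replicaId);
            storedId = stored.hasNext() ? stored.next() : null;
          } else {
            toInvalidate.add(replicaId); // unknown to this storage
          }
        }
        while (storedId != null) {       // unreported leftovers
          toRemove.add(storedId);
          storedId = stored.hasNext() ? stored.next() : null;
        }
      }
    }

One pass, O(reported + stored), with no per-block head-of-list bookkeeping.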
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
index 47a21fe..71d0598 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
+import java.util.Collections;
import java.util.Iterator;
import org.apache.hadoop.hdfs.protocol.Block;
@@ -30,37 +31,6 @@ import org.apache.hadoop.util.LightWeightGSet;
* the datanodes that store the block.
*/
class BlocksMap {
- private static class StorageIterator implements Iterator<DatanodeStorageInfo> {
- private final BlockInfo blockInfo;
- private int nextIdx = 0;
-
- StorageIterator(BlockInfo blkInfo) {
- this.blockInfo = blkInfo;
- }
-
- @Override
- public boolean hasNext() {
- if (blockInfo == null) {
- return false;
- }
- while (nextIdx < blockInfo.getCapacity() &&
- blockInfo.getDatanode(nextIdx) == null) {
- // note that for striped blocks there may be null in the triplets
- nextIdx++;
- }
- return nextIdx < blockInfo.getCapacity();
- }
-
- @Override
- public DatanodeStorageInfo next() {
- return blockInfo.getStorageInfo(nextIdx++);
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException("Sorry. can't remove.");
- }
- }
/** Constant {@link LightWeightGSet} capacity. */
private final int capacity;
@@ -132,6 +102,16 @@ class BlocksMap {
}
}
+ /**
+ * Check if BlocksMap contains the block.
+ *
+ * @param b Block to check
+ * @return true if block is in the map, otherwise false
+ */
+ boolean containsBlock(Block b) {
+ return blocks.contains(b);
+ }
+
/** Returns the block object if it exists in the map. */
BlockInfo getStoredBlock(Block b) {
return blocks.get(b);
@@ -142,7 +122,9 @@ class BlocksMap {
* returns {@link Iterable} of the storages the block belongs to.
*/
Iterable<DatanodeStorageInfo> getStorages(Block b) {
- return getStorages(blocks.get(b));
+ BlockInfo block = blocks.get(b);
+ return block != null ? getStorages(block)
+ : Collections.<DatanodeStorageInfo>emptyList();
}
/**
@@ -150,12 +132,16 @@ class BlocksMap {
* returns {@link Iterable} of the storages the block belongs to.
*/
Iterable<DatanodeStorageInfo> getStorages(final BlockInfo storedBlock) {
- return new Iterable<DatanodeStorageInfo>() {
- @Override
- public Iterator<DatanodeStorageInfo> iterator() {
- return new StorageIterator(storedBlock);
- }
- };
+ if (storedBlock == null) {
+ return Collections.emptyList();
+ } else {
+ return new Iterable<DatanodeStorageInfo>() {
+ @Override
+ public Iterator<DatanodeStorageInfo> iterator() {
+ return storedBlock.getStorageInfos();
+ }
+ };
+ }
}
/** counts number of containing nodes. Better than using iterator. */
@@ -174,7 +160,7 @@ class BlocksMap {
if (info == null)
return false;
- // remove block from the data-node list and the node from the block info
+ // remove block from the data-node set and the node from the block info
boolean removed = removeBlock(node, info);
if (info.hasNoStorage() // no datanodes left
@@ -185,7 +171,7 @@ class BlocksMap {
}
/**
- * Remove block from the list of blocks belonging to the data-node. Remove
+ * Remove block from the set of blocks belonging to the data-node. Remove
* data-node from the block.
*/
static boolean removeBlock(DatanodeDescriptor dn, BlockInfo b) {
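getStorages now degrades to an empty Iterable when the block is unknown, instead of handing out an iterator over a null BlockInfo, so callers can use a for-each without a null check. A one-line illustrative caller (blocksMap, someBlock, and process are placeholders):

    // Safe even when someBlock is not in the map: the loop simply runs
    // zero times instead of risking a NullPointerException.
    for (DatanodeStorageInfo storage : blocksMap.getStorages(someBlock)) {
      process(storage); // hypothetical per-replica work
    }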
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index 1f1b24b..c4729ea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.hdfs.util.FoldedTreeSet;
import com.google.common.annotations.VisibleForTesting;
@@ -85,31 +86,6 @@ public class DatanodeStorageInfo {
storageType = storage.getStorageType();
}
- /**
- * Iterates over the list of blocks belonging to the data-node.
- */
- class BlockIterator implements Iterator<BlockInfo> {
- private BlockInfo current;
-
- BlockIterator(BlockInfo head) {
- this.current = head;
- }
-
- public boolean hasNext() {
- return current != null;
- }
-
- public BlockInfo next() {
- BlockInfo res = current;
- current = current.getNext(current.findStorageInfo(DatanodeStorageInfo.this));
- return res;
- }
-
- public void remove() {
- throw new UnsupportedOperationException("Sorry. can't remove.");
- }
- }
-
private final DatanodeDescriptor dn;
private final String storageID;
private StorageType storageType;
@@ -120,8 +96,7 @@ public class DatanodeStorageInfo {
private volatile long remaining;
private long blockPoolUsed;
- private volatile BlockInfo blockList = null;
- private int numBlocks = 0;
+ private final FoldedTreeSet<BlockInfo> blocks = new FoldedTreeSet<>();
// The ID of the last full block report which updated this storage.
private long lastBlockReportId = 0;
@@ -207,7 +182,7 @@ public class DatanodeStorageInfo {
}
boolean areBlocksOnFailedStorage() {
- return getState() == State.FAILED && numBlocks != 0;
+ return getState() == State.FAILED && !blocks.isEmpty();
}
@VisibleForTesting
@@ -234,6 +209,36 @@ public class DatanodeStorageInfo {
long getBlockPoolUsed() {
return blockPoolUsed;
}
+ /**
+ * For use during startup. Expects blocks to be added in sorted order
+ * to enable fast insert into the DatanodeStorageInfo.
+ *
+ * @param b Block to add to DatanodeStorageInfo
+ * @param reportedBlock The reported replica
+ * @return Enum describing if block was added, replaced or already existed
+ */
+ public AddBlockResult addBlockInitial(BlockInfo b, Block reportedBlock) {
+ // First check whether the block belongs to a different storage
+ // on the same DN.
+ AddBlockResult result = AddBlockResult.ADDED;
+ DatanodeStorageInfo otherStorage =
+ b.findStorageInfo(getDatanodeDescriptor());
+
+ if (otherStorage != null) {
+ if (otherStorage != this) {
+ // The block belongs to a different storage. Remove it first.
+ otherStorage.removeBlock(b);
+ result = AddBlockResult.REPLACED;
+ } else {
+ // The block is already associated with this storage.
+ return AddBlockResult.ALREADY_EXIST;
+ }
+ }
+
+ b.addStorage(this, reportedBlock);
+ blocks.addSortedLast(b);
+ return result;
+ }
public AddBlockResult addBlock(BlockInfo b, Block reportedBlock) {
// First check whether the block belongs to a different storage
@@ -253,9 +258,8 @@ public class DatanodeStorageInfo {
}
}
- // add to the head of the data-node list
b.addStorage(this, reportedBlock);
- insertToList(b);
+ blocks.add(b);
return result;
}
@@ -263,45 +267,17 @@ public class DatanodeStorageInfo {
return addBlock(b, b);
}
- public void insertToList(BlockInfo b) {
- blockList = b.listInsert(blockList, this);
- numBlocks++;
- }
-
- public boolean removeBlock(BlockInfo b) {
- blockList = b.listRemove(blockList, this);
- if (b.removeStorage(this)) {
- numBlocks--;
- return true;
- } else {
- return false;
- }
+ boolean removeBlock(BlockInfo b) {
+ blocks.remove(b);
+ return b.removeStorage(this);
}
int numBlocks() {
- return numBlocks;
+ return blocks.size();
}
Iterator<BlockInfo> getBlockIterator() {
- return new BlockIterator(blockList);
- }
-
- /**
- * Move block to the head of the list of blocks belonging to the data-node.
- * @return the index of the head of the blockList
- */
- int moveBlockToHead(BlockInfo b, int curIndex, int headIndex) {
- blockList = b.moveBlockToHead(blockList, this, curIndex, headIndex);
- return curIndex;
- }
-
- /**
- * Used for testing only
- * @return the head of the blockList
- */
- @VisibleForTesting
- BlockInfo getBlockListHeadForTesting(){
- return blockList;
+ return blocks.iterator();
}
void updateState(StorageReport r) {
@@ -349,6 +325,27 @@ public class DatanodeStorageInfo {
false, capacity, dfsUsed, remaining, blockPoolUsed);
}
+ /**
+ * Get the fill ratio of the underlying FoldedTreeSet holding blocks.
+ *
+ * @return the fill ratio of the tree
+ */
+ public double treeSetFillRatio() {
+ return blocks.fillRatio();
+ }
+
+ /**
+ * Compact the underlying FoldedTreeSet holding blocks.
+ *
+ * @param timeout Maximum time to spend compacting the tree set in
+ * milliseconds.
+ *
+ * @return true if compaction completed, false if aborted
+ */
+ public boolean treeSetCompact(long timeout) {
+ return blocks.compact(timeout);
+ }
+
static Iterable<StorageType> toStorageTypes(
final Iterable<DatanodeStorageInfo> infos) {
return new Iterable<StorageType>() {
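For context on the two compaction hooks added above: a FoldedTreeSet packs
multiple elements into each red-black tree node (hence "folded"), so after
heavy add/remove churn the node arrays can become sparsely filled and waste
memory. A caller is expected to poll the fill ratio and compact under a time
budget. The sketch below illustrates that calling pattern; only
treeSetFillRatio() and treeSetCompact(long) come from this patch, while the
0.6 threshold, the 10 ms budget, and the compactStorages() wrapper are
illustrative assumptions.

    // Hypothetical compaction driver; threshold and budget are
    // illustrative, not taken from this patch.
    void compactStorages(Iterable<DatanodeStorageInfo> storages) {
      for (DatanodeStorageInfo storage : storages) {
        // Only spend time on trees whose nodes have become sparse.
        if (storage.treeSetFillRatio() < 0.6) {
          // Bounded compaction: treeSetCompact returns false if the
          // 10 ms budget expired before the whole tree was compacted.
          if (!storage.treeSetCompact(10)) {
            // Budget ran out; a real caller would retry later.
          }
        }
      }
    }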
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 1b72961..bc4f2d8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -461,7 +461,7 @@ class BPServiceActor implements Runnable {
// Below split threshold, send all reports in a single message.
DatanodeCommand cmd = bpNamenode.blockReport(
bpRegistration, bpos.getBlockPoolId(), reports,
- new BlockReportContext(1, 0, reportId, fullBrLeaseId));
+ new BlockReportContext(1, 0, reportId, fullBrLeaseId, true));
numRPCs = 1;
numReportsSent = reports.length;
if (cmd != null) {
@@ -474,7 +474,7 @@ class BPServiceActor implements Runnable {
DatanodeCommand cmd = bpNamenode.blockReport(
bpRegistration, bpos.getBlockPoolId(), singleReport,
new BlockReportContext(reports.length, r, reportId,
- fullBrLeaseId));
+ fullBrLeaseId, true));
numReportsSent++;
numRPCs++;
if (cmd != null) {
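The new trailing argument declares that the datanode has sent each storage's
blocks sorted by block ID, which is what lets the NameNode take the
addBlockInitial()/addSortedLast() fast path shown earlier instead of doing a
full tree search per block. Below is a rough sketch of how a receiving side
can branch on the flag; isSorted(), addBlockInitial() and addBlock() are from
this patch, while the processReport() wrapper and the reported list are
hypothetical stand-ins.

    // Sketch: choosing the sorted fast path when applying a full report.
    void processReport(DatanodeStorageInfo storage,
                       BlockReportContext context,
                       List<BlockInfo> reported) {
      if (context.isSorted()) {
        // Blocks arrive in ascending block ID order, so every insert
        // can append at the tail of the FoldedTreeSet.
        for (BlockInfo b : reported) {
          storage.addBlockInitial(b, b);
        }
      } else {
        for (BlockInfo b : reported) {
          storage.addBlock(b, b);
        }
      }
    }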
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
index 6f0b8a7..34c9f2e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
@@ -18,13 +18,14 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.util.Collection;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
-import org.apache.hadoop.util.LightWeightResizableGSet;
+import org.apache.hadoop.hdfs.util.FoldedTreeSet;
/**
* Maintains the replica map.
@@ -33,9 +34,20 @@ class ReplicaMap {
// Object using which this class is synchronized
private final Object mutex;
- // Map of block pool Id to another map of block Id to ReplicaInfo.
- private final Map<String, LightWeightResizableGSet<Block, ReplicaInfo>> map =
- new HashMap<String, LightWeightResizableGSet<Block, ReplicaInfo>>();
+ // Map of block pool Id to a set of ReplicaInfo.
+ private final Map<String, FoldedTreeSet<ReplicaInfo>> map = new HashMap<>();
+
+ // Comparator used to compare a long block ID key to stored Block elements.
+ private static final Comparator<Object> LONG_AND_BLOCK_COMPARATOR
+ = new Comparator<Object>() {
+
+ @Override
+ public int compare(Object o1, Object o2) {
+ long lookup = (long) o1;
+ long stored = ((Block) o2).getBlockId();
+ return lookup > stored ? 1 : lookup < stored ? -1 : 0;
+ }
+ };
ReplicaMap(Object mutex) {
if (mutex == null) {
@@ -92,11 +104,14 @@ class ReplicaMap {
ReplicaInfo get(String bpid, long blockId) {
checkBlockPool(bpid);
synchronized(mutex) {
- LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
- return m != null ? m.get(new Block(blockId)) : null;
+ FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
+ if (set == null) {
+ return null;
+ }
+ return set.get(blockId, LONG_AND_BLOCK_COMPARATOR);
}
}
-
+
/**
* Add a replica's meta information into the map
*
@@ -109,13 +124,13 @@ class ReplicaMap {
checkBlockPool(bpid);
checkBlock(replicaInfo);
synchronized(mutex) {
- LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
- if (m == null) {
+ FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
+ if (set == null) {
// Add an entry for block pool if it does not exist already
- m = new LightWeightResizableGSet<Block, ReplicaInfo>();
- map.put(bpid, m);
+ set = new FoldedTreeSet<>();
+ map.put(bpid, set);
}
- return m.put(replicaInfo);
+ return set.addOrReplace(replicaInfo);
}
}
@@ -138,12 +153,13 @@ class ReplicaMap {
checkBlockPool(bpid);
checkBlock(block);
synchronized(mutex) {
- LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
- if (m != null) {
- ReplicaInfo replicaInfo = m.get(block);
+ FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
+ if (set != null) {
+ ReplicaInfo replicaInfo =
+ set.get(block.getBlockId(), LONG_AND_BLOCK_COMPARATOR);
if (replicaInfo != null &&
block.getGenerationStamp() == replicaInfo.getGenerationStamp()) {
- return m.remove(block);
+ return set.removeAndGet(replicaInfo);
}
}
}
@@ -160,9 +176,9 @@ class ReplicaMap {
ReplicaInfo remove(String bpid, long blockId) {
checkBlockPool(bpid);
synchronized(mutex) {
- LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
- if (m != null) {
- return m.remove(new Block(blockId));
+ FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
+ if (set != null) {
+ return set.removeAndGet(blockId, LONG_AND_BLOCK_COMPARATOR);
}
}
return null;
@@ -174,10 +190,9 @@ class ReplicaMap {
* @return the number of replicas in the map
*/
int size(String bpid) {
- LightWeightResizableGSet<Block, ReplicaInfo> m = null;
synchronized(mutex) {
- m = map.get(bpid);
- return m != null ? m.size() : 0;
+ FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
+ return set != null ? set.size() : 0;
}
}
@@ -192,19 +207,17 @@ class ReplicaMap {
* @return a collection of the replicas belonging to the block pool
*/
Collection<ReplicaInfo> replicas(String bpid) {
- LightWeightResizableGSet<Block, ReplicaInfo> m = null;
- m = map.get(bpid);
- return m != null ? m.values() : null;
+ return map.get(bpid);
}
void initBlockPool(String bpid) {
checkBlockPool(bpid);
synchronized(mutex) {
- LightWeightResizableGSet<Block, ReplicaInfo> m = map.get(bpid);
- if (m == null) {
+ FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
+ if (set == null) {
// Add an entry for block pool if it does not exist already
- m = new LightWeightResizableGSet<Block, ReplicaInfo>();
- map.put(bpid, m);
+ set = new FoldedTreeSet<>();
+ map.put(bpid, set);
}
}
}
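A detail worth noting in LONG_AND_BLOCK_COMPARATOR above: the three-way
compare is spelled out with explicit comparisons rather than the tempting
(int) (lookup - stored), because subtracting two longs whose difference
exceeds the int range wraps silently. The self-contained demo below shows the
failure mode; it uses plain longs in place of Block so it can run on its own.

    // Demonstrates why the comparator avoids a subtraction-based compare.
    public class ComparatorDemo {
      public static void main(String[] args) {
        long lookup = Long.MAX_VALUE;
        long stored = -1L;
        // Correct three-way compare, as used by LONG_AND_BLOCK_COMPARATOR.
        int safe = lookup > stored ? 1 : lookup < stored ? -1 : 0;
        // Broken alternative: MAX_VALUE - (-1) overflows to MIN_VALUE and
        // the cast truncates to 0, so two very different IDs compare equal.
        int broken = (int) (lookup - stored);
        System.out.println("safe=" + safe + ", broken=" + broken); // 1, 0
      }
    }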
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java
index 5bcd719..94749e2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java
@@ -52,12 +52,16 @@ public class BlockReportContext {
*/
private final long leaseId;
+ /** True if the blocks in this report are sorted by block ID. */
+ private final boolean sorted;
+
public BlockReportContext(int totalRpcs, int curRpc,
- long reportId, long leaseId) {
+ long reportId, long leaseId,
+ boolean sorted) {
this.totalRpcs = totalRpcs;
this.curRpc = curRpc;
this.reportId = reportId;
this.leaseId = leaseId;
+ this.sorted = sorted;
}
public int getTotalRpcs() {
@@ -75,4 +79,8 @@ public class BlockReportContext {
public long getLeaseId() {
return leaseId;
}
+
+ public boolean isSorted() {
+ return sorted;
+ }
}
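Since the flag is final and set at construction time, every caller now has to
state explicitly whether the report it is sending is sorted. A minimal
construction sketch follows; the reportId and leaseId values are illustrative.

    // A single-RPC report (1 RPC total, index 0) declared as sorted.
    long reportId = 1L, leaseId = 42L;
    BlockReportContext ctx =
        new BlockReportContext(1, 0, reportId, leaseId, true);
    assert ctx.isSorted();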
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
index add4e73..b962855 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
@@ -131,7 +131,6 @@ public interface DatanodeProtocol {
* Each finalized block is represented as 3 longs. Each under-
* construction replica is represented as 4 longs.
* This is done instead of Block[] to reduce memory used by block reports.
- * @param reports report of blocks per storage
* @param context Context information for this block report.
*
* @return - the next command for DN to process.