You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/03/16 21:18:47 UTC
[40/50] [abbrv] hadoop git commit: HDFS-7716. Erasure Coding: extend
BlockInfo to handle EC info. Contributed by Jing Zhao.
HDFS-7716. Erasure Coding: extend BlockInfo to handle EC info. Contributed by Jing Zhao.
Conflicts:
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/10488c9c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/10488c9c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/10488c9c
Branch: refs/heads/HDFS-7285
Commit: 10488c9c27e03ea6692e146319dec10dbc0b4fa9
Parents: c054b2a
Author: Jing Zhao <ji...@apache.org>
Authored: Tue Feb 10 17:54:10 2015 -0800
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon Mar 16 13:09:27 2015 -0700
----------------------------------------------------------------------
.../hadoop/hdfs/protocol/HdfsConstants.java | 1 +
.../server/blockmanagement/BlockCollection.java | 13 +-
.../server/blockmanagement/BlockIdManager.java | 7 +-
.../hdfs/server/blockmanagement/BlockInfo.java | 339 +++++++++++++++++
.../blockmanagement/BlockInfoContiguous.java | 363 +++----------------
.../BlockInfoContiguousUnderConstruction.java | 140 +------
.../blockmanagement/BlockInfoStriped.java | 179 +++++++++
.../server/blockmanagement/BlockManager.java | 188 +++++-----
.../hdfs/server/blockmanagement/BlocksMap.java | 46 +--
.../CacheReplicationMonitor.java | 10 +-
.../blockmanagement/DatanodeDescriptor.java | 22 +-
.../blockmanagement/DatanodeStorageInfo.java | 38 +-
.../ReplicaUnderConstruction.java | 119 ++++++
.../hdfs/server/namenode/FSDirectory.java | 4 +-
.../hdfs/server/namenode/FSNamesystem.java | 24 +-
.../hdfs/server/namenode/NamenodeFsck.java | 3 +-
.../snapshot/FSImageFormatPBSnapshot.java | 4 +-
.../org/apache/hadoop/hdfs/DFSTestUtil.java | 4 +-
.../server/blockmanagement/TestBlockInfo.java | 6 +-
.../blockmanagement/TestBlockInfoStriped.java | 219 +++++++++++
.../blockmanagement/TestBlockManager.java | 4 +-
.../blockmanagement/TestReplicationPolicy.java | 2 +-
22 files changed, 1127 insertions(+), 608 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/10488c9c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
index de60b6e..245b630 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
@@ -184,5 +184,6 @@ public class HdfsConstants {
public static final byte NUM_DATA_BLOCKS = 3;
public static final byte NUM_PARITY_BLOCKS = 2;
+ public static final long BLOCK_GROUP_INDEX_MASK = 15;
public static final byte MAX_BLOCKS_IN_GROUP = 16;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/10488c9c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
index 1547611..974cac3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
@@ -39,12 +39,12 @@ public interface BlockCollection {
public ContentSummary computeContentSummary();
/**
- * @return the number of blocks
+ * @return the number of blocks or block groups
*/
public int numBlocks();
/**
- * Get the blocks.
+ * Get the blocks or block groups.
*/
public BlockInfoContiguous[] getBlocks();
@@ -55,8 +55,8 @@ public interface BlockCollection {
public long getPreferredBlockSize();
/**
- * Get block replication for the collection
- * @return block replication value
+ * Get block replication for the collection.
+ * @return block replication value. Return 0 if the file is erasure coded.
*/
public short getBlockReplication();
@@ -71,7 +71,7 @@ public interface BlockCollection {
public String getName();
/**
- * Set the block at the given index.
+ * Set the block/block-group at the given index.
*/
public void setBlock(int index, BlockInfoContiguous blk);
@@ -79,7 +79,8 @@ public interface BlockCollection {
* Convert the last block of the collection to an under-construction block
* and set the locations.
*/
- public BlockInfoContiguousUnderConstruction setLastBlock(BlockInfoContiguous lastBlock,
+ public BlockInfoContiguousUnderConstruction setLastBlock(
+ BlockInfoContiguous lastBlock,
DatanodeStorageInfo[] targets) throws IOException;
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/10488c9c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java
index e7f8a05..3ae54ce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java
@@ -217,6 +217,11 @@ public class BlockIdManager {
}
public static long convertToGroupID(long id) {
- return id & (~(HdfsConstants.MAX_BLOCKS_IN_GROUP - 1));
+ return id & (~HdfsConstants.BLOCK_GROUP_INDEX_MASK);
+ }
+
+ public static int getBlockIndex(Block reportedBlock) {
+ return (int) (reportedBlock.getBlockId() &
+ HdfsConstants.BLOCK_GROUP_INDEX_MASK);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/10488c9c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
new file mode 100644
index 0000000..f19ad32
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.util.LightWeightGSet;
+
+import java.util.LinkedList;
+
+/**
+ * For a given block (or an erasure coding block group), BlockInfo class
+ * maintains 1) the {@link BlockCollection} it is part of, and 2) datanodes
+ * where the replicas of the block, or blocks belonging to the erasure coding
+ * block group, are stored.
+ */
+public abstract class BlockInfo extends Block
+ implements LightWeightGSet.LinkedElement {
+ private BlockCollection bc;
+
+ /** For implementing {@link LightWeightGSet.LinkedElement} interface */
+ private LightWeightGSet.LinkedElement nextLinkedElement;
+
+ /**
+ * This array contains triplets of references. For each i-th storage, the
+ * block belongs to triplets[3*i] is the reference to the
+ * {@link DatanodeStorageInfo} and triplets[3*i+1] and triplets[3*i+2] are
+ * references to the previous and the next blocks, respectively, in the list
+ * of blocks belonging to this storage.
+ *
+ * Using previous and next in Object triplets is done instead of a
+ * {@link LinkedList} list to efficiently use memory. With LinkedList the cost
+ * per replica is 42 bytes (LinkedList#Entry object per replica) versus 16
+ * bytes using the triplets.
+ */
+ protected Object[] triplets;
+
+ /**
+ * Construct an entry for blocksmap
+ * @param size the block's replication factor, or the total number of blocks
+ * in the block group
+ */
+ public BlockInfo(short size) {
+ this.triplets = new Object[3 * size];
+ this.bc = null;
+ }
+
+ public BlockInfo(Block blk, short size) {
+ super(blk);
+ this.triplets = new Object[3 * size];
+ this.bc = null;
+ }
+
+ public BlockCollection getBlockCollection() {
+ return bc;
+ }
+
+ public void setBlockCollection(BlockCollection bc) {
+ this.bc = bc;
+ }
+
+ public DatanodeDescriptor getDatanode(int index) {
+ DatanodeStorageInfo storage = getStorageInfo(index);
+ return storage == null ? null : storage.getDatanodeDescriptor();
+ }
+
+ DatanodeStorageInfo getStorageInfo(int index) {
+ assert this.triplets != null : "BlockInfo is not initialized";
+ assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
+ return (DatanodeStorageInfo)triplets[index*3];
+ }
+
+ BlockInfo getPrevious(int index) {
+ assert this.triplets != null : "BlockInfo is not initialized";
+ assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound";
+ return (BlockInfo) triplets[index*3+1];
+ }
+
+ BlockInfo getNext(int index) {
+ assert this.triplets != null : "BlockInfo is not initialized";
+ assert index >= 0 && index*3+2 < triplets.length : "Index is out of bound";
+ return (BlockInfo) triplets[index*3+2];
+ }
+
+ void setStorageInfo(int index, DatanodeStorageInfo storage) {
+ assert this.triplets != null : "BlockInfo is not initialized";
+ assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
+ triplets[index*3] = storage;
+ }
+
+ /**
+ * Return the previous block on the block list for the datanode at
+ * position index. Set the previous block on the list to "to".
+ *
+ * @param index - the datanode index
+ * @param to - block to be set to previous on the list of blocks
+ * @return current previous block on the list of blocks
+ */
+ BlockInfo setPrevious(int index, BlockInfo to) {
+ assert this.triplets != null : "BlockInfo is not initialized";
+ assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound";
+ BlockInfo info = (BlockInfo) triplets[index*3+1];
+ triplets[index*3+1] = to;
+ return info;
+ }
+
+ /**
+ * Return the next block on the block list for the datanode at
+ * position index. Set the next block on the list to "to".
+ *
+ * @param index - the datanode index
+ * @param to - block to be set to next on the list of blocks
+ * @return current next block on the list of blocks
+ */
+ BlockInfo setNext(int index, BlockInfo to) {
+ assert this.triplets != null : "BlockInfo is not initialized";
+ assert index >= 0 && index*3+2 < triplets.length : "Index is out of bound";
+ BlockInfo info = (BlockInfo) triplets[index*3+2];
+ triplets[index*3+2] = to;
+ return info;
+ }
+
+ public int getCapacity() {
+ assert this.triplets != null : "BlockInfo is not initialized";
+ assert triplets.length % 3 == 0 : "Malformed BlockInfo";
+ return triplets.length / 3;
+ }
+
+ /**
+ * Count the number of data-nodes the block currently belongs to (i.e., NN
+ * has received block reports from the DN).
+ */
+ public abstract int numNodes();
+
+ /**
+ * Add a {@link DatanodeStorageInfo} location for a block
+ * @param storage The storage to add
+ * @param reportedBlock The block reported from the datanode. This is only
+ * used by erasure coded blocks, this block's id contains
+ * information indicating the index of the block in the
+ * corresponding block group.
+ */
+ abstract boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock);
+
+ /**
+ * Remove {@link DatanodeStorageInfo} location for a block
+ */
+ abstract boolean removeStorage(DatanodeStorageInfo storage);
+
+ /**
+ * Replace the current BlockInfo with the new one in corresponding
+ * DatanodeStorageInfo's linked list
+ */
+ abstract void replaceBlock(BlockInfo newBlock);
+
+ /**
+ * Find specified DatanodeDescriptor.
+ * @return index or -1 if not found.
+ */
+ boolean findDatanode(DatanodeDescriptor dn) {
+ int len = getCapacity();
+ for (int idx = 0; idx < len; idx++) {
+ DatanodeDescriptor cur = getDatanode(idx);
+ if(cur == dn) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Find specified DatanodeStorageInfo.
+ * @return DatanodeStorageInfo or null if not found.
+ */
+ DatanodeStorageInfo findStorageInfo(DatanodeDescriptor dn) {
+ int len = getCapacity();
+ for(int idx = 0; idx < len; idx++) {
+ DatanodeStorageInfo cur = getStorageInfo(idx);
+ if(cur != null && cur.getDatanodeDescriptor() == dn) {
+ return cur;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Find specified DatanodeStorageInfo.
+ * @return index or -1 if not found.
+ */
+ int findStorageInfo(DatanodeStorageInfo storageInfo) {
+ int len = getCapacity();
+ for(int idx = 0; idx < len; idx++) {
+ DatanodeStorageInfo cur = getStorageInfo(idx);
+ if (cur == storageInfo) {
+ return idx;
+ }
+ }
+ return -1;
+ }
+
+ /**
+ * Insert this block into the head of the list of blocks
+ * related to the specified DatanodeStorageInfo.
+ * If the head is null then form a new list.
+ * @return current block as the new head of the list.
+ */
+ BlockInfo listInsert(BlockInfo head, DatanodeStorageInfo storage) {
+ int dnIndex = this.findStorageInfo(storage);
+ assert dnIndex >= 0 : "Data node is not found: current";
+ assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
+ "Block is already in the list and cannot be inserted.";
+ this.setPrevious(dnIndex, null);
+ this.setNext(dnIndex, head);
+ if (head != null) {
+ head.setPrevious(head.findStorageInfo(storage), this);
+ }
+ return this;
+ }
+
+ /**
+ * Remove this block from the list of blocks
+ * related to the specified DatanodeStorageInfo.
+ * If this block is the head of the list then return the next block as
+ * the new head.
+ * @return the new head of the list or null if the list becomes
+ * empy after deletion.
+ */
+ BlockInfo listRemove(BlockInfo head, DatanodeStorageInfo storage) {
+ if (head == null) {
+ return null;
+ }
+ int dnIndex = this.findStorageInfo(storage);
+ if (dnIndex < 0) { // this block is not on the data-node list
+ return head;
+ }
+
+ BlockInfo next = this.getNext(dnIndex);
+ BlockInfo prev = this.getPrevious(dnIndex);
+ this.setNext(dnIndex, null);
+ this.setPrevious(dnIndex, null);
+ if (prev != null) {
+ prev.setNext(prev.findStorageInfo(storage), next);
+ }
+ if (next != null) {
+ next.setPrevious(next.findStorageInfo(storage), prev);
+ }
+ if (this == head) { // removing the head
+ head = next;
+ }
+ return head;
+ }
+
+ /**
+ * Remove this block from the list of blocks related to the specified
+ * DatanodeDescriptor. Insert it into the head of the list of blocks.
+ *
+ * @return the new head of the list.
+ */
+ public BlockInfo moveBlockToHead(BlockInfo head, DatanodeStorageInfo storage,
+ int curIndex, int headIndex) {
+ if (head == this) {
+ return this;
+ }
+ BlockInfo next = this.setNext(curIndex, head);
+ BlockInfo prev = this.setPrevious(curIndex, null);
+
+ head.setPrevious(headIndex, this);
+ prev.setNext(prev.findStorageInfo(storage), next);
+ if (next != null) {
+ next.setPrevious(next.findStorageInfo(storage), prev);
+ }
+ return this;
+ }
+
+ /**
+ * BlockInfo represents a block that is not being constructed.
+ * In order to start modifying the block, the BlockInfo should be converted
+ * to {@link BlockInfoContiguousUnderConstruction}.
+ * @return {@link HdfsServerConstants.BlockUCState#COMPLETE}
+ */
+ public HdfsServerConstants.BlockUCState getBlockUCState() {
+ return HdfsServerConstants.BlockUCState.COMPLETE;
+ }
+
+ /**
+ * Is this block complete?
+ *
+ * @return true if the state of the block is
+ * {@link HdfsServerConstants.BlockUCState#COMPLETE}
+ */
+ public boolean isComplete() {
+ return getBlockUCState().equals(HdfsServerConstants.BlockUCState.COMPLETE);
+ }
+
+ @Override
+ public int hashCode() {
+ // Super implementation is sufficient
+ return super.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ // Sufficient to rely on super's implementation
+ return (this == obj) || super.equals(obj);
+ }
+
+ @Override
+ public LightWeightGSet.LinkedElement getNext() {
+ return nextLinkedElement;
+ }
+
+ @Override
+ public void setNext(LightWeightGSet.LinkedElement next) {
+ this.nextLinkedElement = next;
+ }
+
+ static BlockInfo copyOf(BlockInfo b) {
+ if (b instanceof BlockInfoContiguous) {
+ return new BlockInfoContiguous((BlockInfoContiguous) b);
+ } else {
+ return new BlockInfoStriped((BlockInfoStriped) b);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/10488c9c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
index 48069c1..e54cba3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
@@ -17,148 +17,33 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
-import java.util.LinkedList;
-
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
-import org.apache.hadoop.util.LightWeightGSet;
/**
- * BlockInfo class maintains for a given block
- * the {@link BlockCollection} it is part of and datanodes where the replicas of
- * the block are stored.
+ * Subclass of {@link BlockInfo}, used for a block with replication scheme.
*/
@InterfaceAudience.Private
-public class BlockInfoContiguous extends Block
- implements LightWeightGSet.LinkedElement {
+public class BlockInfoContiguous extends BlockInfo {
public static final BlockInfoContiguous[] EMPTY_ARRAY = {};
- private BlockCollection bc;
-
- /** For implementing {@link LightWeightGSet.LinkedElement} interface */
- private LightWeightGSet.LinkedElement nextLinkedElement;
-
- /**
- * This array contains triplets of references. For each i-th storage, the
- * block belongs to triplets[3*i] is the reference to the
- * {@link DatanodeStorageInfo} and triplets[3*i+1] and triplets[3*i+2] are
- * references to the previous and the next blocks, respectively, in the list
- * of blocks belonging to this storage.
- *
- * Using previous and next in Object triplets is done instead of a
- * {@link LinkedList} list to efficiently use memory. With LinkedList the cost
- * per replica is 42 bytes (LinkedList#Entry object per replica) versus 16
- * bytes using the triplets.
- */
- private Object[] triplets;
-
- /**
- * Construct an entry for blocksmap
- * @param replication the block's replication factor
- */
- public BlockInfoContiguous(short replication) {
- this.triplets = new Object[3*replication];
- this.bc = null;
+ public BlockInfoContiguous(short size) {
+ super(size);
}
-
- public BlockInfoContiguous(Block blk, short replication) {
- super(blk);
- this.triplets = new Object[3*replication];
- this.bc = null;
+
+ public BlockInfoContiguous(Block blk, short size) {
+ super(blk, size);
}
/**
* Copy construction.
- * This is used to convert BlockInfoUnderConstruction
- * @param from BlockInfo to copy from.
+ * This is used to convert BlockReplicationInfoUnderConstruction
+ * @param from BlockReplicationInfo to copy from.
*/
protected BlockInfoContiguous(BlockInfoContiguous from) {
- this(from, from.bc.getBlockReplication());
- this.bc = from.bc;
- }
-
- public BlockCollection getBlockCollection() {
- return bc;
- }
-
- public void setBlockCollection(BlockCollection bc) {
- this.bc = bc;
- }
-
- public DatanodeDescriptor getDatanode(int index) {
- DatanodeStorageInfo storage = getStorageInfo(index);
- return storage == null ? null : storage.getDatanodeDescriptor();
- }
-
- DatanodeStorageInfo getStorageInfo(int index) {
- assert this.triplets != null : "BlockInfo is not initialized";
- assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
- return (DatanodeStorageInfo)triplets[index*3];
- }
-
- private BlockInfoContiguous getPrevious(int index) {
- assert this.triplets != null : "BlockInfo is not initialized";
- assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound";
- BlockInfoContiguous info = (BlockInfoContiguous)triplets[index*3+1];
- assert info == null ||
- info.getClass().getName().startsWith(BlockInfoContiguous.class.getName()) :
- "BlockInfo is expected at " + index*3;
- return info;
- }
-
- BlockInfoContiguous getNext(int index) {
- assert this.triplets != null : "BlockInfo is not initialized";
- assert index >= 0 && index*3+2 < triplets.length : "Index is out of bound";
- BlockInfoContiguous info = (BlockInfoContiguous)triplets[index*3+2];
- assert info == null || info.getClass().getName().startsWith(
- BlockInfoContiguous.class.getName()) :
- "BlockInfo is expected at " + index*3;
- return info;
- }
-
- private void setStorageInfo(int index, DatanodeStorageInfo storage) {
- assert this.triplets != null : "BlockInfo is not initialized";
- assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
- triplets[index*3] = storage;
- }
-
- /**
- * Return the previous block on the block list for the datanode at
- * position index. Set the previous block on the list to "to".
- *
- * @param index - the datanode index
- * @param to - block to be set to previous on the list of blocks
- * @return current previous block on the list of blocks
- */
- private BlockInfoContiguous setPrevious(int index, BlockInfoContiguous to) {
- assert this.triplets != null : "BlockInfo is not initialized";
- assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound";
- BlockInfoContiguous info = (BlockInfoContiguous)triplets[index*3+1];
- triplets[index*3+1] = to;
- return info;
- }
-
- /**
- * Return the next block on the block list for the datanode at
- * position index. Set the next block on the list to "to".
- *
- * @param index - the datanode index
- * @param to - block to be set to next on the list of blocks
- * * @return current next block on the list of blocks
- */
- private BlockInfoContiguous setNext(int index, BlockInfoContiguous to) {
- assert this.triplets != null : "BlockInfo is not initialized";
- assert index >= 0 && index*3+2 < triplets.length : "Index is out of bound";
- BlockInfoContiguous info = (BlockInfoContiguous)triplets[index*3+2];
- triplets[index*3+2] = to;
- return info;
- }
-
- public int getCapacity() {
- assert this.triplets != null : "BlockInfo is not initialized";
- assert triplets.length % 3 == 0 : "Malformed BlockInfo";
- return triplets.length / 3;
+ this(from, from.getBlockCollection().getBlockReplication());
+ this.setBlockCollection(from.getBlockCollection());
}
/**
@@ -168,9 +53,10 @@ public class BlockInfoContiguous extends Block
private int ensureCapacity(int num) {
assert this.triplets != null : "BlockInfo is not initialized";
int last = numNodes();
- if(triplets.length >= (last+num)*3)
+ if (triplets.length >= (last+num)*3) {
return last;
- /* Not enough space left. Create a new array. Should normally
+ }
+ /* Not enough space left. Create a new array. Should normally
* happen only when replication is manually increased by the user. */
Object[] old = triplets;
triplets = new Object[(last+num)*3];
@@ -178,23 +64,8 @@ public class BlockInfoContiguous extends Block
return last;
}
- /**
- * Count the number of data-nodes the block belongs to.
- */
- public int numNodes() {
- assert this.triplets != null : "BlockInfo is not initialized";
- assert triplets.length % 3 == 0 : "Malformed BlockInfo";
- for(int idx = getCapacity()-1; idx >= 0; idx--) {
- if(getDatanode(idx) != null)
- return idx+1;
- }
- return 0;
- }
-
- /**
- * Add a {@link DatanodeStorageInfo} location for a block
- */
- boolean addStorage(DatanodeStorageInfo storage) {
+ @Override
+ boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
// find the last null node
int lastNode = ensureCapacity(1);
setStorageInfo(lastNode, storage);
@@ -203,167 +74,53 @@ public class BlockInfoContiguous extends Block
return true;
}
- /**
- * Remove {@link DatanodeStorageInfo} location for a block
- */
+ @Override
boolean removeStorage(DatanodeStorageInfo storage) {
int dnIndex = findStorageInfo(storage);
- if(dnIndex < 0) // the node is not found
+ if (dnIndex < 0) { // the node is not found
return false;
- assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
- "Block is still in the list and must be removed first.";
+ }
+ assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
+ "Block is still in the list and must be removed first.";
// find the last not null node
- int lastNode = numNodes()-1;
- // replace current node triplet by the lastNode one
+ int lastNode = numNodes()-1;
+ // replace current node triplet by the lastNode one
setStorageInfo(dnIndex, getStorageInfo(lastNode));
- setNext(dnIndex, getNext(lastNode));
- setPrevious(dnIndex, getPrevious(lastNode));
+ setNext(dnIndex, getNext(lastNode));
+ setPrevious(dnIndex, getPrevious(lastNode));
// set the last triplet to null
setStorageInfo(lastNode, null);
- setNext(lastNode, null);
- setPrevious(lastNode, null);
+ setNext(lastNode, null);
+ setPrevious(lastNode, null);
return true;
}
- /**
- * Find specified DatanodeDescriptor.
- * @return index or -1 if not found.
- */
- boolean findDatanode(DatanodeDescriptor dn) {
- int len = getCapacity();
- for(int idx = 0; idx < len; idx++) {
- DatanodeDescriptor cur = getDatanode(idx);
- if(cur == dn) {
- return true;
- }
- if(cur == null) {
- break;
- }
- }
- return false;
- }
+ @Override
+ public int numNodes() {
+ assert this.triplets != null : "BlockInfo is not initialized";
+ assert triplets.length % 3 == 0 : "Malformed BlockInfo";
- /**
- * Find specified DatanodeStorageInfo.
- * @return DatanodeStorageInfo or null if not found.
- */
- DatanodeStorageInfo findStorageInfo(DatanodeDescriptor dn) {
- int len = getCapacity();
- for(int idx = 0; idx < len; idx++) {
- DatanodeStorageInfo cur = getStorageInfo(idx);
- if(cur == null)
- break;
- if(cur.getDatanodeDescriptor() == dn)
- return cur;
- }
- return null;
- }
-
- /**
- * Find specified DatanodeStorageInfo.
- * @return index or -1 if not found.
- */
- int findStorageInfo(DatanodeStorageInfo storageInfo) {
- int len = getCapacity();
- for(int idx = 0; idx < len; idx++) {
- DatanodeStorageInfo cur = getStorageInfo(idx);
- if (cur == storageInfo) {
- return idx;
- }
- if (cur == null) {
- break;
+ for (int idx = getCapacity()-1; idx >= 0; idx--) {
+ if (getDatanode(idx) != null) {
+ return idx + 1;
}
}
- return -1;
- }
-
- /**
- * Insert this block into the head of the list of blocks
- * related to the specified DatanodeStorageInfo.
- * If the head is null then form a new list.
- * @return current block as the new head of the list.
- */
- BlockInfoContiguous listInsert(BlockInfoContiguous head,
- DatanodeStorageInfo storage) {
- int dnIndex = this.findStorageInfo(storage);
- assert dnIndex >= 0 : "Data node is not found: current";
- assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
- "Block is already in the list and cannot be inserted.";
- this.setPrevious(dnIndex, null);
- this.setNext(dnIndex, head);
- if(head != null)
- head.setPrevious(head.findStorageInfo(storage), this);
- return this;
- }
-
- /**
- * Remove this block from the list of blocks
- * related to the specified DatanodeStorageInfo.
- * If this block is the head of the list then return the next block as
- * the new head.
- * @return the new head of the list or null if the list becomes
- * empy after deletion.
- */
- BlockInfoContiguous listRemove(BlockInfoContiguous head,
- DatanodeStorageInfo storage) {
- if(head == null)
- return null;
- int dnIndex = this.findStorageInfo(storage);
- if(dnIndex < 0) // this block is not on the data-node list
- return head;
-
- BlockInfoContiguous next = this.getNext(dnIndex);
- BlockInfoContiguous prev = this.getPrevious(dnIndex);
- this.setNext(dnIndex, null);
- this.setPrevious(dnIndex, null);
- if(prev != null)
- prev.setNext(prev.findStorageInfo(storage), next);
- if(next != null)
- next.setPrevious(next.findStorageInfo(storage), prev);
- if(this == head) // removing the head
- head = next;
- return head;
+ return 0;
}
- /**
- * Remove this block from the list of blocks related to the specified
- * DatanodeDescriptor. Insert it into the head of the list of blocks.
- *
- * @return the new head of the list.
- */
- public BlockInfoContiguous moveBlockToHead(BlockInfoContiguous head,
- DatanodeStorageInfo storage, int curIndex, int headIndex) {
- if (head == this) {
- return this;
- }
- BlockInfoContiguous next = this.setNext(curIndex, head);
- BlockInfoContiguous prev = this.setPrevious(curIndex, null);
-
- head.setPrevious(headIndex, this);
- prev.setNext(prev.findStorageInfo(storage), next);
- if (next != null) {
- next.setPrevious(next.findStorageInfo(storage), prev);
+ @Override
+ void replaceBlock(BlockInfo newBlock) {
+ assert newBlock instanceof BlockInfoContiguous;
+ for (int i = this.numNodes() - 1; i >= 0; i--) {
+ final DatanodeStorageInfo storage = this.getStorageInfo(i);
+ final boolean removed = storage.removeBlock(this);
+ assert removed : "currentBlock not found.";
+
+ final DatanodeStorageInfo.AddBlockResult result = storage.addBlock(
+ newBlock, newBlock);
+ assert result == DatanodeStorageInfo.AddBlockResult.ADDED :
+ "newBlock already exists.";
}
- return this;
- }
-
- /**
- * BlockInfo represents a block that is not being constructed.
- * In order to start modifying the block, the BlockInfo should be converted
- * to {@link BlockInfoContiguousUnderConstruction}.
- * @return {@link BlockUCState#COMPLETE}
- */
- public BlockUCState getBlockUCState() {
- return BlockUCState.COMPLETE;
- }
-
- /**
- * Is this block complete?
- *
- * @return true if the state of the block is {@link BlockUCState#COMPLETE}
- */
- public boolean isComplete() {
- return getBlockUCState().equals(BlockUCState.COMPLETE);
}
/**
@@ -375,38 +132,16 @@ public class BlockInfoContiguous extends Block
if(isComplete()) {
BlockInfoContiguousUnderConstruction ucBlock =
new BlockInfoContiguousUnderConstruction(this,
- getBlockCollection().getBlockReplication(), s, targets);
+ getBlockCollection().getBlockReplication(), s, targets);
ucBlock.setBlockCollection(getBlockCollection());
return ucBlock;
}
// the block is already under construction
BlockInfoContiguousUnderConstruction ucBlock =
- (BlockInfoContiguousUnderConstruction)this;
+ (BlockInfoContiguousUnderConstruction) this;
ucBlock.setBlockUCState(s);
ucBlock.setExpectedLocations(targets);
ucBlock.setBlockCollection(getBlockCollection());
return ucBlock;
}
-
- @Override
- public int hashCode() {
- // Super implementation is sufficient
- return super.hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- // Sufficient to rely on super's implementation
- return (this == obj) || super.equals(obj);
- }
-
- @Override
- public LightWeightGSet.LinkedElement getNext() {
- return nextLinkedElement;
- }
-
- @Override
- public void setNext(LightWeightGSet.LinkedElement next) {
- this.nextLinkedElement = next;
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/10488c9c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java
index ae809a5..e75c5d7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java
@@ -60,101 +60,6 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
private Block truncateBlock;
/**
- * ReplicaUnderConstruction contains information about replicas while
- * they are under construction.
- * The GS, the length and the state of the replica is as reported by
- * the data-node.
- * It is not guaranteed, but expected, that data-nodes actually have
- * corresponding replicas.
- */
- static class ReplicaUnderConstruction extends Block {
- private final DatanodeStorageInfo expectedLocation;
- private ReplicaState state;
- private boolean chosenAsPrimary;
-
- ReplicaUnderConstruction(Block block,
- DatanodeStorageInfo target,
- ReplicaState state) {
- super(block);
- this.expectedLocation = target;
- this.state = state;
- this.chosenAsPrimary = false;
- }
-
- /**
- * Expected block replica location as assigned when the block was allocated.
- * This defines the pipeline order.
- * It is not guaranteed, but expected, that the data-node actually has
- * the replica.
- */
- private DatanodeStorageInfo getExpectedStorageLocation() {
- return expectedLocation;
- }
-
- /**
- * Get replica state as reported by the data-node.
- */
- ReplicaState getState() {
- return state;
- }
-
- /**
- * Whether the replica was chosen for recovery.
- */
- boolean getChosenAsPrimary() {
- return chosenAsPrimary;
- }
-
- /**
- * Set replica state.
- */
- void setState(ReplicaState s) {
- state = s;
- }
-
- /**
- * Set whether this replica was chosen for recovery.
- */
- void setChosenAsPrimary(boolean chosenAsPrimary) {
- this.chosenAsPrimary = chosenAsPrimary;
- }
-
- /**
- * Is data-node the replica belongs to alive.
- */
- boolean isAlive() {
- return expectedLocation.getDatanodeDescriptor().isAlive;
- }
-
- @Override // Block
- public int hashCode() {
- return super.hashCode();
- }
-
- @Override // Block
- public boolean equals(Object obj) {
- // Sufficient to rely on super's implementation
- return (this == obj) || super.equals(obj);
- }
-
- @Override
- public String toString() {
- final StringBuilder b = new StringBuilder(50);
- appendStringTo(b);
- return b.toString();
- }
-
- @Override
- public void appendStringTo(StringBuilder sb) {
- sb.append("ReplicaUC[")
- .append(expectedLocation)
- .append("|")
- .append(state)
- .append("]");
- }
- }
-
- /**
* Create block and set its state to
* {@link BlockUCState#UNDER_CONSTRUCTION}.
*/
@@ -165,7 +70,8 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
/**
* Create a block that is currently being constructed.
*/
- public BlockInfoContiguousUnderConstruction(Block blk, short replication, BlockUCState state, DatanodeStorageInfo[] targets) {
+ public BlockInfoContiguousUnderConstruction(Block blk, short replication,
+ BlockUCState state, DatanodeStorageInfo[] targets) {
super(blk, replication);
assert getBlockUCState() != BlockUCState.COMPLETE :
"BlockInfoUnderConstruction cannot be in COMPLETE state";
@@ -191,10 +97,11 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
/** Set expected locations */
public void setExpectedLocations(DatanodeStorageInfo[] targets) {
int numLocations = targets == null ? 0 : targets.length;
- this.replicas = new ArrayList<ReplicaUnderConstruction>(numLocations);
- for(int i = 0; i < numLocations; i++)
- replicas.add(
- new ReplicaUnderConstruction(this, targets[i], ReplicaState.RBW));
+ this.replicas = new ArrayList<>(numLocations);
+ for(int i = 0; i < numLocations; i++) {
+ replicas.add(new ReplicaUnderConstruction(this, targets[i],
+ ReplicaState.RBW));
+ }
}
/**
@@ -204,8 +111,9 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
public DatanodeStorageInfo[] getExpectedStorageLocations() {
int numLocations = replicas == null ? 0 : replicas.size();
DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations];
- for(int i = 0; i < numLocations; i++)
+ for (int i = 0; i < numLocations; i++) {
storages[i] = replicas.get(i).getExpectedStorageLocation();
+ }
return storages;
}
@@ -293,17 +201,17 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
+ " No blocks found, lease removed.");
}
boolean allLiveReplicasTriedAsPrimary = true;
- for (int i = 0; i < replicas.size(); i++) {
+ for (ReplicaUnderConstruction replica : replicas) {
// Check if all replicas have been tried or not.
- if (replicas.get(i).isAlive()) {
- allLiveReplicasTriedAsPrimary =
- (allLiveReplicasTriedAsPrimary && replicas.get(i).getChosenAsPrimary());
+ if (replica.isAlive()) {
+ allLiveReplicasTriedAsPrimary = (allLiveReplicasTriedAsPrimary &&
+ replica.getChosenAsPrimary());
}
}
if (allLiveReplicasTriedAsPrimary) {
// Just set all the replicas to be chosen whether they are alive or not.
- for (int i = 0; i < replicas.size(); i++) {
- replicas.get(i).setChosenAsPrimary(false);
+ for (ReplicaUnderConstruction replica : replicas) {
+ replica.setChosenAsPrimary(false);
}
}
long mostRecentLastUpdate = 0;
@@ -315,7 +223,8 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
continue;
}
final ReplicaUnderConstruction ruc = replicas.get(i);
- final long lastUpdate = ruc.getExpectedStorageLocation().getDatanodeDescriptor().getLastUpdate();
+ final long lastUpdate = ruc.getExpectedStorageLocation()
+ .getDatanodeDescriptor().getLastUpdate();
if (lastUpdate > mostRecentLastUpdate) {
primaryNodeIndex = i;
primary = ruc;
@@ -323,7 +232,8 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
}
}
if (primary != null) {
- primary.getExpectedStorageLocation().getDatanodeDescriptor().addBlockToBeRecovered(this);
+ primary.getExpectedStorageLocation().getDatanodeDescriptor()
+ .addBlockToBeRecovered(this);
primary.setChosenAsPrimary(true);
NameNode.blockStateChangeLog.info(
"BLOCK* {} recovery started, primary={}", this, primary);
@@ -356,18 +266,6 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
replicas.add(new ReplicaUnderConstruction(block, storage, rState));
}
- @Override // BlockInfo
- // BlockInfoUnderConstruction participates in maps the same way as BlockInfo
- public int hashCode() {
- return super.hashCode();
- }
-
- @Override // BlockInfo
- public boolean equals(Object obj) {
- // Sufficient to rely on super's implementation
- return (this == obj) || super.equals(obj);
- }
-
@Override
public String toString() {
final StringBuilder b = new StringBuilder(100);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/10488c9c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
new file mode 100644
index 0000000..5fff41e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+
+/**
+ * Subclass of {@link BlockInfo}, presenting a block group in erasure coding.
+ *
+ * We still use triplets to store DatanodeStorageInfo for each block in the
+ * block group, as well as the previous/next block in the corresponding
+ * DatanodeStorageInfo. For a (m+k) block group, the first (m+k) triplet units
+ * are sorted and strictly mapped to the corresponding block.
+ *
+ * Normally each block belonging to group is stored in only one DataNode.
+ * However, it is possible that some block is over-replicated. Thus the triplet
+ * array's size can be larger than (m+k). Thus currently we use an extra byte
+ * array to record the block index for each triplet.
+ */
+public class BlockInfoStriped extends BlockInfo {
+ private final short dataBlockNum;
+ private final short parityBlockNum;
+ /**
+ * Always the same size with triplets. Record the block index for each triplet
+ * TODO: actually this is only necessary for over-replicated block. Thus can
+ * be further optimized to save memory usage.
+ */
+ private byte[] indices;
+
+ public BlockInfoStriped(Block blk, short dataBlockNum, short parityBlockNum) {
+ super(blk, (short) (dataBlockNum + parityBlockNum));
+ indices = new byte[dataBlockNum + parityBlockNum];
+ initIndices();
+ this.dataBlockNum = dataBlockNum;
+ this.parityBlockNum = parityBlockNum;
+ }
+
+ BlockInfoStriped(BlockInfoStriped b) {
+ this(b, b.dataBlockNum, b.parityBlockNum);
+ this.setBlockCollection(b.getBlockCollection());
+ }
+
+ private short getTotalBlockNum() {
+ return (short) (dataBlockNum + parityBlockNum);
+ }
+
+ private void initIndices() {
+ for (int i = 0; i < indices.length; i++) {
+ indices[i] = -1;
+ }
+ }
+
+ private int findSlot() {
+ int i = getTotalBlockNum();
+ for (; i < getCapacity(); i++) {
+ if (getStorageInfo(i) == null) {
+ return i;
+ }
+ }
+ // need to expand the triplet size
+ ensureCapacity(i + 1, true);
+ return i;
+ }
+
+ @Override
+ boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
+ int blockIndex = BlockIdManager.getBlockIndex(reportedBlock);
+ int index = blockIndex;
+ DatanodeStorageInfo old = getStorageInfo(index);
+ if (old != null && !old.equals(storage)) { // over replicated
+ // check if the storage has been stored
+ int i = findStorageInfo(storage);
+ if (i == -1) {
+ index = findSlot();
+ } else {
+ return true;
+ }
+ }
+ addStorage(storage, index, blockIndex);
+ return true;
+ }
+
+ private void addStorage(DatanodeStorageInfo storage, int index,
+ int blockIndex) {
+ setStorageInfo(index, storage);
+ setNext(index, null);
+ setPrevious(index, null);
+ indices[index] = (byte) blockIndex;
+ }
+
+ private int findStorageInfoFromEnd(DatanodeStorageInfo storage) {
+ final int len = getCapacity();
+ for(int idx = len - 1; idx >= 0; idx--) {
+ DatanodeStorageInfo cur = getStorageInfo(idx);
+ if (storage.equals(cur)) {
+ return idx;
+ }
+ }
+ return -1;
+ }
+
+ @Override
+ boolean removeStorage(DatanodeStorageInfo storage) {
+ int dnIndex = findStorageInfoFromEnd(storage);
+ if (dnIndex < 0) { // the node is not found
+ return false;
+ }
+ assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
+ "Block is still in the list and must be removed first.";
+ // set the triplet to null
+ setStorageInfo(dnIndex, null);
+ setNext(dnIndex, null);
+ setPrevious(dnIndex, null);
+ indices[dnIndex] = -1;
+ return true;
+ }
+
+ private void ensureCapacity(int totalSize, boolean keepOld) {
+ if (getCapacity() < totalSize) {
+ Object[] old = triplets;
+ byte[] oldIndices = indices;
+ triplets = new Object[totalSize * 3];
+ indices = new byte[totalSize];
+ initIndices();
+
+ if (keepOld) {
+ System.arraycopy(old, 0, triplets, 0, old.length);
+ System.arraycopy(oldIndices, 0, indices, 0, oldIndices.length);
+ }
+ }
+ }
+
+ @Override
+ void replaceBlock(BlockInfo newBlock) {
+ assert newBlock instanceof BlockInfoStriped;
+ BlockInfoStriped newBlockGroup = (BlockInfoStriped) newBlock;
+ final int size = getCapacity();
+ newBlockGroup.ensureCapacity(size, false);
+ for (int i = 0; i < size; i++) {
+ final DatanodeStorageInfo storage = this.getStorageInfo(i);
+ if (storage != null) {
+ final int blockIndex = indices[i];
+ final boolean removed = storage.removeBlock(this);
+ assert removed : "currentBlock not found.";
+
+ newBlockGroup.addStorage(storage, i, blockIndex);
+ storage.insertToList(newBlockGroup);
+ }
+ }
+ }
+
+ @Override
+ public int numNodes() {
+ assert this.triplets != null : "BlockInfo is not initialized";
+ assert triplets.length % 3 == 0 : "Malformed BlockInfo";
+ int num = 0;
+ for (int idx = getCapacity()-1; idx >= 0; idx--) {
+ if (getStorageInfo(idx) != null) {
+ num++;
+ }
+ }
+ return num;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/10488c9c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index ec07a5f..d9f2772 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -595,8 +595,8 @@ public class BlockManager {
* of replicas reported from data-nodes.
*/
private static boolean commitBlock(
- final BlockInfoContiguousUnderConstruction block, final Block commitBlock)
- throws IOException {
+ final BlockInfoContiguousUnderConstruction block,
+ final Block commitBlock) throws IOException {
if (block.getBlockUCState() == BlockUCState.COMMITTED)
return false;
assert block.getNumBytes() <= commitBlock.getNumBytes() :
@@ -627,7 +627,7 @@ public class BlockManager {
return false; // already completed (e.g. by syncBlock)
final boolean b = commitBlock(
- (BlockInfoContiguousUnderConstruction) lastBlock, commitBlock);
+ (BlockInfoContiguousUnderConstruction)lastBlock, commitBlock);
if(countNodes(lastBlock).liveReplicas() >= minReplication)
completeBlock(bc, bc.numBlocks()-1, false);
return b;
@@ -640,15 +640,16 @@ public class BlockManager {
* @throws IOException if the block does not have at least a minimal number
* of replicas reported from data-nodes.
*/
- private BlockInfoContiguous completeBlock(final BlockCollection bc,
+ private BlockInfo completeBlock(final BlockCollection bc,
final int blkIndex, boolean force) throws IOException {
if(blkIndex < 0)
return null;
BlockInfoContiguous curBlock = bc.getBlocks()[blkIndex];
- if(curBlock.isComplete())
+ if (curBlock.isComplete())
return curBlock;
+ // TODO: support BlockInfoStripedUC
BlockInfoContiguousUnderConstruction ucBlock =
- (BlockInfoContiguousUnderConstruction) curBlock;
+ (BlockInfoContiguousUnderConstruction)curBlock;
int numNodes = ucBlock.numNodes();
if (!force && numNodes < minReplication)
throw new IOException("Cannot complete block: " +
@@ -674,13 +675,15 @@ public class BlockManager {
return blocksMap.replaceBlock(completeBlock);
}
- private BlockInfoContiguous completeBlock(final BlockCollection bc,
- final BlockInfoContiguous block, boolean force) throws IOException {
+ // TODO: support BlockInfoStrippedUC
+ private BlockInfo completeBlock(final BlockCollection bc,
+ final BlockInfo block, boolean force) throws IOException {
BlockInfoContiguous[] fileBlocks = bc.getBlocks();
- for(int idx = 0; idx < fileBlocks.length; idx++)
- if(fileBlocks[idx] == block) {
+ for (int idx = 0; idx < fileBlocks.length; idx++) {
+ if (fileBlocks[idx] == block) {
return completeBlock(bc, idx, force);
}
+ }
return block;
}
@@ -689,7 +692,7 @@ public class BlockManager {
* regardless of whether enough replicas are present. This is necessary
* when tailing edit logs as a Standby.
*/
- public BlockInfoContiguous forceCompleteBlock(final BlockCollection bc,
+ public BlockInfo forceCompleteBlock(final BlockCollection bc,
final BlockInfoContiguousUnderConstruction block) throws IOException {
block.commitBlock(block);
return completeBlock(bc, block, true);
@@ -721,8 +724,8 @@ public class BlockManager {
DatanodeStorageInfo[] targets = getStorages(oldBlock);
- BlockInfoContiguousUnderConstruction ucBlock =
- bc.setLastBlock(oldBlock, targets);
+ BlockInfoContiguousUnderConstruction ucBlock = bc.setLastBlock(oldBlock,
+ targets);
blocksMap.replaceBlock(ucBlock);
// Remove block from replication queue.
@@ -1022,7 +1025,7 @@ public class BlockManager {
if(numBlocks == 0) {
return new BlocksWithLocations(new BlockWithLocations[0]);
}
- Iterator<BlockInfoContiguous> iter = node.getBlockIterator();
+ Iterator<BlockInfo> iter = node.getBlockIterator();
int startBlock = DFSUtil.getRandom().nextInt(numBlocks); // starting from a random block
// skip blocks
for(int i=0; i<startBlock; i++) {
@@ -1030,7 +1033,7 @@ public class BlockManager {
}
List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
long totalSize = 0;
- BlockInfoContiguous curBlock;
+ BlockInfo curBlock;
while(totalSize<size && iter.hasNext()) {
curBlock = iter.next();
if(!curBlock.isComplete()) continue;
@@ -1129,7 +1132,8 @@ public class BlockManager {
public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk,
final DatanodeInfo dn, String storageID, String reason) throws IOException {
assert namesystem.hasWriteLock();
- final BlockInfoContiguous storedBlock = getStoredBlock(blk.getLocalBlock());
+ final Block reportedBlock = blk.getLocalBlock();
+ final BlockInfo storedBlock = getStoredBlock(reportedBlock);
if (storedBlock == null) {
// Check if the replica is in the blockMap, if not
// ignore the request for now. This could happen when BlockScanner
@@ -1146,7 +1150,7 @@ public class BlockManager {
+ ") does not exist");
}
- markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock,
+ markBlockAsCorrupt(new BlockToMarkCorrupt(reportedBlock, storedBlock,
blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED),
storageID == null ? null : node.getStorageInfo(storageID),
node);
@@ -1172,7 +1176,7 @@ public class BlockManager {
// Add replica to the data-node if it is not already there
if (storageInfo != null) {
- storageInfo.addBlock(b.stored);
+ storageInfo.addBlock(b.stored, b.reportedBlock);
}
// Add this replica to corruptReplicas Map
@@ -1717,41 +1721,55 @@ public class BlockManager {
this.reportedState = reportedState;
}
}
-
+
+ private static class BlockInfoToAdd {
+ final BlockInfo stored;
+ final Block reported;
+
+ BlockInfoToAdd(BlockInfo stored, Block reported) {
+ this.stored = stored;
+ this.reported = reported;
+ }
+ }
+
/**
* BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a
* list of blocks that should be considered corrupt due to a block report.
*/
private static class BlockToMarkCorrupt {
/** The corrupted block in a datanode. */
- final BlockInfoContiguous corrupted;
+ final BlockInfo corrupted;
/** The corresponding block stored in the BlockManager. */
- final BlockInfoContiguous stored;
+ final BlockInfo stored;
+ /** The block reported from a datanode */
+ final Block reportedBlock;
/** The reason to mark corrupt. */
final String reason;
/** The reason code to be stored */
final Reason reasonCode;
- BlockToMarkCorrupt(BlockInfoContiguous corrupted,
- BlockInfoContiguous stored, String reason,
- Reason reasonCode) {
+ BlockToMarkCorrupt(Block reported, BlockInfo corrupted,
+ BlockInfo stored, String reason, Reason reasonCode) {
+ Preconditions.checkNotNull(reported, "reported is null");
Preconditions.checkNotNull(corrupted, "corrupted is null");
Preconditions.checkNotNull(stored, "stored is null");
+ this.reportedBlock = reported;
this.corrupted = corrupted;
this.stored = stored;
this.reason = reason;
this.reasonCode = reasonCode;
}
- BlockToMarkCorrupt(BlockInfoContiguous stored, String reason,
+ BlockToMarkCorrupt(Block reported, BlockInfo stored, String reason,
Reason reasonCode) {
- this(stored, stored, reason, reasonCode);
+ this(reported, stored, stored, reason, reasonCode);
}
- BlockToMarkCorrupt(BlockInfoContiguous stored, long gs, String reason,
- Reason reasonCode) {
- this(new BlockInfoContiguous(stored), stored, reason, reasonCode);
+ BlockToMarkCorrupt(Block reported, BlockInfo stored, long gs,
+ String reason, Reason reasonCode) {
+ this(reported, BlockInfo.copyOf(stored), stored, reason,
+ reasonCode);
//the corrupted block in datanode has a different generation stamp
corrupted.setGenerationStamp(gs);
}
@@ -1876,7 +1894,7 @@ public class BlockManager {
break;
}
- BlockInfoContiguous bi = getStoredBlock(b);
+ BlockInfo bi = getStoredBlock(b);
if (bi == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " +
@@ -1916,7 +1934,7 @@ public class BlockManager {
// Modify the (block-->datanode) map, according to the difference
// between the old and new block report.
//
- Collection<BlockInfoContiguous> toAdd = new LinkedList<BlockInfoContiguous>();
+ Collection<BlockInfoToAdd> toAdd = new LinkedList<>();
Collection<Block> toRemove = new TreeSet<Block>();
Collection<Block> toInvalidate = new LinkedList<Block>();
Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
@@ -1933,8 +1951,9 @@ public class BlockManager {
removeStoredBlock(b, node);
}
int numBlocksLogged = 0;
- for (BlockInfoContiguous b : toAdd) {
- addStoredBlock(b, storageInfo, null, numBlocksLogged < maxNumBlocksToLog);
+ for (BlockInfoToAdd b : toAdd) {
+ addStoredBlock(b.stored, b.reported, storageInfo, null,
+ numBlocksLogged < maxNumBlocksToLog);
numBlocksLogged++;
}
if (numBlocksLogged > maxNumBlocksToLog) {
@@ -1979,7 +1998,7 @@ public class BlockManager {
continue;
}
- BlockInfoContiguous storedBlock = getStoredBlock(iblk);
+ BlockInfo storedBlock = getStoredBlock(iblk);
// If block does not belong to any file, we are done.
if (storedBlock == null) continue;
@@ -2002,7 +2021,7 @@ public class BlockManager {
// If block is under construction, add this replica to its list
if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
- ((BlockInfoContiguousUnderConstruction)storedBlock)
+ ((BlockInfoContiguousUnderConstruction) storedBlock)
.addReplicaIfNotPresent(storageInfo, iblk, reportedState);
// OpenFileBlocks only inside snapshots also will be added to safemode
// threshold. So we need to update such blocks to safemode
@@ -2017,14 +2036,14 @@ public class BlockManager {
}
//add replica if appropriate
if (reportedState == ReplicaState.FINALIZED) {
- addStoredBlockImmediate(storedBlock, storageInfo);
+ addStoredBlockImmediate(storedBlock, iblk, storageInfo);
}
}
}
private void reportDiff(DatanodeStorageInfo storageInfo,
BlockListAsLongs newReport,
- Collection<BlockInfoContiguous> toAdd, // add to DatanodeDescriptor
+ Collection<BlockInfoToAdd> toAdd, // add to DatanodeDescriptor
Collection<Block> toRemove, // remove from DatanodeDescriptor
Collection<Block> toInvalidate, // should be removed from DN
Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
@@ -2032,8 +2051,10 @@ public class BlockManager {
// place a delimiter in the list which separates blocks
// that have been reported from those that have not
- BlockInfoContiguous delimiter = new BlockInfoContiguous(new Block(), (short) 1);
- AddBlockResult result = storageInfo.addBlock(delimiter);
+ Block delimiterBlock = new Block();
+ BlockInfoContiguous delimiter = new BlockInfoContiguous(delimiterBlock,
+ (short) 1);
+ AddBlockResult result = storageInfo.addBlock(delimiter, delimiterBlock);
assert result == AddBlockResult.ADDED
: "Delimiting block cannot be present in the node";
int headIndex = 0; //currently the delimiter is in the head of the list
@@ -2045,7 +2066,7 @@ public class BlockManager {
// scan the report and process newly reported blocks
for (BlockReportReplica iblk : newReport) {
ReplicaState iState = iblk.getState();
- BlockInfoContiguous storedBlock = processReportedBlock(storageInfo,
+ BlockInfo storedBlock = processReportedBlock(storageInfo,
iblk, iState, toAdd, toInvalidate, toCorrupt, toUC);
// move block to the head of the list
@@ -2057,8 +2078,7 @@ public class BlockManager {
// collect blocks that have not been reported
// all of them are next to the delimiter
- Iterator<BlockInfoContiguous> it =
- storageInfo.new BlockIterator(delimiter.getNext(0));
+ Iterator<BlockInfo> it = storageInfo.new BlockIterator(delimiter.getNext(0));
while(it.hasNext())
toRemove.add(it.next());
storageInfo.removeBlock(delimiter);
@@ -2095,10 +2115,10 @@ public class BlockManager {
* @return the up-to-date stored block, if it should be kept.
* Otherwise, null.
*/
- private BlockInfoContiguous processReportedBlock(
+ private BlockInfo processReportedBlock(
final DatanodeStorageInfo storageInfo,
final Block block, final ReplicaState reportedState,
- final Collection<BlockInfoContiguous> toAdd,
+ final Collection<BlockInfoToAdd> toAdd,
final Collection<Block> toInvalidate,
final Collection<BlockToMarkCorrupt> toCorrupt,
final Collection<StatefulBlockInfo> toUC) {
@@ -2119,7 +2139,7 @@ public class BlockManager {
}
// find block by blockId
- BlockInfoContiguous storedBlock = getStoredBlock(block);
+ BlockInfo storedBlock = getStoredBlock(block);
if(storedBlock == null) {
// If blocksMap does not contain reported block id,
// the replica should be removed from the data-node.
@@ -2173,7 +2193,7 @@ public class BlockManager {
if (reportedState == ReplicaState.FINALIZED
&& (storedBlock.findStorageInfo(storageInfo) == -1 ||
corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
- toAdd.add(storedBlock);
+ toAdd.add(new BlockInfoToAdd(storedBlock, block));
}
return storedBlock;
}
@@ -2251,7 +2271,7 @@ public class BlockManager {
*/
private BlockToMarkCorrupt checkReplicaCorrupt(
Block reported, ReplicaState reportedState,
- BlockInfoContiguous storedBlock, BlockUCState ucState,
+ BlockInfo storedBlock, BlockUCState ucState,
DatanodeDescriptor dn) {
switch(reportedState) {
case FINALIZED:
@@ -2260,12 +2280,12 @@ public class BlockManager {
case COMMITTED:
if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) {
final long reportedGS = reported.getGenerationStamp();
- return new BlockToMarkCorrupt(storedBlock, reportedGS,
+ return new BlockToMarkCorrupt(reported, storedBlock, reportedGS,
"block is " + ucState + " and reported genstamp " + reportedGS
+ " does not match genstamp in block map "
+ storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
} else if (storedBlock.getNumBytes() != reported.getNumBytes()) {
- return new BlockToMarkCorrupt(storedBlock,
+ return new BlockToMarkCorrupt(reported, storedBlock,
"block is " + ucState + " and reported length " +
reported.getNumBytes() + " does not match " +
"length in block map " + storedBlock.getNumBytes(),
@@ -2276,8 +2296,8 @@ public class BlockManager {
case UNDER_CONSTRUCTION:
if (storedBlock.getGenerationStamp() > reported.getGenerationStamp()) {
final long reportedGS = reported.getGenerationStamp();
- return new BlockToMarkCorrupt(storedBlock, reportedGS, "block is "
- + ucState + " and reported state " + reportedState
+ return new BlockToMarkCorrupt(reported, storedBlock, reportedGS,
+ "block is " + ucState + " and reported state " + reportedState
+ ", But reported genstamp " + reportedGS
+ " does not match genstamp in block map "
+ storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
@@ -2292,7 +2312,7 @@ public class BlockManager {
return null; // not corrupt
} else if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) {
final long reportedGS = reported.getGenerationStamp();
- return new BlockToMarkCorrupt(storedBlock, reportedGS,
+ return new BlockToMarkCorrupt(reported, storedBlock, reportedGS,
"reported " + reportedState + " replica with genstamp " + reportedGS
+ " does not match COMPLETE block's genstamp in block map "
+ storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
@@ -2307,7 +2327,7 @@ public class BlockManager {
"complete with the same genstamp");
return null;
} else {
- return new BlockToMarkCorrupt(storedBlock,
+ return new BlockToMarkCorrupt(reported, storedBlock,
"reported replica has invalid state " + reportedState,
Reason.INVALID_STATE);
}
@@ -2320,11 +2340,12 @@ public class BlockManager {
" on " + dn + " size " + storedBlock.getNumBytes();
// log here at WARN level since this is really a broken HDFS invariant
LOG.warn(msg);
- return new BlockToMarkCorrupt(storedBlock, msg, Reason.INVALID_STATE);
+ return new BlockToMarkCorrupt(reported, storedBlock, msg,
+ Reason.INVALID_STATE);
}
}
- private boolean isBlockUnderConstruction(BlockInfoContiguous storedBlock,
+ private boolean isBlockUnderConstruction(BlockInfo storedBlock,
BlockUCState ucState, ReplicaState reportedState) {
switch(reportedState) {
case FINALIZED:
@@ -2353,7 +2374,7 @@ public class BlockManager {
if (ucBlock.reportedState == ReplicaState.FINALIZED &&
!block.findDatanode(storageInfo.getDatanodeDescriptor())) {
- addStoredBlock(block, storageInfo, null, true);
+ addStoredBlock(block, ucBlock.reportedBlock, storageInfo, null, true);
}
}
@@ -2368,18 +2389,18 @@ public class BlockManager {
*
* @throws IOException
*/
- private void addStoredBlockImmediate(BlockInfoContiguous storedBlock,
+ private void addStoredBlockImmediate(BlockInfo storedBlock, Block reported,
DatanodeStorageInfo storageInfo)
throws IOException {
assert (storedBlock != null && namesystem.hasWriteLock());
if (!namesystem.isInStartupSafeMode()
|| namesystem.isPopulatingReplQueues()) {
- addStoredBlock(storedBlock, storageInfo, null, false);
+ addStoredBlock(storedBlock, reported, storageInfo, null, false);
return;
}
// just add it
- storageInfo.addBlock(storedBlock);
+ storageInfo.addBlock(storedBlock, reported);
// Now check for completion of blocks and safe block count
int numCurrentReplica = countLiveNodes(storedBlock);
@@ -2400,13 +2421,14 @@ public class BlockManager {
* needed replications if this takes care of the problem.
* @return the block that is stored in blockMap.
*/
- private Block addStoredBlock(final BlockInfoContiguous block,
+ private Block addStoredBlock(final BlockInfo block,
+ final Block reportedBlock,
DatanodeStorageInfo storageInfo,
DatanodeDescriptor delNodeHint,
boolean logEveryBlock)
throws IOException {
assert block != null && namesystem.hasWriteLock();
- BlockInfoContiguous storedBlock;
+ BlockInfo storedBlock;
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
if (block instanceof BlockInfoContiguousUnderConstruction) {
//refresh our copy in case the block got completed in another thread
@@ -2427,7 +2449,7 @@ public class BlockManager {
assert bc != null : "Block must belong to a file";
// add block to the datanode
- AddBlockResult result = storageInfo.addBlock(storedBlock);
+ AddBlockResult result = storageInfo.addBlock(storedBlock, reportedBlock);
int curReplicaDelta;
if (result == AddBlockResult.ADDED) {
@@ -2502,13 +2524,13 @@ public class BlockManager {
storedBlock + "blockMap has " + numCorruptNodes +
" but corrupt replicas map has " + corruptReplicasCount);
}
- if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication))
- invalidateCorruptReplicas(storedBlock);
+ if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication)) {
+ invalidateCorruptReplicas(storedBlock, reportedBlock);
+ }
return storedBlock;
}
- private void logAddStoredBlock(BlockInfoContiguous storedBlock,
- DatanodeDescriptor node) {
+ private void logAddStoredBlock(BlockInfo storedBlock, DatanodeDescriptor node) {
if (!blockLog.isInfoEnabled()) {
return;
}
@@ -2535,7 +2557,7 @@ public class BlockManager {
*
* @param blk Block whose corrupt replicas need to be invalidated
*/
- private void invalidateCorruptReplicas(BlockInfoContiguous blk) {
+ private void invalidateCorruptReplicas(BlockInfo blk, Block reported) {
Collection<DatanodeDescriptor> nodes = corruptReplicas.getNodes(blk);
boolean removedFromBlocksMap = true;
if (nodes == null)
@@ -2545,7 +2567,7 @@ public class BlockManager {
DatanodeDescriptor[] nodesCopy = nodes.toArray(new DatanodeDescriptor[0]);
for (DatanodeDescriptor node : nodesCopy) {
try {
- if (!invalidateBlock(new BlockToMarkCorrupt(blk, null,
+ if (!invalidateBlock(new BlockToMarkCorrupt(reported, blk, null,
Reason.ANY), node)) {
removedFromBlocksMap = false;
}
@@ -2614,7 +2636,7 @@ public class BlockManager {
long nrInvalid = 0, nrOverReplicated = 0;
long nrUnderReplicated = 0, nrPostponed = 0, nrUnderConstruction = 0;
long startTimeMisReplicatedScan = Time.now();
- Iterator<BlockInfoContiguous> blocksItr = blocksMap.getBlocks().iterator();
+ Iterator<BlockInfo> blocksItr = blocksMap.getBlocks().iterator();
long totalBlocks = blocksMap.size();
replicationQueuesInitProgress = 0;
long totalProcessed = 0;
@@ -2626,7 +2648,7 @@ public class BlockManager {
namesystem.writeLockInterruptibly();
try {
while (processed < numBlocksPerIteration && blocksItr.hasNext()) {
- BlockInfoContiguous block = blocksItr.next();
+ BlockInfo block = blocksItr.next();
MisReplicationResult res = processMisReplicatedBlock(block);
if (LOG.isTraceEnabled()) {
LOG.trace("block " + block + ": " + res);
@@ -2700,7 +2722,7 @@ public class BlockManager {
* appropriate queues if necessary, and returns a result code indicating
* what happened with it.
*/
- private MisReplicationResult processMisReplicatedBlock(BlockInfoContiguous block) {
+ private MisReplicationResult processMisReplicatedBlock(BlockInfo block) {
BlockCollection bc = block.getBlockCollection();
if (bc == null) {
// block does not belong to any file
@@ -3029,14 +3051,14 @@ public class BlockManager {
ReplicaState reportedState, DatanodeDescriptor delHintNode)
throws IOException {
// blockReceived reports a finalized block
- Collection<BlockInfoContiguous> toAdd = new LinkedList<BlockInfoContiguous>();
+ Collection<BlockInfoToAdd> toAdd = new LinkedList<>();
Collection<Block> toInvalidate = new LinkedList<Block>();
Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
final DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
- processReportedBlock(storageInfo, block, reportedState,
- toAdd, toInvalidate, toCorrupt, toUC);
+ processReportedBlock(storageInfo, block, reportedState, toAdd, toInvalidate,
+ toCorrupt, toUC);
// the block is only in one of the to-do lists
// if it is in none then data-node already has it
assert toUC.size() + toAdd.size() + toInvalidate.size() + toCorrupt.size() <= 1
@@ -3046,8 +3068,9 @@ public class BlockManager {
addStoredBlockUnderConstruction(b, storageInfo);
}
long numBlocksLogged = 0;
- for (BlockInfoContiguous b : toAdd) {
- addStoredBlock(b, storageInfo, delHintNode, numBlocksLogged < maxNumBlocksToLog);
+ for (BlockInfoToAdd b : toAdd) {
+ addStoredBlock(b.stored, b.reported, storageInfo, delHintNode,
+ numBlocksLogged < maxNumBlocksToLog);
numBlocksLogged++;
}
if (numBlocksLogged > maxNumBlocksToLog) {
@@ -3170,7 +3193,7 @@ public class BlockManager {
* @param b - the block being tested
* @return count of live nodes for this block
*/
- int countLiveNodes(BlockInfoContiguous b) {
+ int countLiveNodes(BlockInfo b) {
if (!namesystem.isInStartupSafeMode()) {
return countNodes(b).liveReplicas();
}
@@ -3244,7 +3267,7 @@ public class BlockManager {
return blocksMap.size();
}
- public DatanodeStorageInfo[] getStorages(BlockInfoContiguous block) {
+ public DatanodeStorageInfo[] getStorages(BlockInfo block) {
final DatanodeStorageInfo[] storages = new DatanodeStorageInfo[block.numNodes()];
int i = 0;
for(DatanodeStorageInfo s : blocksMap.getStorages(block)) {
@@ -3274,8 +3297,8 @@ public class BlockManager {
}
}
- public BlockInfoContiguous getStoredBlock(Block block) {
- BlockInfoContiguous info = null;
+ public BlockInfo getStoredBlock(Block block) {
+ BlockInfo info = null;
if (BlockIdManager.isStripedBlockID(block.getBlockId())) {
info = blocksMap.getStoredBlock(
new Block(BlockIdManager.convertToGroupID(block.getBlockId())));
@@ -3432,7 +3455,8 @@ public class BlockManager {
public BlockInfoContiguous addBlockCollection(BlockInfoContiguous block,
BlockCollection bc) {
- return blocksMap.addBlockCollection(block, bc);
+ // TODO
+ return (BlockInfoContiguous) blocksMap.addBlockCollection(block, bc);
}
public BlockCollection getBlockCollection(Block b) {
@@ -3640,7 +3664,7 @@ public class BlockManager {
/**
* A simple result enum for the result of
- * {@link BlockManager#processMisReplicatedBlock(BlockInfoContiguous)}.
+ * {@link BlockManager#processMisReplicatedBlock}.
*/
enum MisReplicationResult {
/** The block should be invalidated since it belongs to a deleted file. */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/10488c9c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
index 806a4cb..d383de8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
@@ -20,12 +20,10 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.Iterator;
import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.util.GSet;
import org.apache.hadoop.util.LightWeightGSet;
-import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
@@ -36,10 +34,10 @@ import com.google.common.collect.Iterables;
*/
class BlocksMap {
private static class StorageIterator implements Iterator<DatanodeStorageInfo> {
- private final BlockInfoContiguous blockInfo;
+ private final BlockInfo blockInfo;
private int nextIdx = 0;
- StorageIterator(BlockInfoContiguous blkInfo) {
+ StorageIterator(BlockInfo blkInfo) {
this.blockInfo = blkInfo;
}
@@ -63,14 +61,14 @@ class BlocksMap {
/** Constant {@link LightWeightGSet} capacity. */
private final int capacity;
- private GSet<Block, BlockInfoContiguous> blocks;
+ private GSet<Block, BlockInfo> blocks;
BlocksMap(int capacity) {
// Use 2% of total memory to size the GSet capacity
this.capacity = capacity;
- this.blocks = new LightWeightGSet<Block, BlockInfoContiguous>(capacity) {
+ this.blocks = new LightWeightGSet<Block, BlockInfo>(capacity) {
@Override
- public Iterator<BlockInfoContiguous> iterator() {
+ public Iterator<BlockInfo> iterator() {
SetIterator iterator = new SetIterator();
/*
* Not tracking any modifications to set. As this set will be used
@@ -97,15 +95,15 @@ class BlocksMap {
}
BlockCollection getBlockCollection(Block b) {
- BlockInfoContiguous info = blocks.get(b);
+ BlockInfo info = blocks.get(b);
return (info != null) ? info.getBlockCollection() : null;
}
/**
* Add block b belonging to the specified block collection to the map.
*/
- BlockInfoContiguous addBlockCollection(BlockInfoContiguous b, BlockCollection bc) {
- BlockInfoContiguous info = blocks.get(b);
+ BlockInfo addBlockCollection(BlockInfo b, BlockCollection bc) {
+ BlockInfo info = blocks.get(b);
if (info != b) {
info = b;
blocks.put(info);
@@ -120,11 +118,12 @@ class BlocksMap {
* and remove all data-node locations associated with the block.
*/
void removeBlock(Block block) {
- BlockInfoContiguous blockInfo = blocks.remove(block);
+ BlockInfo blockInfo = blocks.remove(block);
if (blockInfo == null)
return;
blockInfo.setBlockCollection(null);
+ // TODO: fix this logic for block group
for(int idx = blockInfo.numNodes()-1; idx >= 0; idx--) {
DatanodeDescriptor dn = blockInfo.getDatanode(idx);
dn.removeBlock(blockInfo); // remove from the list and wipe the location
@@ -132,7 +131,7 @@ class BlocksMap {
}
/** Returns the block object it it exists in the map. */
- BlockInfoContiguous getStoredBlock(Block b) {
+ BlockInfo getStoredBlock(Block b) {
return blocks.get(b);
}
@@ -164,7 +163,7 @@ class BlocksMap {
* For a block that has already been retrieved from the BlocksMap
* returns {@link Iterable} of the storages the block belongs to.
*/
- Iterable<DatanodeStorageInfo> getStorages(final BlockInfoContiguous storedBlock) {
+ Iterable<DatanodeStorageInfo> getStorages(final BlockInfo storedBlock) {
return new Iterable<DatanodeStorageInfo>() {
@Override
public Iterator<DatanodeStorageInfo> iterator() {
@@ -175,7 +174,7 @@ class BlocksMap {
/** counts number of containing nodes. Better than using iterator. */
int numNodes(Block b) {
- BlockInfoContiguous info = blocks.get(b);
+ BlockInfo info = blocks.get(b);
return info == null ? 0 : info.numNodes();
}
@@ -185,7 +184,7 @@ class BlocksMap {
* only if it does not belong to any file and data-nodes.
*/
boolean removeNode(Block b, DatanodeDescriptor node) {
- BlockInfoContiguous info = blocks.get(b);
+ BlockInfo info = blocks.get(b);
if (info == null)
return false;
@@ -203,7 +202,7 @@ class BlocksMap {
return blocks.size();
}
- Iterable<BlockInfoContiguous> getBlocks() {
+ Iterable<BlockInfo> getBlocks() {
return blocks;
}
@@ -218,20 +217,11 @@ class BlocksMap {
* @param newBlock - block for replacement
* @return new block
*/
- BlockInfoContiguous replaceBlock(BlockInfoContiguous newBlock) {
- BlockInfoContiguous currentBlock = blocks.get(newBlock);
+ BlockInfo replaceBlock(BlockInfo newBlock) {
+ BlockInfo currentBlock = blocks.get(newBlock);
assert currentBlock != null : "the block if not in blocksMap";
// replace block in data-node lists
- for (int i = currentBlock.numNodes() - 1; i >= 0; i--) {
- final DatanodeDescriptor dn = currentBlock.getDatanode(i);
- final DatanodeStorageInfo storage = currentBlock.findStorageInfo(dn);
- final boolean removed = storage.removeBlock(currentBlock);
- Preconditions.checkState(removed, "currentBlock not found.");
-
- final AddBlockResult result = storage.addBlock(newBlock);
- Preconditions.checkState(result == AddBlockResult.ADDED,
- "newBlock already exists.");
- }
+ currentBlock.replaceBlock(newBlock);
// replace block in the map itself
blocks.put(newBlock);
return newBlock;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/10488c9c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
index bf5ece9..79d7713 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
@@ -513,8 +513,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
iter.remove();
}
}
- BlockInfoContiguous blockInfo = blockManager.
- getStoredBlock(new Block(cblock.getBlockId()));
+ BlockInfoContiguous blockInfo = namesystem.getStoredBlock(new Block(cblock.getBlockId()));
String reason = findReasonForNotCaching(cblock, blockInfo);
int neededCached = 0;
if (reason != null) {
@@ -628,8 +627,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
List<DatanodeDescriptor> pendingCached) {
// To figure out which replicas can be cached, we consult the
// blocksMap. We don't want to try to cache a corrupt replica, though.
- BlockInfoContiguous blockInfo = blockManager.
- getStoredBlock(new Block(cachedBlock.getBlockId()));
+ BlockInfoContiguous blockInfo = namesystem.getStoredBlock(new Block(cachedBlock.getBlockId()));
if (blockInfo == null) {
LOG.debug("Block {}: can't add new cached replicas," +
" because there is no record of this block " +
@@ -668,7 +666,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
while (it.hasNext()) {
CachedBlock cBlock = it.next();
BlockInfoContiguous info =
- blockManager.getStoredBlock(new Block(cBlock.getBlockId()));
+ namesystem.getStoredBlock(new Block(cBlock.getBlockId()));
if (info != null) {
pendingBytes -= info.getNumBytes();
}
@@ -678,7 +676,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
while (it.hasNext()) {
CachedBlock cBlock = it.next();
BlockInfoContiguous info =
- blockManager.getStoredBlock(new Block(cBlock.getBlockId()));
+ namesystem.getStoredBlock(new Block(cBlock.getBlockId()));
if (info != null) {
pendingBytes += info.getNumBytes();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/10488c9c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index c0a17b1..a54d46b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -286,7 +286,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
* Remove block from the list of blocks belonging to the data-node. Remove
* data-node from the block.
*/
- boolean removeBlock(BlockInfoContiguous b) {
+ boolean removeBlock(BlockInfo b) {
final DatanodeStorageInfo s = b.findStorageInfo(this);
// if block exists on this datanode
if (s != null) {
@@ -299,12 +299,9 @@ public class DatanodeDescriptor extends DatanodeInfo {
* Remove block from the list of blocks belonging to the data-node. Remove
* data-node from the block.
*/
- boolean removeBlock(String storageID, BlockInfoContiguous b) {
+ boolean removeBlock(String storageID, BlockInfo b) {
DatanodeStorageInfo s = getStorageInfo(storageID);
- if (s != null) {
- return s.removeBlock(b);
- }
- return false;
+ return s != null && s.removeBlock(b);
}
public void resetBlocks() {
@@ -482,12 +479,12 @@ public class DatanodeDescriptor extends DatanodeInfo {
}
}
- private static class BlockIterator implements Iterator<BlockInfoContiguous> {
+ private static class BlockIterator implements Iterator<BlockInfo> {
private int index = 0;
- private final List<Iterator<BlockInfoContiguous>> iterators;
+ private final List<Iterator<BlockInfo>> iterators;
private BlockIterator(final DatanodeStorageInfo... storages) {
- List<Iterator<BlockInfoContiguous>> iterators = new ArrayList<Iterator<BlockInfoContiguous>>();
+ List<Iterator<BlockInfo>> iterators = new ArrayList<>();
for (DatanodeStorageInfo e : storages) {
iterators.add(e.getBlockIterator());
}
@@ -501,7 +498,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
}
@Override
- public BlockInfoContiguous next() {
+ public BlockInfo next() {
update();
return iterators.get(index).next();
}
@@ -518,10 +515,11 @@ public class DatanodeDescriptor extends DatanodeInfo {
}
}
- Iterator<BlockInfoContiguous> getBlockIterator() {
+ Iterator<BlockInfo> getBlockIterator() {
return new BlockIterator(getStorageInfos());
}
- Iterator<BlockInfoContiguous> getBlockIterator(final String storageID) {
+
+ Iterator<BlockInfo> getBlockIterator(final String storageID) {
return new BlockIterator(getStorageInfo(storageID));
}