You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/03/16 21:18:47 UTC

[40/50] [abbrv] hadoop git commit: HDFS-7716. Erasure Coding: extend BlockInfo to handle EC info. Contributed by Jing Zhao.

HDFS-7716. Erasure Coding: extend BlockInfo to handle EC info. Contributed by Jing Zhao.

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/10488c9c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/10488c9c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/10488c9c

Branch: refs/heads/HDFS-7285
Commit: 10488c9c27e03ea6692e146319dec10dbc0b4fa9
Parents: c054b2a
Author: Jing Zhao <ji...@apache.org>
Authored: Tue Feb 10 17:54:10 2015 -0800
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon Mar 16 13:09:27 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hdfs/protocol/HdfsConstants.java     |   1 +
 .../server/blockmanagement/BlockCollection.java |  13 +-
 .../server/blockmanagement/BlockIdManager.java  |   7 +-
 .../hdfs/server/blockmanagement/BlockInfo.java  | 339 +++++++++++++++++
 .../blockmanagement/BlockInfoContiguous.java    | 363 +++----------------
 .../BlockInfoContiguousUnderConstruction.java   | 140 +------
 .../blockmanagement/BlockInfoStriped.java       | 179 +++++++++
 .../server/blockmanagement/BlockManager.java    | 188 +++++-----
 .../hdfs/server/blockmanagement/BlocksMap.java  |  46 +--
 .../CacheReplicationMonitor.java                |  10 +-
 .../blockmanagement/DatanodeDescriptor.java     |  22 +-
 .../blockmanagement/DatanodeStorageInfo.java    |  38 +-
 .../ReplicaUnderConstruction.java               | 119 ++++++
 .../hdfs/server/namenode/FSDirectory.java       |   4 +-
 .../hdfs/server/namenode/FSNamesystem.java      |  24 +-
 .../hdfs/server/namenode/NamenodeFsck.java      |   3 +-
 .../snapshot/FSImageFormatPBSnapshot.java       |   4 +-
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |   4 +-
 .../server/blockmanagement/TestBlockInfo.java   |   6 +-
 .../blockmanagement/TestBlockInfoStriped.java   | 219 +++++++++++
 .../blockmanagement/TestBlockManager.java       |   4 +-
 .../blockmanagement/TestReplicationPolicy.java  |   2 +-
 22 files changed, 1127 insertions(+), 608 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/10488c9c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
index de60b6e..245b630 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
@@ -184,5 +184,6 @@ public class HdfsConstants {
 
   public static final byte NUM_DATA_BLOCKS = 3;
   public static final byte NUM_PARITY_BLOCKS = 2;
+  public static final long BLOCK_GROUP_INDEX_MASK = 15;
   public static final byte MAX_BLOCKS_IN_GROUP = 16;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/10488c9c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
index 1547611..974cac3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
@@ -39,12 +39,12 @@ public interface BlockCollection {
   public ContentSummary computeContentSummary();
 
   /**
-   * @return the number of blocks
+   * @return the number of blocks or block groups
    */ 
   public int numBlocks();
 
   /**
-   * Get the blocks.
+   * Get the blocks or block groups.
    */
   public BlockInfoContiguous[] getBlocks();
 
@@ -55,8 +55,8 @@ public interface BlockCollection {
   public long getPreferredBlockSize();
 
   /**
-   * Get block replication for the collection 
-   * @return block replication value
+   * Get block replication for the collection.
+   * @return block replication value. Return 0 if the file is erasure coded.
    */
   public short getBlockReplication();
 
@@ -71,7 +71,7 @@ public interface BlockCollection {
   public String getName();
 
   /**
-   * Set the block at the given index.
+   * Set the block/block-group at the given index.
    */
   public void setBlock(int index, BlockInfoContiguous blk);
 
@@ -79,7 +79,8 @@ public interface BlockCollection {
    * Convert the last block of the collection to an under-construction block
    * and set the locations.
    */
-  public BlockInfoContiguousUnderConstruction setLastBlock(BlockInfoContiguous lastBlock,
+  public BlockInfoContiguousUnderConstruction setLastBlock(
+      BlockInfoContiguous lastBlock,
       DatanodeStorageInfo[] targets) throws IOException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/10488c9c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java
index e7f8a05..3ae54ce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java
@@ -217,6 +217,11 @@ public class BlockIdManager {
   }
 
   public static long convertToGroupID(long id) {
-    return id & (~(HdfsConstants.MAX_BLOCKS_IN_GROUP - 1));
+    return id & (~HdfsConstants.BLOCK_GROUP_INDEX_MASK);
+  }
+
+  public static int getBlockIndex(Block reportedBlock) {
+    return (int) (reportedBlock.getBlockId() &
+        HdfsConstants.BLOCK_GROUP_INDEX_MASK);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/10488c9c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
new file mode 100644
index 0000000..f19ad32
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.util.LightWeightGSet;
+
+import java.util.LinkedList;
+
+/**
+ * For a given block (or an erasure coding block group), BlockInfo class
+ * maintains 1) the {@link BlockCollection} it is part of, and 2) datanodes
+ * where the replicas of the block, or blocks belonging to the erasure coding
+ * block group, are stored.
+ */
+public abstract class BlockInfo extends Block
+    implements LightWeightGSet.LinkedElement {
+  private BlockCollection bc;
+
+  /** For implementing {@link LightWeightGSet.LinkedElement} interface */
+  private LightWeightGSet.LinkedElement nextLinkedElement;
+
+  /**
+   * This array contains triplets of references. For each i-th storage, the
+   * block belongs to triplets[3*i] is the reference to the
+   * {@link DatanodeStorageInfo} and triplets[3*i+1] and triplets[3*i+2] are
+   * references to the previous and the next blocks, respectively, in the list
+   * of blocks belonging to this storage.
+   *
+   * Using previous and next in Object triplets is done instead of a
+   * {@link LinkedList} list to efficiently use memory. With LinkedList the cost
+   * per replica is 42 bytes (LinkedList#Entry object per replica) versus 16
+   * bytes using the triplets.
+   */
+  protected Object[] triplets;
+
+  /**
+   * Construct an entry for blocksmap
+   * @param size the block's replication factor, or the total number of blocks
+   *             in the block group
+   */
+  public BlockInfo(short size) {
+    this.triplets = new Object[3 * size];
+    this.bc = null;
+  }
+
+  public BlockInfo(Block blk, short size) {
+    super(blk);
+    this.triplets = new Object[3 * size];
+    this.bc = null;
+  }
+
+  public BlockCollection getBlockCollection() {
+    return bc;
+  }
+
+  public void setBlockCollection(BlockCollection bc) {
+    this.bc = bc;
+  }
+
+  public DatanodeDescriptor getDatanode(int index) {
+    DatanodeStorageInfo storage = getStorageInfo(index);
+    return storage == null ? null : storage.getDatanodeDescriptor();
+  }
+
+  DatanodeStorageInfo getStorageInfo(int index) {
+    assert this.triplets != null : "BlockInfo is not initialized";
+    assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
+    return (DatanodeStorageInfo)triplets[index*3];
+  }
+
+  BlockInfo getPrevious(int index) {
+    assert this.triplets != null : "BlockInfo is not initialized";
+    assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound";
+    return (BlockInfo) triplets[index*3+1];
+  }
+
+  BlockInfo getNext(int index) {
+    assert this.triplets != null : "BlockInfo is not initialized";
+    assert index >= 0 && index*3+2 < triplets.length : "Index is out of bound";
+    return (BlockInfo) triplets[index*3+2];
+  }
+
+  void setStorageInfo(int index, DatanodeStorageInfo storage) {
+    assert this.triplets != null : "BlockInfo is not initialized";
+    assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
+    triplets[index*3] = storage;
+  }
+
+  /**
+   * Return the previous block on the block list for the datanode at
+   * position index. Set the previous block on the list to "to".
+   *
+   * @param index - the datanode index
+   * @param to - block to be set to previous on the list of blocks
+   * @return current previous block on the list of blocks
+   */
+  BlockInfo setPrevious(int index, BlockInfo to) {
+    assert this.triplets != null : "BlockInfo is not initialized";
+    assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound";
+    BlockInfo info = (BlockInfo) triplets[index*3+1];
+    triplets[index*3+1] = to;
+    return info;
+  }
+
+  /**
+   * Return the next block on the block list for the datanode at
+   * position index. Set the next block on the list to "to".
+   *
+   * @param index - the datanode index
+   * @param to - block to be set to next on the list of blocks
+   * @return current next block on the list of blocks
+   */
+  BlockInfo setNext(int index, BlockInfo to) {
+    assert this.triplets != null : "BlockInfo is not initialized";
+    assert index >= 0 && index*3+2 < triplets.length : "Index is out of bound";
+    BlockInfo info = (BlockInfo) triplets[index*3+2];
+    triplets[index*3+2] = to;
+    return info;
+  }
+
+  public int getCapacity() {
+    assert this.triplets != null : "BlockInfo is not initialized";
+    assert triplets.length % 3 == 0 : "Malformed BlockInfo";
+    return triplets.length / 3;
+  }
+
+  /**
+   * Count the number of data-nodes the block currently belongs to (i.e., NN
+   * has received block reports from the DN).
+   */
+  public abstract int numNodes();
+
+  /**
+   * Add a {@link DatanodeStorageInfo} location for a block
+   * @param storage The storage to add
+   * @param reportedBlock The block reported from the datanode. This is only
+   *                      used by erasure coded blocks, this block's id contains
+   *                      information indicating the index of the block in the
+   *                      corresponding block group.
+   */
+  abstract boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock);
+
+  /**
+   * Remove {@link DatanodeStorageInfo} location for a block
+   */
+  abstract boolean removeStorage(DatanodeStorageInfo storage);
+
+  /**
+   * Replace the current BlockInfo with the new one in corresponding
+   * DatanodeStorageInfo's linked list
+   */
+  abstract void replaceBlock(BlockInfo newBlock);
+
+  /**
+   * Find specified DatanodeDescriptor.
+   * @return index or -1 if not found.
+   */
+  boolean findDatanode(DatanodeDescriptor dn) {
+    int len = getCapacity();
+    for (int idx = 0; idx < len; idx++) {
+      DatanodeDescriptor cur = getDatanode(idx);
+      if(cur == dn) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Find specified DatanodeStorageInfo.
+   * @return DatanodeStorageInfo or null if not found.
+   */
+  DatanodeStorageInfo findStorageInfo(DatanodeDescriptor dn) {
+    int len = getCapacity();
+    for(int idx = 0; idx < len; idx++) {
+      DatanodeStorageInfo cur = getStorageInfo(idx);
+      if(cur != null && cur.getDatanodeDescriptor() == dn) {
+        return cur;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Find specified DatanodeStorageInfo.
+   * @return index or -1 if not found.
+   */
+  int findStorageInfo(DatanodeStorageInfo storageInfo) {
+    int len = getCapacity();
+    for(int idx = 0; idx < len; idx++) {
+      DatanodeStorageInfo cur = getStorageInfo(idx);
+      if (cur == storageInfo) {
+        return idx;
+      }
+    }
+    return -1;
+  }
+
+  /**
+   * Insert this block into the head of the list of blocks
+   * related to the specified DatanodeStorageInfo.
+   * If the head is null then form a new list.
+   * @return current block as the new head of the list.
+   */
+  BlockInfo listInsert(BlockInfo head, DatanodeStorageInfo storage) {
+    int dnIndex = this.findStorageInfo(storage);
+    assert dnIndex >= 0 : "Data node is not found: current";
+    assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
+        "Block is already in the list and cannot be inserted.";
+    this.setPrevious(dnIndex, null);
+    this.setNext(dnIndex, head);
+    if (head != null) {
+      head.setPrevious(head.findStorageInfo(storage), this);
+    }
+    return this;
+  }
+
+  /**
+   * Remove this block from the list of blocks
+   * related to the specified DatanodeStorageInfo.
+   * If this block is the head of the list then return the next block as
+   * the new head.
+   * @return the new head of the list or null if the list becomes
+   * empy after deletion.
+   */
+  BlockInfo listRemove(BlockInfo head, DatanodeStorageInfo storage) {
+    if (head == null) {
+      return null;
+    }
+    int dnIndex = this.findStorageInfo(storage);
+    if (dnIndex < 0) { // this block is not on the data-node list
+      return head;
+    }
+
+    BlockInfo next = this.getNext(dnIndex);
+    BlockInfo prev = this.getPrevious(dnIndex);
+    this.setNext(dnIndex, null);
+    this.setPrevious(dnIndex, null);
+    if (prev != null) {
+      prev.setNext(prev.findStorageInfo(storage), next);
+    }
+    if (next != null) {
+      next.setPrevious(next.findStorageInfo(storage), prev);
+    }
+    if (this == head) { // removing the head
+      head = next;
+    }
+    return head;
+  }
+
+  /**
+   * Remove this block from the list of blocks related to the specified
+   * DatanodeDescriptor. Insert it into the head of the list of blocks.
+   *
+   * @return the new head of the list.
+   */
+  public BlockInfo moveBlockToHead(BlockInfo head, DatanodeStorageInfo storage,
+      int curIndex, int headIndex) {
+    if (head == this) {
+      return this;
+    }
+    BlockInfo next = this.setNext(curIndex, head);
+    BlockInfo prev = this.setPrevious(curIndex, null);
+
+    head.setPrevious(headIndex, this);
+    prev.setNext(prev.findStorageInfo(storage), next);
+    if (next != null) {
+      next.setPrevious(next.findStorageInfo(storage), prev);
+    }
+    return this;
+  }
+
+  /**
+   * BlockInfo represents a block that is not being constructed.
+   * In order to start modifying the block, the BlockInfo should be converted
+   * to {@link BlockInfoContiguousUnderConstruction}.
+   * @return {@link HdfsServerConstants.BlockUCState#COMPLETE}
+   */
+  public HdfsServerConstants.BlockUCState getBlockUCState() {
+    return HdfsServerConstants.BlockUCState.COMPLETE;
+  }
+
+  /**
+   * Is this block complete?
+   *
+   * @return true if the state of the block is
+   *         {@link HdfsServerConstants.BlockUCState#COMPLETE}
+   */
+  public boolean isComplete() {
+    return getBlockUCState().equals(HdfsServerConstants.BlockUCState.COMPLETE);
+  }
+
+  @Override
+  public int hashCode() {
+    // Super implementation is sufficient
+    return super.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    // Sufficient to rely on super's implementation
+    return (this == obj) || super.equals(obj);
+  }
+
+  @Override
+  public LightWeightGSet.LinkedElement getNext() {
+    return nextLinkedElement;
+  }
+
+  @Override
+  public void setNext(LightWeightGSet.LinkedElement next) {
+    this.nextLinkedElement = next;
+  }
+
+  static BlockInfo copyOf(BlockInfo b) {
+    if (b instanceof BlockInfoContiguous) {
+      return new BlockInfoContiguous((BlockInfoContiguous) b);
+    } else {
+      return new BlockInfoStriped((BlockInfoStriped) b);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/10488c9c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
index 48069c1..e54cba3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
@@ -17,148 +17,33 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
-import java.util.LinkedList;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
-import org.apache.hadoop.util.LightWeightGSet;
 
 /**
- * BlockInfo class maintains for a given block
- * the {@link BlockCollection} it is part of and datanodes where the replicas of 
- * the block are stored.
+ * Subclass of {@link BlockInfo}, used for a block with replication scheme.
  */
 @InterfaceAudience.Private
-public class BlockInfoContiguous extends Block
-    implements LightWeightGSet.LinkedElement {
+public class BlockInfoContiguous extends BlockInfo {
   public static final BlockInfoContiguous[] EMPTY_ARRAY = {};
 
-  private BlockCollection bc;
-
-  /** For implementing {@link LightWeightGSet.LinkedElement} interface */
-  private LightWeightGSet.LinkedElement nextLinkedElement;
-
-  /**
-   * This array contains triplets of references. For each i-th storage, the
-   * block belongs to triplets[3*i] is the reference to the
-   * {@link DatanodeStorageInfo} and triplets[3*i+1] and triplets[3*i+2] are
-   * references to the previous and the next blocks, respectively, in the list
-   * of blocks belonging to this storage.
-   * 
-   * Using previous and next in Object triplets is done instead of a
-   * {@link LinkedList} list to efficiently use memory. With LinkedList the cost
-   * per replica is 42 bytes (LinkedList#Entry object per replica) versus 16
-   * bytes using the triplets.
-   */
-  private Object[] triplets;
-
-  /**
-   * Construct an entry for blocksmap
-   * @param replication the block's replication factor
-   */
-  public BlockInfoContiguous(short replication) {
-    this.triplets = new Object[3*replication];
-    this.bc = null;
+  public BlockInfoContiguous(short size) {
+    super(size);
   }
-  
-  public BlockInfoContiguous(Block blk, short replication) {
-    super(blk);
-    this.triplets = new Object[3*replication];
-    this.bc = null;
+
+  public BlockInfoContiguous(Block blk, short size) {
+    super(blk, size);
   }
 
   /**
    * Copy construction.
-   * This is used to convert BlockInfoUnderConstruction
-   * @param from BlockInfo to copy from.
+   * This is used to convert BlockReplicationInfoUnderConstruction
+   * @param from BlockReplicationInfo to copy from.
    */
   protected BlockInfoContiguous(BlockInfoContiguous from) {
-    this(from, from.bc.getBlockReplication());
-    this.bc = from.bc;
-  }
-
-  public BlockCollection getBlockCollection() {
-    return bc;
-  }
-
-  public void setBlockCollection(BlockCollection bc) {
-    this.bc = bc;
-  }
-
-  public DatanodeDescriptor getDatanode(int index) {
-    DatanodeStorageInfo storage = getStorageInfo(index);
-    return storage == null ? null : storage.getDatanodeDescriptor();
-  }
-
-  DatanodeStorageInfo getStorageInfo(int index) {
-    assert this.triplets != null : "BlockInfo is not initialized";
-    assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
-    return (DatanodeStorageInfo)triplets[index*3];
-  }
-
-  private BlockInfoContiguous getPrevious(int index) {
-    assert this.triplets != null : "BlockInfo is not initialized";
-    assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound";
-    BlockInfoContiguous info = (BlockInfoContiguous)triplets[index*3+1];
-    assert info == null || 
-        info.getClass().getName().startsWith(BlockInfoContiguous.class.getName()) :
-              "BlockInfo is expected at " + index*3;
-    return info;
-  }
-
-  BlockInfoContiguous getNext(int index) {
-    assert this.triplets != null : "BlockInfo is not initialized";
-    assert index >= 0 && index*3+2 < triplets.length : "Index is out of bound";
-    BlockInfoContiguous info = (BlockInfoContiguous)triplets[index*3+2];
-    assert info == null || info.getClass().getName().startsWith(
-        BlockInfoContiguous.class.getName()) :
-        "BlockInfo is expected at " + index*3;
-    return info;
-  }
-
-  private void setStorageInfo(int index, DatanodeStorageInfo storage) {
-    assert this.triplets != null : "BlockInfo is not initialized";
-    assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
-    triplets[index*3] = storage;
-  }
-
-  /**
-   * Return the previous block on the block list for the datanode at
-   * position index. Set the previous block on the list to "to".
-   *
-   * @param index - the datanode index
-   * @param to - block to be set to previous on the list of blocks
-   * @return current previous block on the list of blocks
-   */
-  private BlockInfoContiguous setPrevious(int index, BlockInfoContiguous to) {
-    assert this.triplets != null : "BlockInfo is not initialized";
-    assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound";
-    BlockInfoContiguous info = (BlockInfoContiguous)triplets[index*3+1];
-    triplets[index*3+1] = to;
-    return info;
-  }
-
-  /**
-   * Return the next block on the block list for the datanode at
-   * position index. Set the next block on the list to "to".
-   *
-   * @param index - the datanode index
-   * @param to - block to be set to next on the list of blocks
-   *    * @return current next block on the list of blocks
-   */
-  private BlockInfoContiguous setNext(int index, BlockInfoContiguous to) {
-    assert this.triplets != null : "BlockInfo is not initialized";
-    assert index >= 0 && index*3+2 < triplets.length : "Index is out of bound";
-    BlockInfoContiguous info = (BlockInfoContiguous)triplets[index*3+2];
-    triplets[index*3+2] = to;
-    return info;
-  }
-
-  public int getCapacity() {
-    assert this.triplets != null : "BlockInfo is not initialized";
-    assert triplets.length % 3 == 0 : "Malformed BlockInfo";
-    return triplets.length / 3;
+    this(from, from.getBlockCollection().getBlockReplication());
+    this.setBlockCollection(from.getBlockCollection());
   }
 
   /**
@@ -168,9 +53,10 @@ public class BlockInfoContiguous extends Block
   private int ensureCapacity(int num) {
     assert this.triplets != null : "BlockInfo is not initialized";
     int last = numNodes();
-    if(triplets.length >= (last+num)*3)
+    if (triplets.length >= (last+num)*3) {
       return last;
-    /* Not enough space left. Create a new array. Should normally 
+    }
+    /* Not enough space left. Create a new array. Should normally
      * happen only when replication is manually increased by the user. */
     Object[] old = triplets;
     triplets = new Object[(last+num)*3];
@@ -178,23 +64,8 @@ public class BlockInfoContiguous extends Block
     return last;
   }
 
-  /**
-   * Count the number of data-nodes the block belongs to.
-   */
-  public int numNodes() {
-    assert this.triplets != null : "BlockInfo is not initialized";
-    assert triplets.length % 3 == 0 : "Malformed BlockInfo";
-    for(int idx = getCapacity()-1; idx >= 0; idx--) {
-      if(getDatanode(idx) != null)
-        return idx+1;
-    }
-    return 0;
-  }
-
-  /**
-   * Add a {@link DatanodeStorageInfo} location for a block
-   */
-  boolean addStorage(DatanodeStorageInfo storage) {
+  @Override
+  boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
     // find the last null node
     int lastNode = ensureCapacity(1);
     setStorageInfo(lastNode, storage);
@@ -203,167 +74,53 @@ public class BlockInfoContiguous extends Block
     return true;
   }
 
-  /**
-   * Remove {@link DatanodeStorageInfo} location for a block
-   */
+  @Override
   boolean removeStorage(DatanodeStorageInfo storage) {
     int dnIndex = findStorageInfo(storage);
-    if(dnIndex < 0) // the node is not found
+    if (dnIndex < 0) { // the node is not found
       return false;
-    assert getPrevious(dnIndex) == null && getNext(dnIndex) == null : 
-      "Block is still in the list and must be removed first.";
+    }
+    assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
+        "Block is still in the list and must be removed first.";
     // find the last not null node
-    int lastNode = numNodes()-1; 
-    // replace current node triplet by the lastNode one 
+    int lastNode = numNodes()-1;
+    // replace current node triplet by the lastNode one
     setStorageInfo(dnIndex, getStorageInfo(lastNode));
-    setNext(dnIndex, getNext(lastNode)); 
-    setPrevious(dnIndex, getPrevious(lastNode)); 
+    setNext(dnIndex, getNext(lastNode));
+    setPrevious(dnIndex, getPrevious(lastNode));
     // set the last triplet to null
     setStorageInfo(lastNode, null);
-    setNext(lastNode, null); 
-    setPrevious(lastNode, null); 
+    setNext(lastNode, null);
+    setPrevious(lastNode, null);
     return true;
   }
 
-  /**
-   * Find specified DatanodeDescriptor.
-   * @return index or -1 if not found.
-   */
-  boolean findDatanode(DatanodeDescriptor dn) {
-    int len = getCapacity();
-    for(int idx = 0; idx < len; idx++) {
-      DatanodeDescriptor cur = getDatanode(idx);
-      if(cur == dn) {
-        return true;
-      }
-      if(cur == null) {
-        break;
-      }
-    }
-    return false;
-  }
+  @Override
+  public int numNodes() {
+    assert this.triplets != null : "BlockInfo is not initialized";
+    assert triplets.length % 3 == 0 : "Malformed BlockInfo";
 
-  /**
-   * Find specified DatanodeStorageInfo.
-   * @return DatanodeStorageInfo or null if not found.
-   */
-  DatanodeStorageInfo findStorageInfo(DatanodeDescriptor dn) {
-    int len = getCapacity();
-    for(int idx = 0; idx < len; idx++) {
-      DatanodeStorageInfo cur = getStorageInfo(idx);
-      if(cur == null)
-        break;
-      if(cur.getDatanodeDescriptor() == dn)
-        return cur;
-    }
-    return null;
-  }
-  
-  /**
-   * Find specified DatanodeStorageInfo.
-   * @return index or -1 if not found.
-   */
-  int findStorageInfo(DatanodeStorageInfo storageInfo) {
-    int len = getCapacity();
-    for(int idx = 0; idx < len; idx++) {
-      DatanodeStorageInfo cur = getStorageInfo(idx);
-      if (cur == storageInfo) {
-        return idx;
-      }
-      if (cur == null) {
-        break;
+    for (int idx = getCapacity()-1; idx >= 0; idx--) {
+      if (getDatanode(idx) != null) {
+        return idx + 1;
       }
     }
-    return -1;
-  }
-
-  /**
-   * Insert this block into the head of the list of blocks 
-   * related to the specified DatanodeStorageInfo.
-   * If the head is null then form a new list.
-   * @return current block as the new head of the list.
-   */
-  BlockInfoContiguous listInsert(BlockInfoContiguous head,
-      DatanodeStorageInfo storage) {
-    int dnIndex = this.findStorageInfo(storage);
-    assert dnIndex >= 0 : "Data node is not found: current";
-    assert getPrevious(dnIndex) == null && getNext(dnIndex) == null : 
-            "Block is already in the list and cannot be inserted.";
-    this.setPrevious(dnIndex, null);
-    this.setNext(dnIndex, head);
-    if(head != null)
-      head.setPrevious(head.findStorageInfo(storage), this);
-    return this;
-  }
-
-  /**
-   * Remove this block from the list of blocks 
-   * related to the specified DatanodeStorageInfo.
-   * If this block is the head of the list then return the next block as 
-   * the new head.
-   * @return the new head of the list or null if the list becomes
-   * empy after deletion.
-   */
-  BlockInfoContiguous listRemove(BlockInfoContiguous head,
-      DatanodeStorageInfo storage) {
-    if(head == null)
-      return null;
-    int dnIndex = this.findStorageInfo(storage);
-    if(dnIndex < 0) // this block is not on the data-node list
-      return head;
-
-    BlockInfoContiguous next = this.getNext(dnIndex);
-    BlockInfoContiguous prev = this.getPrevious(dnIndex);
-    this.setNext(dnIndex, null);
-    this.setPrevious(dnIndex, null);
-    if(prev != null)
-      prev.setNext(prev.findStorageInfo(storage), next);
-    if(next != null)
-      next.setPrevious(next.findStorageInfo(storage), prev);
-    if(this == head)  // removing the head
-      head = next;
-    return head;
+    return 0;
   }
 
-  /**
-   * Remove this block from the list of blocks related to the specified
-   * DatanodeDescriptor. Insert it into the head of the list of blocks.
-   *
-   * @return the new head of the list.
-   */
-  public BlockInfoContiguous moveBlockToHead(BlockInfoContiguous head,
-      DatanodeStorageInfo storage, int curIndex, int headIndex) {
-    if (head == this) {
-      return this;
-    }
-    BlockInfoContiguous next = this.setNext(curIndex, head);
-    BlockInfoContiguous prev = this.setPrevious(curIndex, null);
-
-    head.setPrevious(headIndex, this);
-    prev.setNext(prev.findStorageInfo(storage), next);
-    if (next != null) {
-      next.setPrevious(next.findStorageInfo(storage), prev);
+  @Override
+  void replaceBlock(BlockInfo newBlock) {
+    assert newBlock instanceof BlockInfoContiguous;
+    for (int i = this.numNodes() - 1; i >= 0; i--) {
+      final DatanodeStorageInfo storage = this.getStorageInfo(i);
+      final boolean removed = storage.removeBlock(this);
+      assert removed : "currentBlock not found.";
+
+      final DatanodeStorageInfo.AddBlockResult result = storage.addBlock(
+          newBlock, newBlock);
+      assert result == DatanodeStorageInfo.AddBlockResult.ADDED :
+          "newBlock already exists.";
     }
-    return this;
-  }
-
-  /**
-   * BlockInfo represents a block that is not being constructed.
-   * In order to start modifying the block, the BlockInfo should be converted
-   * to {@link BlockInfoContiguousUnderConstruction}.
-   * @return {@link BlockUCState#COMPLETE}
-   */
-  public BlockUCState getBlockUCState() {
-    return BlockUCState.COMPLETE;
-  }
-
-  /**
-   * Is this block complete?
-   * 
-   * @return true if the state of the block is {@link BlockUCState#COMPLETE}
-   */
-  public boolean isComplete() {
-    return getBlockUCState().equals(BlockUCState.COMPLETE);
   }
 
   /**
@@ -375,38 +132,16 @@ public class BlockInfoContiguous extends Block
     if(isComplete()) {
       BlockInfoContiguousUnderConstruction ucBlock =
           new BlockInfoContiguousUnderConstruction(this,
-          getBlockCollection().getBlockReplication(), s, targets);
+              getBlockCollection().getBlockReplication(), s, targets);
       ucBlock.setBlockCollection(getBlockCollection());
       return ucBlock;
     }
     // the block is already under construction
     BlockInfoContiguousUnderConstruction ucBlock =
-        (BlockInfoContiguousUnderConstruction)this;
+        (BlockInfoContiguousUnderConstruction) this;
     ucBlock.setBlockUCState(s);
     ucBlock.setExpectedLocations(targets);
     ucBlock.setBlockCollection(getBlockCollection());
     return ucBlock;
   }
-
-  @Override
-  public int hashCode() {
-    // Super implementation is sufficient
-    return super.hashCode();
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    // Sufficient to rely on super's implementation
-    return (this == obj) || super.equals(obj);
-  }
-
-  @Override
-  public LightWeightGSet.LinkedElement getNext() {
-    return nextLinkedElement;
-  }
-
-  @Override
-  public void setNext(LightWeightGSet.LinkedElement next) {
-    this.nextLinkedElement = next;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/10488c9c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java
index ae809a5..e75c5d7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java
@@ -60,101 +60,6 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
   private Block truncateBlock;
 
   /**
-   * ReplicaUnderConstruction contains information about replicas while
-   * they are under construction.
-   * The GS, the length and the state of the replica is as reported by 
-   * the data-node.
-   * It is not guaranteed, but expected, that data-nodes actually have
-   * corresponding replicas.
-   */
-  static class ReplicaUnderConstruction extends Block {
-    private final DatanodeStorageInfo expectedLocation;
-    private ReplicaState state;
-    private boolean chosenAsPrimary;
-
-    ReplicaUnderConstruction(Block block,
-                             DatanodeStorageInfo target,
-                             ReplicaState state) {
-      super(block);
-      this.expectedLocation = target;
-      this.state = state;
-      this.chosenAsPrimary = false;
-    }
-
-    /**
-     * Expected block replica location as assigned when the block was allocated.
-     * This defines the pipeline order.
-     * It is not guaranteed, but expected, that the data-node actually has
-     * the replica.
-     */
-    private DatanodeStorageInfo getExpectedStorageLocation() {
-      return expectedLocation;
-    }
-
-    /**
-     * Get replica state as reported by the data-node.
-     */
-    ReplicaState getState() {
-      return state;
-    }
-
-    /**
-     * Whether the replica was chosen for recovery.
-     */
-    boolean getChosenAsPrimary() {
-      return chosenAsPrimary;
-    }
-
-    /**
-     * Set replica state.
-     */
-    void setState(ReplicaState s) {
-      state = s;
-    }
-
-    /**
-     * Set whether this replica was chosen for recovery.
-     */
-    void setChosenAsPrimary(boolean chosenAsPrimary) {
-      this.chosenAsPrimary = chosenAsPrimary;
-    }
-
-    /**
-     * Is data-node the replica belongs to alive.
-     */
-    boolean isAlive() {
-      return expectedLocation.getDatanodeDescriptor().isAlive;
-    }
-
-    @Override // Block
-    public int hashCode() {
-      return super.hashCode();
-    }
-
-    @Override // Block
-    public boolean equals(Object obj) {
-      // Sufficient to rely on super's implementation
-      return (this == obj) || super.equals(obj);
-    }
-
-    @Override
-    public String toString() {
-      final StringBuilder b = new StringBuilder(50);
-      appendStringTo(b);
-      return b.toString();
-    }
-    
-    @Override
-    public void appendStringTo(StringBuilder sb) {
-      sb.append("ReplicaUC[")
-        .append(expectedLocation)
-        .append("|")
-        .append(state)
-        .append("]");
-    }
-  }
-
-  /**
    * Create block and set its state to
    * {@link BlockUCState#UNDER_CONSTRUCTION}.
    */
@@ -165,7 +70,8 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
   /**
    * Create a block that is currently being constructed.
    */
-  public BlockInfoContiguousUnderConstruction(Block blk, short replication, BlockUCState state, DatanodeStorageInfo[] targets) {
+  public BlockInfoContiguousUnderConstruction(Block blk, short replication,
+      BlockUCState state, DatanodeStorageInfo[] targets) {
     super(blk, replication);
     assert getBlockUCState() != BlockUCState.COMPLETE :
       "BlockInfoUnderConstruction cannot be in COMPLETE state";
@@ -191,10 +97,11 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
   /** Set expected locations */
   public void setExpectedLocations(DatanodeStorageInfo[] targets) {
     int numLocations = targets == null ? 0 : targets.length;
-    this.replicas = new ArrayList<ReplicaUnderConstruction>(numLocations);
-    for(int i = 0; i < numLocations; i++)
-      replicas.add(
-        new ReplicaUnderConstruction(this, targets[i], ReplicaState.RBW));
+    this.replicas = new ArrayList<>(numLocations);
+    for(int i = 0; i < numLocations; i++) {
+      replicas.add(new ReplicaUnderConstruction(this, targets[i],
+          ReplicaState.RBW));
+    }
   }
 
   /**
@@ -204,8 +111,9 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
   public DatanodeStorageInfo[] getExpectedStorageLocations() {
     int numLocations = replicas == null ? 0 : replicas.size();
     DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations];
-    for(int i = 0; i < numLocations; i++)
+    for (int i = 0; i < numLocations; i++) {
       storages[i] = replicas.get(i).getExpectedStorageLocation();
+    }
     return storages;
   }
 
@@ -293,17 +201,17 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
         + " No blocks found, lease removed.");
     }
     boolean allLiveReplicasTriedAsPrimary = true;
-    for (int i = 0; i < replicas.size(); i++) {
+    for (ReplicaUnderConstruction replica : replicas) {
       // Check if all replicas have been tried or not.
-      if (replicas.get(i).isAlive()) {
-        allLiveReplicasTriedAsPrimary =
-            (allLiveReplicasTriedAsPrimary && replicas.get(i).getChosenAsPrimary());
+      if (replica.isAlive()) {
+        allLiveReplicasTriedAsPrimary = (allLiveReplicasTriedAsPrimary &&
+            replica.getChosenAsPrimary());
       }
     }
     if (allLiveReplicasTriedAsPrimary) {
       // Just set all the replicas to be chosen whether they are alive or not.
-      for (int i = 0; i < replicas.size(); i++) {
-        replicas.get(i).setChosenAsPrimary(false);
+      for (ReplicaUnderConstruction replica : replicas) {
+        replica.setChosenAsPrimary(false);
       }
     }
     long mostRecentLastUpdate = 0;
@@ -315,7 +223,8 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
         continue;
       }
       final ReplicaUnderConstruction ruc = replicas.get(i);
-      final long lastUpdate = ruc.getExpectedStorageLocation().getDatanodeDescriptor().getLastUpdate(); 
+      final long lastUpdate = ruc.getExpectedStorageLocation()
+          .getDatanodeDescriptor().getLastUpdate();
       if (lastUpdate > mostRecentLastUpdate) {
         primaryNodeIndex = i;
         primary = ruc;
@@ -323,7 +232,8 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
       }
     }
     if (primary != null) {
-      primary.getExpectedStorageLocation().getDatanodeDescriptor().addBlockToBeRecovered(this);
+      primary.getExpectedStorageLocation().getDatanodeDescriptor()
+          .addBlockToBeRecovered(this);
       primary.setChosenAsPrimary(true);
       NameNode.blockStateChangeLog.info(
           "BLOCK* {} recovery started, primary={}", this, primary);
@@ -356,18 +266,6 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
     replicas.add(new ReplicaUnderConstruction(block, storage, rState));
   }
 
-  @Override // BlockInfo
-  // BlockInfoUnderConstruction participates in maps the same way as BlockInfo
-  public int hashCode() {
-    return super.hashCode();
-  }
-
-  @Override // BlockInfo
-  public boolean equals(Object obj) {
-    // Sufficient to rely on super's implementation
-    return (this == obj) || super.equals(obj);
-  }
-
   @Override
   public String toString() {
     final StringBuilder b = new StringBuilder(100);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/10488c9c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
new file mode 100644
index 0000000..5fff41e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+
+/**
+ * Subclass of {@link BlockInfo}, presenting a block group in erasure coding.
+ *
+ * We still use triplets to store DatanodeStorageInfo for each block in the
+ * block group, as well as the previous/next block in the corresponding
+ * DatanodeStorageInfo. For a (m+k) block group, the first (m+k) triplet units
+ * are sorted and strictly mapped to the corresponding block.
+ *
+ * Normally each block belonging to group is stored in only one DataNode.
+ * However, it is possible that some block is over-replicated. Thus the triplet
+ * array's size can be larger than (m+k). Thus currently we use an extra byte
+ * array to record the block index for each triplet.
+ */
+public class BlockInfoStriped extends BlockInfo {
+  private final short dataBlockNum;
+  private final short parityBlockNum;
+  /**
+   * Always the same size with triplets. Record the block index for each triplet
+   * TODO: actually this is only necessary for over-replicated block. Thus can
+   * be further optimized to save memory usage.
+   */
+  private byte[] indices;
+
+  public BlockInfoStriped(Block blk, short dataBlockNum, short parityBlockNum) {
+    super(blk, (short) (dataBlockNum + parityBlockNum));
+    indices = new byte[dataBlockNum + parityBlockNum];
+    initIndices();
+    this.dataBlockNum = dataBlockNum;
+    this.parityBlockNum = parityBlockNum;
+  }
+
+  BlockInfoStriped(BlockInfoStriped b) {
+    this(b, b.dataBlockNum, b.parityBlockNum);
+    this.setBlockCollection(b.getBlockCollection());
+  }
+
+  private short getTotalBlockNum() {
+    return (short) (dataBlockNum + parityBlockNum);
+  }
+
+  private void initIndices() {
+    for (int i = 0; i < indices.length; i++) {
+      indices[i] = -1;
+    }
+  }
+
+  private int findSlot() {
+    int i = getTotalBlockNum();
+    for (; i < getCapacity(); i++) {
+      if (getStorageInfo(i) == null) {
+        return i;
+      }
+    }
+    // need to expand the triplet size
+    ensureCapacity(i + 1, true);
+    return i;
+  }
+
+  @Override
+  boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
+    int blockIndex = BlockIdManager.getBlockIndex(reportedBlock);
+    int index = blockIndex;
+    DatanodeStorageInfo old = getStorageInfo(index);
+    if (old != null && !old.equals(storage)) { // over replicated
+      // check if the storage has been stored
+      int i = findStorageInfo(storage);
+      if (i == -1) {
+        index = findSlot();
+      } else {
+        return true;
+      }
+    }
+    addStorage(storage, index, blockIndex);
+    return true;
+  }
+
+  private void addStorage(DatanodeStorageInfo storage, int index,
+      int blockIndex) {
+    setStorageInfo(index, storage);
+    setNext(index, null);
+    setPrevious(index, null);
+    indices[index] = (byte) blockIndex;
+  }
+
+  private int findStorageInfoFromEnd(DatanodeStorageInfo storage) {
+    final int len = getCapacity();
+    for(int idx = len - 1; idx >= 0; idx--) {
+      DatanodeStorageInfo cur = getStorageInfo(idx);
+      if (storage.equals(cur)) {
+        return idx;
+      }
+    }
+    return -1;
+  }
+
+  @Override
+  boolean removeStorage(DatanodeStorageInfo storage) {
+    int dnIndex = findStorageInfoFromEnd(storage);
+    if (dnIndex < 0) { // the node is not found
+      return false;
+    }
+    assert getPrevious(dnIndex) == null && getNext(dnIndex) == null :
+        "Block is still in the list and must be removed first.";
+    // set the triplet to null
+    setStorageInfo(dnIndex, null);
+    setNext(dnIndex, null);
+    setPrevious(dnIndex, null);
+    indices[dnIndex] = -1;
+    return true;
+  }
+
+  private void ensureCapacity(int totalSize, boolean keepOld) {
+    if (getCapacity() < totalSize) {
+      Object[] old = triplets;
+      byte[] oldIndices = indices;
+      triplets = new Object[totalSize * 3];
+      indices = new byte[totalSize];
+      initIndices();
+
+      if (keepOld) {
+        System.arraycopy(old, 0, triplets, 0, old.length);
+        System.arraycopy(oldIndices, 0, indices, 0, oldIndices.length);
+      }
+    }
+  }
+
+  @Override
+  void replaceBlock(BlockInfo newBlock) {
+    assert newBlock instanceof BlockInfoStriped;
+    BlockInfoStriped newBlockGroup = (BlockInfoStriped) newBlock;
+    final int size = getCapacity();
+    newBlockGroup.ensureCapacity(size, false);
+    for (int i = 0; i < size; i++) {
+      final DatanodeStorageInfo storage = this.getStorageInfo(i);
+      if (storage != null) {
+        final int blockIndex = indices[i];
+        final boolean removed = storage.removeBlock(this);
+        assert removed : "currentBlock not found.";
+
+        newBlockGroup.addStorage(storage, i, blockIndex);
+        storage.insertToList(newBlockGroup);
+      }
+    }
+  }
+
+  @Override
+  public int numNodes() {
+    assert this.triplets != null : "BlockInfo is not initialized";
+    assert triplets.length % 3 == 0 : "Malformed BlockInfo";
+    int num = 0;
+    for (int idx = getCapacity()-1; idx >= 0; idx--) {
+      if (getStorageInfo(idx) != null) {
+        num++;
+      }
+    }
+    return num;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/10488c9c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index ec07a5f..d9f2772 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -595,8 +595,8 @@ public class BlockManager {
    * of replicas reported from data-nodes.
    */
   private static boolean commitBlock(
-      final BlockInfoContiguousUnderConstruction block, final Block commitBlock)
-      throws IOException {
+      final BlockInfoContiguousUnderConstruction block,
+      final Block commitBlock) throws IOException {
     if (block.getBlockUCState() == BlockUCState.COMMITTED)
       return false;
     assert block.getNumBytes() <= commitBlock.getNumBytes() :
@@ -627,7 +627,7 @@ public class BlockManager {
       return false; // already completed (e.g. by syncBlock)
     
     final boolean b = commitBlock(
-        (BlockInfoContiguousUnderConstruction) lastBlock, commitBlock);
+        (BlockInfoContiguousUnderConstruction)lastBlock, commitBlock);
     if(countNodes(lastBlock).liveReplicas() >= minReplication)
       completeBlock(bc, bc.numBlocks()-1, false);
     return b;
@@ -640,15 +640,16 @@ public class BlockManager {
    * @throws IOException if the block does not have at least a minimal number
    * of replicas reported from data-nodes.
    */
-  private BlockInfoContiguous completeBlock(final BlockCollection bc,
+  private BlockInfo completeBlock(final BlockCollection bc,
       final int blkIndex, boolean force) throws IOException {
     if(blkIndex < 0)
       return null;
     BlockInfoContiguous curBlock = bc.getBlocks()[blkIndex];
-    if(curBlock.isComplete())
+    if (curBlock.isComplete())
       return curBlock;
+    // TODO: support BlockInfoStripedUC
     BlockInfoContiguousUnderConstruction ucBlock =
-        (BlockInfoContiguousUnderConstruction) curBlock;
+        (BlockInfoContiguousUnderConstruction)curBlock;
     int numNodes = ucBlock.numNodes();
     if (!force && numNodes < minReplication)
       throw new IOException("Cannot complete block: " +
@@ -674,13 +675,15 @@ public class BlockManager {
     return blocksMap.replaceBlock(completeBlock);
   }
 
-  private BlockInfoContiguous completeBlock(final BlockCollection bc,
-      final BlockInfoContiguous block, boolean force) throws IOException {
+  // TODO: support BlockInfoStrippedUC
+  private BlockInfo completeBlock(final BlockCollection bc,
+      final BlockInfo block, boolean force) throws IOException {
     BlockInfoContiguous[] fileBlocks = bc.getBlocks();
-    for(int idx = 0; idx < fileBlocks.length; idx++)
-      if(fileBlocks[idx] == block) {
+    for (int idx = 0; idx < fileBlocks.length; idx++) {
+      if (fileBlocks[idx] == block) {
         return completeBlock(bc, idx, force);
       }
+    }
     return block;
   }
   
@@ -689,7 +692,7 @@ public class BlockManager {
    * regardless of whether enough replicas are present. This is necessary
    * when tailing edit logs as a Standby.
    */
-  public BlockInfoContiguous forceCompleteBlock(final BlockCollection bc,
+  public BlockInfo forceCompleteBlock(final BlockCollection bc,
       final BlockInfoContiguousUnderConstruction block) throws IOException {
     block.commitBlock(block);
     return completeBlock(bc, block, true);
@@ -721,8 +724,8 @@ public class BlockManager {
 
     DatanodeStorageInfo[] targets = getStorages(oldBlock);
 
-    BlockInfoContiguousUnderConstruction ucBlock =
-        bc.setLastBlock(oldBlock, targets);
+    BlockInfoContiguousUnderConstruction ucBlock = bc.setLastBlock(oldBlock,
+        targets);
     blocksMap.replaceBlock(ucBlock);
 
     // Remove block from replication queue.
@@ -1022,7 +1025,7 @@ public class BlockManager {
     if(numBlocks == 0) {
       return new BlocksWithLocations(new BlockWithLocations[0]);
     }
-    Iterator<BlockInfoContiguous> iter = node.getBlockIterator();
+    Iterator<BlockInfo> iter = node.getBlockIterator();
     int startBlock = DFSUtil.getRandom().nextInt(numBlocks); // starting from a random block
     // skip blocks
     for(int i=0; i<startBlock; i++) {
@@ -1030,7 +1033,7 @@ public class BlockManager {
     }
     List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
     long totalSize = 0;
-    BlockInfoContiguous curBlock;
+    BlockInfo curBlock;
     while(totalSize<size && iter.hasNext()) {
       curBlock = iter.next();
       if(!curBlock.isComplete())  continue;
@@ -1129,7 +1132,8 @@ public class BlockManager {
   public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk,
       final DatanodeInfo dn, String storageID, String reason) throws IOException {
     assert namesystem.hasWriteLock();
-    final BlockInfoContiguous storedBlock = getStoredBlock(blk.getLocalBlock());
+    final Block reportedBlock = blk.getLocalBlock();
+    final BlockInfo storedBlock = getStoredBlock(reportedBlock);
     if (storedBlock == null) {
       // Check if the replica is in the blockMap, if not
       // ignore the request for now. This could happen when BlockScanner
@@ -1146,7 +1150,7 @@ public class BlockManager {
           + ") does not exist");
     }
     
-    markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock,
+    markBlockAsCorrupt(new BlockToMarkCorrupt(reportedBlock, storedBlock,
             blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED),
         storageID == null ? null : node.getStorageInfo(storageID),
         node);
@@ -1172,7 +1176,7 @@ public class BlockManager {
 
     // Add replica to the data-node if it is not already there
     if (storageInfo != null) {
-      storageInfo.addBlock(b.stored);
+      storageInfo.addBlock(b.stored, b.reportedBlock);
     }
 
     // Add this replica to corruptReplicas Map
@@ -1717,41 +1721,55 @@ public class BlockManager {
       this.reportedState = reportedState;
     }
   }
-  
+
+  private static class BlockInfoToAdd {
+    final BlockInfo stored;
+    final Block reported;
+
+    BlockInfoToAdd(BlockInfo stored, Block reported) {
+      this.stored = stored;
+      this.reported = reported;
+    }
+  }
+
   /**
    * BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a
    * list of blocks that should be considered corrupt due to a block report.
    */
   private static class BlockToMarkCorrupt {
     /** The corrupted block in a datanode. */
-    final BlockInfoContiguous corrupted;
+    final BlockInfo corrupted;
     /** The corresponding block stored in the BlockManager. */
-    final BlockInfoContiguous stored;
+    final BlockInfo stored;
+    /** The block reported from a datanode */
+    final Block reportedBlock;
     /** The reason to mark corrupt. */
     final String reason;
     /** The reason code to be stored */
     final Reason reasonCode;
 
-    BlockToMarkCorrupt(BlockInfoContiguous corrupted,
-        BlockInfoContiguous stored, String reason,
-        Reason reasonCode) {
+    BlockToMarkCorrupt(Block reported, BlockInfo corrupted,
+        BlockInfo stored, String reason, Reason reasonCode) {
+      Preconditions.checkNotNull(reported, "reported is null");
       Preconditions.checkNotNull(corrupted, "corrupted is null");
       Preconditions.checkNotNull(stored, "stored is null");
 
+      this.reportedBlock = reported;
       this.corrupted = corrupted;
       this.stored = stored;
       this.reason = reason;
       this.reasonCode = reasonCode;
     }
 
-    BlockToMarkCorrupt(BlockInfoContiguous stored, String reason,
+    BlockToMarkCorrupt(Block reported, BlockInfo stored, String reason,
         Reason reasonCode) {
-      this(stored, stored, reason, reasonCode);
+      this(reported, stored, stored, reason, reasonCode);
     }
 
-    BlockToMarkCorrupt(BlockInfoContiguous stored, long gs, String reason,
-        Reason reasonCode) {
-      this(new BlockInfoContiguous(stored), stored, reason, reasonCode);
+    BlockToMarkCorrupt(Block reported, BlockInfo stored, long gs,
+        String reason, Reason reasonCode) {
+      this(reported, BlockInfo.copyOf(stored), stored, reason,
+          reasonCode);
       //the corrupted block in datanode has a different generation stamp
       corrupted.setGenerationStamp(gs);
     }
@@ -1876,7 +1894,7 @@ public class BlockManager {
           break;
         }
 
-        BlockInfoContiguous bi = getStoredBlock(b);
+        BlockInfo bi = getStoredBlock(b);
         if (bi == null) {
           if (LOG.isDebugEnabled()) {
             LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " +
@@ -1916,7 +1934,7 @@ public class BlockManager {
     // Modify the (block-->datanode) map, according to the difference
     // between the old and new block report.
     //
-    Collection<BlockInfoContiguous> toAdd = new LinkedList<BlockInfoContiguous>();
+    Collection<BlockInfoToAdd> toAdd = new LinkedList<>();
     Collection<Block> toRemove = new TreeSet<Block>();
     Collection<Block> toInvalidate = new LinkedList<Block>();
     Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
@@ -1933,8 +1951,9 @@ public class BlockManager {
       removeStoredBlock(b, node);
     }
     int numBlocksLogged = 0;
-    for (BlockInfoContiguous b : toAdd) {
-      addStoredBlock(b, storageInfo, null, numBlocksLogged < maxNumBlocksToLog);
+    for (BlockInfoToAdd b : toAdd) {
+      addStoredBlock(b.stored, b.reported, storageInfo, null,
+          numBlocksLogged < maxNumBlocksToLog);
       numBlocksLogged++;
     }
     if (numBlocksLogged > maxNumBlocksToLog) {
@@ -1979,7 +1998,7 @@ public class BlockManager {
         continue;
       }
       
-      BlockInfoContiguous storedBlock = getStoredBlock(iblk);
+      BlockInfo storedBlock = getStoredBlock(iblk);
       // If block does not belong to any file, we are done.
       if (storedBlock == null) continue;
       
@@ -2002,7 +2021,7 @@ public class BlockManager {
       
       // If block is under construction, add this replica to its list
       if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
-        ((BlockInfoContiguousUnderConstruction)storedBlock)
+        ((BlockInfoContiguousUnderConstruction) storedBlock)
             .addReplicaIfNotPresent(storageInfo, iblk, reportedState);
         // OpenFileBlocks only inside snapshots also will be added to safemode
         // threshold. So we need to update such blocks to safemode
@@ -2017,14 +2036,14 @@ public class BlockManager {
       }      
       //add replica if appropriate
       if (reportedState == ReplicaState.FINALIZED) {
-        addStoredBlockImmediate(storedBlock, storageInfo);
+        addStoredBlockImmediate(storedBlock, iblk, storageInfo);
       }
     }
   }
 
   private void reportDiff(DatanodeStorageInfo storageInfo, 
       BlockListAsLongs newReport, 
-      Collection<BlockInfoContiguous> toAdd,              // add to DatanodeDescriptor
+      Collection<BlockInfoToAdd> toAdd,     // add to DatanodeDescriptor
       Collection<Block> toRemove,           // remove from DatanodeDescriptor
       Collection<Block> toInvalidate,       // should be removed from DN
       Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
@@ -2032,8 +2051,10 @@ public class BlockManager {
 
     // place a delimiter in the list which separates blocks 
     // that have been reported from those that have not
-    BlockInfoContiguous delimiter = new BlockInfoContiguous(new Block(), (short) 1);
-    AddBlockResult result = storageInfo.addBlock(delimiter);
+    Block delimiterBlock = new Block();
+    BlockInfoContiguous delimiter = new BlockInfoContiguous(delimiterBlock,
+        (short) 1);
+    AddBlockResult result = storageInfo.addBlock(delimiter, delimiterBlock);
     assert result == AddBlockResult.ADDED 
         : "Delimiting block cannot be present in the node";
     int headIndex = 0; //currently the delimiter is in the head of the list
@@ -2045,7 +2066,7 @@ public class BlockManager {
     // scan the report and process newly reported blocks
     for (BlockReportReplica iblk : newReport) {
       ReplicaState iState = iblk.getState();
-      BlockInfoContiguous storedBlock = processReportedBlock(storageInfo,
+      BlockInfo storedBlock = processReportedBlock(storageInfo,
           iblk, iState, toAdd, toInvalidate, toCorrupt, toUC);
 
       // move block to the head of the list
@@ -2057,8 +2078,7 @@ public class BlockManager {
 
     // collect blocks that have not been reported
     // all of them are next to the delimiter
-    Iterator<BlockInfoContiguous> it =
-        storageInfo.new BlockIterator(delimiter.getNext(0));
+    Iterator<BlockInfo> it = storageInfo.new BlockIterator(delimiter.getNext(0));
     while(it.hasNext())
       toRemove.add(it.next());
     storageInfo.removeBlock(delimiter);
@@ -2095,10 +2115,10 @@ public class BlockManager {
    * @return the up-to-date stored block, if it should be kept.
    *         Otherwise, null.
    */
-  private BlockInfoContiguous processReportedBlock(
+  private BlockInfo processReportedBlock(
       final DatanodeStorageInfo storageInfo,
       final Block block, final ReplicaState reportedState, 
-      final Collection<BlockInfoContiguous> toAdd,
+      final Collection<BlockInfoToAdd> toAdd,
       final Collection<Block> toInvalidate, 
       final Collection<BlockToMarkCorrupt> toCorrupt,
       final Collection<StatefulBlockInfo> toUC) {
@@ -2119,7 +2139,7 @@ public class BlockManager {
     }
     
     // find block by blockId
-    BlockInfoContiguous storedBlock = getStoredBlock(block);
+    BlockInfo storedBlock = getStoredBlock(block);
     if(storedBlock == null) {
       // If blocksMap does not contain reported block id,
       // the replica should be removed from the data-node.
@@ -2173,7 +2193,7 @@ public class BlockManager {
     if (reportedState == ReplicaState.FINALIZED
         && (storedBlock.findStorageInfo(storageInfo) == -1 ||
             corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
-      toAdd.add(storedBlock);
+      toAdd.add(new BlockInfoToAdd(storedBlock, block));
     }
     return storedBlock;
   }
@@ -2251,7 +2271,7 @@ public class BlockManager {
    */
   private BlockToMarkCorrupt checkReplicaCorrupt(
       Block reported, ReplicaState reportedState, 
-      BlockInfoContiguous storedBlock, BlockUCState ucState,
+      BlockInfo storedBlock, BlockUCState ucState,
       DatanodeDescriptor dn) {
     switch(reportedState) {
     case FINALIZED:
@@ -2260,12 +2280,12 @@ public class BlockManager {
       case COMMITTED:
         if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) {
           final long reportedGS = reported.getGenerationStamp();
-          return new BlockToMarkCorrupt(storedBlock, reportedGS,
+          return new BlockToMarkCorrupt(reported, storedBlock, reportedGS,
               "block is " + ucState + " and reported genstamp " + reportedGS
               + " does not match genstamp in block map "
               + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
         } else if (storedBlock.getNumBytes() != reported.getNumBytes()) {
-          return new BlockToMarkCorrupt(storedBlock,
+          return new BlockToMarkCorrupt(reported, storedBlock,
               "block is " + ucState + " and reported length " +
               reported.getNumBytes() + " does not match " +
               "length in block map " + storedBlock.getNumBytes(),
@@ -2276,8 +2296,8 @@ public class BlockManager {
       case UNDER_CONSTRUCTION:
         if (storedBlock.getGenerationStamp() > reported.getGenerationStamp()) {
           final long reportedGS = reported.getGenerationStamp();
-          return new BlockToMarkCorrupt(storedBlock, reportedGS, "block is "
-              + ucState + " and reported state " + reportedState
+          return new BlockToMarkCorrupt(reported, storedBlock, reportedGS,
+              "block is " + ucState + " and reported state " + reportedState
               + ", But reported genstamp " + reportedGS
               + " does not match genstamp in block map "
               + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
@@ -2292,7 +2312,7 @@ public class BlockManager {
         return null; // not corrupt
       } else if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) {
         final long reportedGS = reported.getGenerationStamp();
-        return new BlockToMarkCorrupt(storedBlock, reportedGS,
+        return new BlockToMarkCorrupt(reported, storedBlock, reportedGS,
             "reported " + reportedState + " replica with genstamp " + reportedGS
             + " does not match COMPLETE block's genstamp in block map "
             + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
@@ -2307,7 +2327,7 @@ public class BlockManager {
               "complete with the same genstamp");
           return null;
         } else {
-          return new BlockToMarkCorrupt(storedBlock,
+          return new BlockToMarkCorrupt(reported, storedBlock,
               "reported replica has invalid state " + reportedState,
               Reason.INVALID_STATE);
         }
@@ -2320,11 +2340,12 @@ public class BlockManager {
       " on " + dn + " size " + storedBlock.getNumBytes();
       // log here at WARN level since this is really a broken HDFS invariant
       LOG.warn(msg);
-      return new BlockToMarkCorrupt(storedBlock, msg, Reason.INVALID_STATE);
+      return new BlockToMarkCorrupt(reported, storedBlock, msg,
+          Reason.INVALID_STATE);
     }
   }
 
-  private boolean isBlockUnderConstruction(BlockInfoContiguous storedBlock,
+  private boolean isBlockUnderConstruction(BlockInfo storedBlock,
       BlockUCState ucState, ReplicaState reportedState) {
     switch(reportedState) {
     case FINALIZED:
@@ -2353,7 +2374,7 @@ public class BlockManager {
 
     if (ucBlock.reportedState == ReplicaState.FINALIZED &&
         !block.findDatanode(storageInfo.getDatanodeDescriptor())) {
-      addStoredBlock(block, storageInfo, null, true);
+      addStoredBlock(block, ucBlock.reportedBlock, storageInfo, null, true);
     }
   } 
 
@@ -2368,18 +2389,18 @@ public class BlockManager {
    * 
    * @throws IOException
    */
-  private void addStoredBlockImmediate(BlockInfoContiguous storedBlock,
+  private void addStoredBlockImmediate(BlockInfo storedBlock, Block reported,
       DatanodeStorageInfo storageInfo)
   throws IOException {
     assert (storedBlock != null && namesystem.hasWriteLock());
     if (!namesystem.isInStartupSafeMode() 
         || namesystem.isPopulatingReplQueues()) {
-      addStoredBlock(storedBlock, storageInfo, null, false);
+      addStoredBlock(storedBlock, reported, storageInfo, null, false);
       return;
     }
 
     // just add it
-    storageInfo.addBlock(storedBlock);
+    storageInfo.addBlock(storedBlock, reported);
 
     // Now check for completion of blocks and safe block count
     int numCurrentReplica = countLiveNodes(storedBlock);
@@ -2400,13 +2421,14 @@ public class BlockManager {
    * needed replications if this takes care of the problem.
    * @return the block that is stored in blockMap.
    */
-  private Block addStoredBlock(final BlockInfoContiguous block,
+  private Block addStoredBlock(final BlockInfo block,
+                               final Block reportedBlock,
                                DatanodeStorageInfo storageInfo,
                                DatanodeDescriptor delNodeHint,
                                boolean logEveryBlock)
   throws IOException {
     assert block != null && namesystem.hasWriteLock();
-    BlockInfoContiguous storedBlock;
+    BlockInfo storedBlock;
     DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
     if (block instanceof BlockInfoContiguousUnderConstruction) {
       //refresh our copy in case the block got completed in another thread
@@ -2427,7 +2449,7 @@ public class BlockManager {
     assert bc != null : "Block must belong to a file";
 
     // add block to the datanode
-    AddBlockResult result = storageInfo.addBlock(storedBlock);
+    AddBlockResult result = storageInfo.addBlock(storedBlock, reportedBlock);
 
     int curReplicaDelta;
     if (result == AddBlockResult.ADDED) {
@@ -2502,13 +2524,13 @@ public class BlockManager {
           storedBlock + "blockMap has " + numCorruptNodes + 
           " but corrupt replicas map has " + corruptReplicasCount);
     }
-    if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication))
-      invalidateCorruptReplicas(storedBlock);
+    if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication)) {
+      invalidateCorruptReplicas(storedBlock, reportedBlock);
+    }
     return storedBlock;
   }
 
-  private void logAddStoredBlock(BlockInfoContiguous storedBlock,
-      DatanodeDescriptor node) {
+  private void logAddStoredBlock(BlockInfo storedBlock, DatanodeDescriptor node) {
     if (!blockLog.isInfoEnabled()) {
       return;
     }
@@ -2535,7 +2557,7 @@ public class BlockManager {
    *
    * @param blk Block whose corrupt replicas need to be invalidated
    */
-  private void invalidateCorruptReplicas(BlockInfoContiguous blk) {
+  private void invalidateCorruptReplicas(BlockInfo blk, Block reported) {
     Collection<DatanodeDescriptor> nodes = corruptReplicas.getNodes(blk);
     boolean removedFromBlocksMap = true;
     if (nodes == null)
@@ -2545,7 +2567,7 @@ public class BlockManager {
     DatanodeDescriptor[] nodesCopy = nodes.toArray(new DatanodeDescriptor[0]);
     for (DatanodeDescriptor node : nodesCopy) {
       try {
-        if (!invalidateBlock(new BlockToMarkCorrupt(blk, null,
+        if (!invalidateBlock(new BlockToMarkCorrupt(reported, blk, null,
               Reason.ANY), node)) {
           removedFromBlocksMap = false;
         }
@@ -2614,7 +2636,7 @@ public class BlockManager {
     long nrInvalid = 0, nrOverReplicated = 0;
     long nrUnderReplicated = 0, nrPostponed = 0, nrUnderConstruction = 0;
     long startTimeMisReplicatedScan = Time.now();
-    Iterator<BlockInfoContiguous> blocksItr = blocksMap.getBlocks().iterator();
+    Iterator<BlockInfo> blocksItr = blocksMap.getBlocks().iterator();
     long totalBlocks = blocksMap.size();
     replicationQueuesInitProgress = 0;
     long totalProcessed = 0;
@@ -2626,7 +2648,7 @@ public class BlockManager {
       namesystem.writeLockInterruptibly();
       try {
         while (processed < numBlocksPerIteration && blocksItr.hasNext()) {
-          BlockInfoContiguous block = blocksItr.next();
+          BlockInfo block = blocksItr.next();
           MisReplicationResult res = processMisReplicatedBlock(block);
           if (LOG.isTraceEnabled()) {
             LOG.trace("block " + block + ": " + res);
@@ -2700,7 +2722,7 @@ public class BlockManager {
    * appropriate queues if necessary, and returns a result code indicating
    * what happened with it.
    */
-  private MisReplicationResult processMisReplicatedBlock(BlockInfoContiguous block) {
+  private MisReplicationResult processMisReplicatedBlock(BlockInfo block) {
     BlockCollection bc = block.getBlockCollection();
     if (bc == null) {
       // block does not belong to any file
@@ -3029,14 +3051,14 @@ public class BlockManager {
       ReplicaState reportedState, DatanodeDescriptor delHintNode)
       throws IOException {
     // blockReceived reports a finalized block
-    Collection<BlockInfoContiguous> toAdd = new LinkedList<BlockInfoContiguous>();
+    Collection<BlockInfoToAdd> toAdd = new LinkedList<>();
     Collection<Block> toInvalidate = new LinkedList<Block>();
     Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
     Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
     final DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
 
-    processReportedBlock(storageInfo, block, reportedState,
-                              toAdd, toInvalidate, toCorrupt, toUC);
+    processReportedBlock(storageInfo, block, reportedState, toAdd, toInvalidate,
+        toCorrupt, toUC);
     // the block is only in one of the to-do lists
     // if it is in none then data-node already has it
     assert toUC.size() + toAdd.size() + toInvalidate.size() + toCorrupt.size() <= 1
@@ -3046,8 +3068,9 @@ public class BlockManager {
       addStoredBlockUnderConstruction(b, storageInfo);
     }
     long numBlocksLogged = 0;
-    for (BlockInfoContiguous b : toAdd) {
-      addStoredBlock(b, storageInfo, delHintNode, numBlocksLogged < maxNumBlocksToLog);
+    for (BlockInfoToAdd b : toAdd) {
+      addStoredBlock(b.stored, b.reported, storageInfo, delHintNode,
+          numBlocksLogged < maxNumBlocksToLog);
       numBlocksLogged++;
     }
     if (numBlocksLogged > maxNumBlocksToLog) {
@@ -3170,7 +3193,7 @@ public class BlockManager {
    * @param b - the block being tested
    * @return count of live nodes for this block
    */
-  int countLiveNodes(BlockInfoContiguous b) {
+  int countLiveNodes(BlockInfo b) {
     if (!namesystem.isInStartupSafeMode()) {
       return countNodes(b).liveReplicas();
     }
@@ -3244,7 +3267,7 @@ public class BlockManager {
     return blocksMap.size();
   }
 
-  public DatanodeStorageInfo[] getStorages(BlockInfoContiguous block) {
+  public DatanodeStorageInfo[] getStorages(BlockInfo block) {
     final DatanodeStorageInfo[] storages = new DatanodeStorageInfo[block.numNodes()];
     int i = 0;
     for(DatanodeStorageInfo s : blocksMap.getStorages(block)) {
@@ -3274,8 +3297,8 @@ public class BlockManager {
     }
   }
 
-  public BlockInfoContiguous getStoredBlock(Block block) {
-    BlockInfoContiguous info = null;
+  public BlockInfo getStoredBlock(Block block) {
+    BlockInfo info = null;
     if (BlockIdManager.isStripedBlockID(block.getBlockId())) {
       info = blocksMap.getStoredBlock(
           new Block(BlockIdManager.convertToGroupID(block.getBlockId())));
@@ -3432,7 +3455,8 @@ public class BlockManager {
 
   public BlockInfoContiguous addBlockCollection(BlockInfoContiguous block,
       BlockCollection bc) {
-    return blocksMap.addBlockCollection(block, bc);
+    // TODO
+    return (BlockInfoContiguous) blocksMap.addBlockCollection(block, bc);
   }
 
   public BlockCollection getBlockCollection(Block b) {
@@ -3640,7 +3664,7 @@ public class BlockManager {
 
   /**
    * A simple result enum for the result of
-   * {@link BlockManager#processMisReplicatedBlock(BlockInfoContiguous)}.
+   * {@link BlockManager#processMisReplicatedBlock}.
    */
   enum MisReplicationResult {
     /** The block should be invalidated since it belongs to a deleted file. */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/10488c9c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
index 806a4cb..d383de8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
@@ -20,12 +20,10 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 import java.util.Iterator;
 
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.util.GSet;
 import org.apache.hadoop.util.LightWeightGSet;
 
-import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 
@@ -36,10 +34,10 @@ import com.google.common.collect.Iterables;
  */
 class BlocksMap {
   private static class StorageIterator implements Iterator<DatanodeStorageInfo> {
-    private final BlockInfoContiguous blockInfo;
+    private final BlockInfo blockInfo;
     private int nextIdx = 0;
       
-    StorageIterator(BlockInfoContiguous blkInfo) {
+    StorageIterator(BlockInfo blkInfo) {
       this.blockInfo = blkInfo;
     }
 
@@ -63,14 +61,14 @@ class BlocksMap {
   /** Constant {@link LightWeightGSet} capacity. */
   private final int capacity;
   
-  private GSet<Block, BlockInfoContiguous> blocks;
+  private GSet<Block, BlockInfo> blocks;
 
   BlocksMap(int capacity) {
     // Use 2% of total memory to size the GSet capacity
     this.capacity = capacity;
-    this.blocks = new LightWeightGSet<Block, BlockInfoContiguous>(capacity) {
+    this.blocks = new LightWeightGSet<Block, BlockInfo>(capacity) {
       @Override
-      public Iterator<BlockInfoContiguous> iterator() {
+      public Iterator<BlockInfo> iterator() {
         SetIterator iterator = new SetIterator();
         /*
          * Not tracking any modifications to set. As this set will be used
@@ -97,15 +95,15 @@ class BlocksMap {
   }
 
   BlockCollection getBlockCollection(Block b) {
-    BlockInfoContiguous info = blocks.get(b);
+    BlockInfo info = blocks.get(b);
     return (info != null) ? info.getBlockCollection() : null;
   }
 
   /**
    * Add block b belonging to the specified block collection to the map.
    */
-  BlockInfoContiguous addBlockCollection(BlockInfoContiguous b, BlockCollection bc) {
-    BlockInfoContiguous info = blocks.get(b);
+  BlockInfo addBlockCollection(BlockInfo b, BlockCollection bc) {
+    BlockInfo info = blocks.get(b);
     if (info != b) {
       info = b;
       blocks.put(info);
@@ -120,11 +118,12 @@ class BlocksMap {
    * and remove all data-node locations associated with the block.
    */
   void removeBlock(Block block) {
-    BlockInfoContiguous blockInfo = blocks.remove(block);
+    BlockInfo blockInfo = blocks.remove(block);
     if (blockInfo == null)
       return;
 
     blockInfo.setBlockCollection(null);
+    // TODO: fix this logic for block group
     for(int idx = blockInfo.numNodes()-1; idx >= 0; idx--) {
       DatanodeDescriptor dn = blockInfo.getDatanode(idx);
       dn.removeBlock(blockInfo); // remove from the list and wipe the location
@@ -132,7 +131,7 @@ class BlocksMap {
   }
   
   /** Returns the block object it it exists in the map. */
-  BlockInfoContiguous getStoredBlock(Block b) {
+  BlockInfo getStoredBlock(Block b) {
     return blocks.get(b);
   }
 
@@ -164,7 +163,7 @@ class BlocksMap {
    * For a block that has already been retrieved from the BlocksMap
    * returns {@link Iterable} of the storages the block belongs to.
    */
-  Iterable<DatanodeStorageInfo> getStorages(final BlockInfoContiguous storedBlock) {
+  Iterable<DatanodeStorageInfo> getStorages(final BlockInfo storedBlock) {
     return new Iterable<DatanodeStorageInfo>() {
       @Override
       public Iterator<DatanodeStorageInfo> iterator() {
@@ -175,7 +174,7 @@ class BlocksMap {
 
   /** counts number of containing nodes. Better than using iterator. */
   int numNodes(Block b) {
-    BlockInfoContiguous info = blocks.get(b);
+    BlockInfo info = blocks.get(b);
     return info == null ? 0 : info.numNodes();
   }
 
@@ -185,7 +184,7 @@ class BlocksMap {
    * only if it does not belong to any file and data-nodes.
    */
   boolean removeNode(Block b, DatanodeDescriptor node) {
-    BlockInfoContiguous info = blocks.get(b);
+    BlockInfo info = blocks.get(b);
     if (info == null)
       return false;
 
@@ -203,7 +202,7 @@ class BlocksMap {
     return blocks.size();
   }
 
-  Iterable<BlockInfoContiguous> getBlocks() {
+  Iterable<BlockInfo> getBlocks() {
     return blocks;
   }
   
@@ -218,20 +217,11 @@ class BlocksMap {
    * @param newBlock - block for replacement
    * @return new block
    */
-  BlockInfoContiguous replaceBlock(BlockInfoContiguous newBlock) {
-    BlockInfoContiguous currentBlock = blocks.get(newBlock);
+  BlockInfo replaceBlock(BlockInfo newBlock) {
+    BlockInfo currentBlock = blocks.get(newBlock);
     assert currentBlock != null : "the block if not in blocksMap";
     // replace block in data-node lists
-    for (int i = currentBlock.numNodes() - 1; i >= 0; i--) {
-      final DatanodeDescriptor dn = currentBlock.getDatanode(i);
-      final DatanodeStorageInfo storage = currentBlock.findStorageInfo(dn);
-      final boolean removed = storage.removeBlock(currentBlock);
-      Preconditions.checkState(removed, "currentBlock not found.");
-
-      final AddBlockResult result = storage.addBlock(newBlock);
-      Preconditions.checkState(result == AddBlockResult.ADDED,
-          "newBlock already exists.");
-    }
+    currentBlock.replaceBlock(newBlock);
     // replace block in the map itself
     blocks.put(newBlock);
     return newBlock;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/10488c9c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
index bf5ece9..79d7713 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
@@ -513,8 +513,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
           iter.remove();
         }
       }
-      BlockInfoContiguous blockInfo = blockManager.
-            getStoredBlock(new Block(cblock.getBlockId()));
+      BlockInfoContiguous blockInfo = namesystem.getStoredBlock(new Block(cblock.getBlockId()));
       String reason = findReasonForNotCaching(cblock, blockInfo);
       int neededCached = 0;
       if (reason != null) {
@@ -628,8 +627,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
       List<DatanodeDescriptor> pendingCached) {
     // To figure out which replicas can be cached, we consult the
     // blocksMap.  We don't want to try to cache a corrupt replica, though.
-    BlockInfoContiguous blockInfo = blockManager.
-          getStoredBlock(new Block(cachedBlock.getBlockId()));
+    BlockInfoContiguous blockInfo = namesystem.getStoredBlock(new Block(cachedBlock.getBlockId()));
     if (blockInfo == null) {
       LOG.debug("Block {}: can't add new cached replicas," +
           " because there is no record of this block " +
@@ -668,7 +666,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
       while (it.hasNext()) {
         CachedBlock cBlock = it.next();
         BlockInfoContiguous info =
-            blockManager.getStoredBlock(new Block(cBlock.getBlockId()));
+            namesystem.getStoredBlock(new Block(cBlock.getBlockId()));
         if (info != null) {
           pendingBytes -= info.getNumBytes();
         }
@@ -678,7 +676,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
       while (it.hasNext()) {
         CachedBlock cBlock = it.next();
         BlockInfoContiguous info =
-            blockManager.getStoredBlock(new Block(cBlock.getBlockId()));
+            namesystem.getStoredBlock(new Block(cBlock.getBlockId()));
         if (info != null) {
           pendingBytes += info.getNumBytes();
         }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/10488c9c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index c0a17b1..a54d46b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -286,7 +286,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
    * Remove block from the list of blocks belonging to the data-node. Remove
    * data-node from the block.
    */
-  boolean removeBlock(BlockInfoContiguous b) {
+  boolean removeBlock(BlockInfo b) {
     final DatanodeStorageInfo s = b.findStorageInfo(this);
     // if block exists on this datanode
     if (s != null) {
@@ -299,12 +299,9 @@ public class DatanodeDescriptor extends DatanodeInfo {
    * Remove block from the list of blocks belonging to the data-node. Remove
    * data-node from the block.
    */
-  boolean removeBlock(String storageID, BlockInfoContiguous b) {
+  boolean removeBlock(String storageID, BlockInfo b) {
     DatanodeStorageInfo s = getStorageInfo(storageID);
-    if (s != null) {
-      return s.removeBlock(b);
-    }
-    return false;
+    return s != null && s.removeBlock(b);
   }
 
   public void resetBlocks() {
@@ -482,12 +479,12 @@ public class DatanodeDescriptor extends DatanodeInfo {
     }
   }
 
-  private static class BlockIterator implements Iterator<BlockInfoContiguous> {
+  private static class BlockIterator implements Iterator<BlockInfo> {
     private int index = 0;
-    private final List<Iterator<BlockInfoContiguous>> iterators;
+    private final List<Iterator<BlockInfo>> iterators;
     
     private BlockIterator(final DatanodeStorageInfo... storages) {
-      List<Iterator<BlockInfoContiguous>> iterators = new ArrayList<Iterator<BlockInfoContiguous>>();
+      List<Iterator<BlockInfo>> iterators = new ArrayList<>();
       for (DatanodeStorageInfo e : storages) {
         iterators.add(e.getBlockIterator());
       }
@@ -501,7 +498,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
     }
 
     @Override
-    public BlockInfoContiguous next() {
+    public BlockInfo next() {
       update();
       return iterators.get(index).next();
     }
@@ -518,10 +515,11 @@ public class DatanodeDescriptor extends DatanodeInfo {
     }
   }
 
-  Iterator<BlockInfoContiguous> getBlockIterator() {
+  Iterator<BlockInfo> getBlockIterator() {
     return new BlockIterator(getStorageInfos());
   }
-  Iterator<BlockInfoContiguous> getBlockIterator(final String storageID) {
+
+  Iterator<BlockInfo> getBlockIterator(final String storageID) {
     return new BlockIterator(getStorageInfo(storageID));
   }