You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/03/23 21:15:07 UTC
[34/50] [abbrv] hadoop git commit: HDFS-7749. Erasure Coding: Add
striped block support in INodeFile. Contributed by Jing Zhao.
HDFS-7749. Erasure Coding: Add striped block support in INodeFile. Contributed by Jing Zhao.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f05e27ee
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f05e27ee
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f05e27ee
Branch: refs/heads/HDFS-7285
Commit: f05e27eeba40b069ec71416969a65cfda1cb3261
Parents: 90f073f
Author: Jing Zhao <ji...@apache.org>
Authored: Wed Feb 25 22:10:26 2015 -0800
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon Mar 23 11:12:17 2015 -0700
----------------------------------------------------------------------
.../apache/hadoop/hdfs/protocolPB/PBHelper.java | 17 ++
.../server/blockmanagement/BlockCollection.java | 13 +-
.../hdfs/server/blockmanagement/BlockInfo.java | 88 ++++++-
.../BlockInfoContiguousUnderConstruction.java | 6 +-
.../blockmanagement/BlockInfoStriped.java | 31 +++
.../BlockInfoStripedUnderConstruction.java | 240 ++++++++++++++++++
.../server/blockmanagement/BlockManager.java | 147 +++++------
.../CacheReplicationMonitor.java | 16 +-
.../hdfs/server/namenode/FSDirConcatOp.java | 8 +-
.../hdfs/server/namenode/FSDirectory.java | 5 +-
.../hadoop/hdfs/server/namenode/FSEditLog.java | 8 +-
.../hdfs/server/namenode/FSEditLogLoader.java | 16 +-
.../hdfs/server/namenode/FSImageFormat.java | 7 +-
.../server/namenode/FSImageFormatPBINode.java | 46 +++-
.../hdfs/server/namenode/FSNamesystem.java | 130 ++++++----
.../namenode/FileUnderConstructionFeature.java | 15 +-
.../namenode/FileWithStripedBlocksFeature.java | 112 ++++++++
.../hadoop/hdfs/server/namenode/INodeFile.java | 254 +++++++++++++------
.../hdfs/server/namenode/LeaseManager.java | 6 +-
.../hdfs/server/namenode/NamenodeFsck.java | 4 +-
.../hadoop/hdfs/server/namenode/Namesystem.java | 3 +-
.../snapshot/FSImageFormatPBSnapshot.java | 7 +-
.../server/namenode/snapshot/FileDiffList.java | 9 +-
.../hadoop-hdfs/src/main/proto/fsimage.proto | 5 +
.../hadoop-hdfs/src/main/proto/hdfs.proto | 10 +
.../org/apache/hadoop/hdfs/DFSTestUtil.java | 3 +-
.../blockmanagement/TestReplicationPolicy.java | 4 +-
.../hdfs/server/namenode/TestAddBlock.java | 12 +-
.../hdfs/server/namenode/TestAddBlockgroup.java | 3 +-
.../namenode/TestBlockUnderConstruction.java | 6 +-
.../hdfs/server/namenode/TestFSImage.java | 4 +-
.../hdfs/server/namenode/TestFileTruncate.java | 4 +-
.../hadoop/hdfs/server/namenode/TestFsck.java | 4 +-
.../snapshot/TestSnapshotBlocksMap.java | 24 +-
.../namenode/snapshot/TestSnapshotDeletion.java | 16 +-
35 files changed, 963 insertions(+), 320 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f05e27ee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index fad1d2c..867023c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -170,6 +170,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageReportProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypesProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StripedBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.InotifyProtos;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsResponseProto;
@@ -182,6 +183,7 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
@@ -427,6 +429,21 @@ public class PBHelper {
return new Block(b.getBlockId(), b.getNumBytes(), b.getGenStamp());
}
+ public static BlockInfoStriped convert(StripedBlockProto p) {
+ return new BlockInfoStriped(convert(p.getBlock()),
+ (short) p.getDataBlockNum(), (short) p.getParityBlockNum());
+ }
+
+ public static StripedBlockProto convert(BlockInfoStriped blk) {
+ BlockProto bp = BlockProto.newBuilder().setBlockId(blk.getBlockId())
+ .setGenStamp(blk.getGenerationStamp()).setNumBytes(blk.getNumBytes())
+ .build();
+ return StripedBlockProto.newBuilder()
+ .setDataBlockNum(blk.getDataBlockNum())
+ .setParityBlockNum(blk.getParityBlockNum())
+ .setBlock(bp).build();
+ }
+
public static BlockWithLocationsProto convert(BlockWithLocations blk) {
return BlockWithLocationsProto.newBuilder()
.setBlock(convert(blk.getBlock()))
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f05e27ee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
index 974cac3..1c753de 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
@@ -31,7 +31,7 @@ public interface BlockCollection {
/**
* Get the last block of the collection.
*/
- public BlockInfoContiguous getLastBlock();
+ public BlockInfo getLastBlock();
/**
* Get content summary.
@@ -44,9 +44,9 @@ public interface BlockCollection {
public int numBlocks();
/**
- * Get the blocks or block groups.
+ * Get the blocks (striped or contiguous).
*/
- public BlockInfoContiguous[] getBlocks();
+ public BlockInfo[] getBlocks();
/**
* Get preferred block size for the collection
@@ -71,16 +71,15 @@ public interface BlockCollection {
public String getName();
/**
- * Set the block/block-group at the given index.
+ * Set the block (contiguous or striped) at the given index.
*/
- public void setBlock(int index, BlockInfoContiguous blk);
+ public void setBlock(int index, BlockInfo blk);
/**
* Convert the last block of the collection to an under-construction block
* and set the locations.
*/
- public BlockInfoContiguousUnderConstruction setLastBlock(
- BlockInfoContiguous lastBlock,
+ public void convertLastBlockToUC(BlockInfo lastBlock,
DatanodeStorageInfo[] targets) throws IOException;
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f05e27ee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
index f19ad32..d15cbec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.util.LightWeightGSet;
+import java.io.IOException;
import java.util.LinkedList;
/**
@@ -289,8 +290,9 @@ public abstract class BlockInfo extends Block
/**
* BlockInfo represents a block that is not being constructed.
- * In order to start modifying the block, the BlockInfo should be converted
- * to {@link BlockInfoContiguousUnderConstruction}.
+ * In order to start modifying the block, the BlockInfo should be converted to
+ * {@link BlockInfoContiguousUnderConstruction} or
+ * {@link BlockInfoStripedUnderConstruction}.
* @return {@link HdfsServerConstants.BlockUCState#COMPLETE}
*/
public HdfsServerConstants.BlockUCState getBlockUCState() {
@@ -336,4 +338,86 @@ public abstract class BlockInfo extends Block
return new BlockInfoStriped((BlockInfoStriped) b);
}
}
+
+ static BlockInfo convertToCompleteBlock(BlockInfo blk) throws IOException {
+ if (blk instanceof BlockInfoContiguousUnderConstruction) {
+ return ((BlockInfoContiguousUnderConstruction) blk)
+ .convertToCompleteBlock();
+ } else if (blk instanceof BlockInfoStripedUnderConstruction) {
+ return ((BlockInfoStripedUnderConstruction) blk).convertToCompleteBlock();
+ } else {
+ return blk;
+ }
+ }
+
+ static void commitBlock(BlockInfo blockInfo, Block reported)
+ throws IOException {
+ if (blockInfo instanceof BlockInfoContiguousUnderConstruction) {
+ ((BlockInfoContiguousUnderConstruction) blockInfo).commitBlock(reported);
+ } else if (blockInfo instanceof BlockInfoStripedUnderConstruction) {
+ ((BlockInfoStripedUnderConstruction) blockInfo).commitBlock(reported);
+ }
+ }
+
+ static void addReplica(BlockInfo ucBlock, DatanodeStorageInfo storageInfo,
+ Block reportedBlock, HdfsServerConstants.ReplicaState reportedState) {
+ assert ucBlock instanceof BlockInfoContiguousUnderConstruction ||
+ ucBlock instanceof BlockInfoStripedUnderConstruction;
+ if (ucBlock instanceof BlockInfoContiguousUnderConstruction) {
+ ((BlockInfoContiguousUnderConstruction) ucBlock).addReplicaIfNotPresent(
+ storageInfo, reportedBlock, reportedState);
+ } else { // StripedUC
+ ((BlockInfoStripedUnderConstruction) ucBlock).addReplicaIfNotPresent(
+ storageInfo, reportedBlock, reportedState);
+ }
+ }
+
+ static int getNumExpectedLocations(BlockInfo ucBlock) {
+ assert ucBlock instanceof BlockInfoContiguousUnderConstruction ||
+ ucBlock instanceof BlockInfoStripedUnderConstruction;
+ if (ucBlock instanceof BlockInfoContiguousUnderConstruction) {
+ return ((BlockInfoContiguousUnderConstruction) ucBlock)
+ .getNumExpectedLocations();
+ } else { // StripedUC
+ return ((BlockInfoStripedUnderConstruction) ucBlock)
+ .getNumExpectedLocations();
+ }
+ }
+
+ public static DatanodeStorageInfo[] getExpectedStorageLocations(
+ BlockInfo ucBlock) {
+ assert ucBlock instanceof BlockInfoContiguousUnderConstruction ||
+ ucBlock instanceof BlockInfoStripedUnderConstruction;
+ if (ucBlock instanceof BlockInfoContiguousUnderConstruction) {
+ return ((BlockInfoContiguousUnderConstruction) ucBlock)
+ .getExpectedStorageLocations();
+ } else { // StripedUC
+ return ((BlockInfoStripedUnderConstruction) ucBlock)
+ .getExpectedStorageLocations();
+ }
+ }
+
+ public static void setExpectedLocations(BlockInfo ucBlock,
+ DatanodeStorageInfo[] targets) {
+ assert ucBlock instanceof BlockInfoContiguousUnderConstruction ||
+ ucBlock instanceof BlockInfoStripedUnderConstruction;
+ if (ucBlock instanceof BlockInfoContiguousUnderConstruction) {
+ ((BlockInfoContiguousUnderConstruction) ucBlock)
+ .setExpectedLocations(targets);
+ } else { // StripedUC
+ ((BlockInfoStripedUnderConstruction) ucBlock)
+ .setExpectedLocations(targets);
+ }
+ }
+
+ public static long getBlockRecoveryId(BlockInfo ucBlock) {
+ assert ucBlock instanceof BlockInfoContiguousUnderConstruction ||
+ ucBlock instanceof BlockInfoStripedUnderConstruction;
+ if (ucBlock instanceof BlockInfoContiguousUnderConstruction) {
+ return ((BlockInfoContiguousUnderConstruction) ucBlock)
+ .getBlockRecoveryId();
+ } else { // StripedUC
+ return ((BlockInfoStripedUnderConstruction) ucBlock).getBlockRecoveryId();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f05e27ee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java
index c78c9e2..7a052fd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java
@@ -74,7 +74,7 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
BlockUCState state, DatanodeStorageInfo[] targets) {
super(blk, replication);
assert getBlockUCState() != BlockUCState.COMPLETE :
- "BlockInfoUnderConstruction cannot be in COMPLETE state";
+ "BlockInfoContiguousUnderConstruction cannot be in COMPLETE state";
this.blockUCState = state;
setExpectedLocations(targets);
}
@@ -82,7 +82,7 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
/**
* Convert an under construction block to a complete block.
*
- * @return BlockInfo - a complete block.
+ * @return BlockInfoContiguous - a complete block.
* @throws IOException if the state of the block
* (the generation stamp and the length) has not been committed by
* the client or it does not have at least a minimal number of replicas
@@ -197,7 +197,7 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
blockRecoveryId = recoveryId;
if (replicas.size() == 0) {
NameNode.blockStateChangeLog.warn("BLOCK*"
- + " BlockInfoUnderConstruction.initLeaseRecovery:"
+ + " BlockInfoContiguousUnderConstruction.initLeaseRecovery:"
+ " No blocks found, lease removed.");
}
boolean allLiveReplicasTriedAsPrimary = true;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f05e27ee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
index 5fff41e..57de772 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
/**
* Subclass of {@link BlockInfo}, presenting a block group in erasure coding.
@@ -59,6 +61,14 @@ public class BlockInfoStriped extends BlockInfo {
return (short) (dataBlockNum + parityBlockNum);
}
+ public short getDataBlockNum() {
+ return dataBlockNum;
+ }
+
+ public short getParityBlockNum() {
+ return parityBlockNum;
+ }
+
private void initIndices() {
for (int i = 0; i < indices.length; i++) {
indices[i] = -1;
@@ -176,4 +186,25 @@ public class BlockInfoStriped extends BlockInfo {
}
return num;
}
+
+ /**
+ * Convert a complete block to an under construction block.
+ * @return BlockInfoUnderConstruction - an under construction block.
+ */
+ public BlockInfoStripedUnderConstruction convertToBlockUnderConstruction(
+ BlockUCState s, DatanodeStorageInfo[] targets) {
+ final BlockInfoStripedUnderConstruction ucBlock;
+ if(isComplete()) {
+ ucBlock = new BlockInfoStripedUnderConstruction(this, getDataBlockNum(),
+ getParityBlockNum(), s, targets);
+ ucBlock.setBlockCollection(getBlockCollection());
+ } else {
+ // the block is already under construction
+ ucBlock = (BlockInfoStripedUnderConstruction) this;
+ ucBlock.setBlockUCState(s);
+ ucBlock.setExpectedLocations(targets);
+ ucBlock.setBlockCollection(getBlockCollection());
+ }
+ return ucBlock;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f05e27ee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStripedUnderConstruction.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStripedUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStripedUnderConstruction.java
new file mode 100644
index 0000000..151241b2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStripedUnderConstruction.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState.COMPLETE;
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION;
+
+/**
+ * Represents a striped block that is currently being constructed.
+ * This is usually the last block of a file opened for write or append.
+ */
+public class BlockInfoStripedUnderConstruction extends BlockInfoStriped {
+ private BlockUCState blockUCState;
+
+ /**
+ * Block replicas as assigned when the block was allocated.
+ *
+ * TODO: we need to update this attribute, along with the return type of
+ * getExpectedStorageLocations and LocatedBlock. For striped blocks, clients
+ * need to understand the index of each striped block in the block group.
+ */
+ private List<ReplicaUnderConstruction> replicas;
+
+ /**
+ * The new generation stamp, which this block will have
+ * after the recovery succeeds. Also used as a recovery id to identify
+ * the right recovery if any of the abandoned recoveries re-appear.
+ */
+ private long blockRecoveryId = 0;
+
+ /**
+ * Constructor with null storage targets.
+ */
+ public BlockInfoStripedUnderConstruction(Block blk, short dataBlockNum,
+ short parityBlockNum) {
+ this(blk, dataBlockNum, parityBlockNum, UNDER_CONSTRUCTION, null);
+ }
+
+ /**
+ * Create a striped block that is currently being constructed.
+ */
+ public BlockInfoStripedUnderConstruction(Block blk, short dataBlockNum,
+ short parityBlockNum, BlockUCState state, DatanodeStorageInfo[] targets) {
+ super(blk, dataBlockNum, parityBlockNum);
+ assert getBlockUCState() != COMPLETE :
+ "BlockInfoStripedUnderConstruction cannot be in COMPLETE state";
+ this.blockUCState = state;
+ setExpectedLocations(targets);
+ }
+
+ /**
+ * Convert an under construction striped block to a complete striped block.
+ *
+ * @return BlockInfoStriped - a complete block.
+ * @throws IOException if the state of the block
+ * (the generation stamp and the length) has not been committed by
+ * the client or it does not have at least a minimal number of replicas
+ * reported from data-nodes.
+ */
+ BlockInfoStriped convertToCompleteBlock() throws IOException {
+ assert getBlockUCState() != COMPLETE :
+ "Trying to convert a COMPLETE block";
+ return new BlockInfoStriped(this);
+ }
+
+ /** Set expected locations */
+ public void setExpectedLocations(DatanodeStorageInfo[] targets) {
+ int numLocations = targets == null ? 0 : targets.length;
+ this.replicas = new ArrayList<>(numLocations);
+ for(int i = 0; i < numLocations; i++) {
+ replicas.add(new ReplicaUnderConstruction(this, targets[i],
+ ReplicaState.RBW));
+ }
+ }
+
+ /**
+ * Create array of expected replica locations
+ * (as has been assigned by chooseTargets()).
+ */
+ public DatanodeStorageInfo[] getExpectedStorageLocations() {
+ int numLocations = getNumExpectedLocations();
+ DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations];
+ for (int i = 0; i < numLocations; i++) {
+ storages[i] = replicas.get(i).getExpectedStorageLocation();
+ }
+ return storages;
+ }
+
+ /** Get the number of expected locations */
+ public int getNumExpectedLocations() {
+ return replicas == null ? 0 : replicas.size();
+ }
+
+ /**
+ * Return the state of the block under construction.
+ * @see BlockUCState
+ */
+ @Override // BlockInfo
+ public BlockUCState getBlockUCState() {
+ return blockUCState;
+ }
+
+ void setBlockUCState(BlockUCState s) {
+ blockUCState = s;
+ }
+
+ /** Get block recovery ID */
+ public long getBlockRecoveryId() {
+ return blockRecoveryId;
+ }
+
+ /**
+ * Process the recorded replicas. When about to commit or finish the
+ * pipeline recovery sort out bad replicas.
+ * @param genStamp The final generation stamp for the block.
+ */
+ public void setGenerationStampAndVerifyReplicas(long genStamp) {
+ // Set the generation stamp for the block.
+ setGenerationStamp(genStamp);
+ if (replicas == null)
+ return;
+
+ // Remove the replicas with wrong gen stamp.
+ // The replica list is unchanged.
+ for (ReplicaUnderConstruction r : replicas) {
+ if (genStamp != r.getGenerationStamp()) {
+ r.getExpectedStorageLocation().removeBlock(this);
+ NameNode.blockStateChangeLog.info("BLOCK* Removing stale replica "
+ + "from location: {}", r.getExpectedStorageLocation());
+ }
+ }
+ }
+
+ /**
+ * Commit block's length and generation stamp as reported by the client.
+ * Set block state to {@link BlockUCState#COMMITTED}.
+ * @param block - contains client reported block length and generation
+ */
+ void commitBlock(Block block) throws IOException {
+ if (getBlockId() != block.getBlockId()) {
+ throw new IOException("Trying to commit inconsistent block: id = "
+ + block.getBlockId() + ", expected id = " + getBlockId());
+ }
+ blockUCState = BlockUCState.COMMITTED;
+ this.set(getBlockId(), block.getNumBytes(), block.getGenerationStamp());
+ // Sort out invalid replicas.
+ setGenerationStampAndVerifyReplicas(block.getGenerationStamp());
+ }
+
+ /**
+ * Initialize lease recovery for this striped block.
+ */
+ public void initializeBlockRecovery(long recoveryId) {
+ setBlockUCState(BlockUCState.UNDER_RECOVERY);
+ blockRecoveryId = recoveryId;
+ if (replicas == null || replicas.size() == 0) {
+ NameNode.blockStateChangeLog.warn("BLOCK*" +
+ " BlockInfoUnderConstruction.initLeaseRecovery:" +
+ " No blocks found, lease removed.");
+ }
+ // TODO we need to implement different recovery logic here
+ }
+
+ void addReplicaIfNotPresent(DatanodeStorageInfo storage, Block block,
+ ReplicaState rState) {
+ Iterator<ReplicaUnderConstruction> it = replicas.iterator();
+ while (it.hasNext()) {
+ ReplicaUnderConstruction r = it.next();
+ DatanodeStorageInfo expectedLocation = r.getExpectedStorageLocation();
+ if (expectedLocation == storage) {
+ // Record the gen stamp from the report
+ r.setGenerationStamp(block.getGenerationStamp());
+ return;
+ } else if (expectedLocation != null &&
+ expectedLocation.getDatanodeDescriptor() ==
+ storage.getDatanodeDescriptor()) {
+ // The Datanode reported that the block is on a different storage
+ // than the one chosen by BlockPlacementPolicy. This can occur as
+ // we allow Datanodes to choose the target storage. Update our
+ // state by removing the stale entry and adding a new one.
+ it.remove();
+ break;
+ }
+ }
+ replicas.add(new ReplicaUnderConstruction(block, storage, rState));
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder b = new StringBuilder(100);
+ appendStringTo(b);
+ return b.toString();
+ }
+
+ @Override
+ public void appendStringTo(StringBuilder sb) {
+ super.appendStringTo(sb);
+ appendUCParts(sb);
+ }
+
+ private void appendUCParts(StringBuilder sb) {
+ sb.append("{UCState=").append(blockUCState).append(", replicas=[");
+ if (replicas != null) {
+ Iterator<ReplicaUnderConstruction> iter = replicas.iterator();
+ if (iter.hasNext()) {
+ iter.next().appendStringTo(sb);
+ while (iter.hasNext()) {
+ sb.append(", ");
+ iter.next().appendStringTo(sb);
+ }
+ }
+ }
+ sb.append("]}");
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f05e27ee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 5ba140e..5cdaa32 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -539,8 +539,8 @@ public class BlockManager {
int usableReplicas = numReplicas.liveReplicas() +
numReplicas.decommissionedReplicas();
- if (block instanceof BlockInfoContiguous) {
- BlockCollection bc = ((BlockInfoContiguous) block).getBlockCollection();
+ if (block instanceof BlockInfo) {
+ BlockCollection bc = ((BlockInfo) block).getBlockCollection();
String fileName = (bc == null) ? "[orphaned]" : bc.getName();
out.print(fileName + ": ");
}
@@ -594,15 +594,14 @@ public class BlockManager {
* @throws IOException if the block does not have at least a minimal number
* of replicas reported from data-nodes.
*/
- private static boolean commitBlock(
- final BlockInfoContiguousUnderConstruction block,
+ private static boolean commitBlock(final BlockInfo block,
final Block commitBlock) throws IOException {
if (block.getBlockUCState() == BlockUCState.COMMITTED)
return false;
assert block.getNumBytes() <= commitBlock.getNumBytes() :
"commitBlock length is less than the stored one "
+ commitBlock.getNumBytes() + " vs. " + block.getNumBytes();
- block.commitBlock(commitBlock);
+ BlockInfo.commitBlock(block, commitBlock);
return true;
}
@@ -620,16 +619,16 @@ public class BlockManager {
Block commitBlock) throws IOException {
if(commitBlock == null)
return false; // not committing, this is a block allocation retry
- BlockInfoContiguous lastBlock = bc.getLastBlock();
+ BlockInfo lastBlock = bc.getLastBlock();
if(lastBlock == null)
return false; // no blocks in file yet
if(lastBlock.isComplete())
return false; // already completed (e.g. by syncBlock)
- final boolean b = commitBlock(
- (BlockInfoContiguousUnderConstruction)lastBlock, commitBlock);
- if(countNodes(lastBlock).liveReplicas() >= minReplication)
- completeBlock(bc, bc.numBlocks()-1, false);
+ final boolean b = commitBlock(lastBlock, commitBlock);
+ if (countNodes(lastBlock).liveReplicas() >= minReplication) {
+ completeBlock(bc, bc.numBlocks() - 1, false);
+ }
return b;
}
@@ -642,22 +641,25 @@ public class BlockManager {
*/
private BlockInfo completeBlock(final BlockCollection bc,
final int blkIndex, boolean force) throws IOException {
- if(blkIndex < 0)
+ if (blkIndex < 0) {
return null;
- BlockInfoContiguous curBlock = bc.getBlocks()[blkIndex];
- if (curBlock.isComplete())
+ }
+ BlockInfo curBlock = bc.getBlocks()[blkIndex];
+ if (curBlock.isComplete()) {
return curBlock;
- // TODO: support BlockInfoStripedUC
- BlockInfoContiguousUnderConstruction ucBlock =
- (BlockInfoContiguousUnderConstruction)curBlock;
- int numNodes = ucBlock.numNodes();
- if (!force && numNodes < minReplication)
+ }
+
+ int numNodes = curBlock.numNodes();
+ if (!force && numNodes < minReplication) {
throw new IOException("Cannot complete block: " +
"block does not satisfy minimal replication requirement.");
- if(!force && ucBlock.getBlockUCState() != BlockUCState.COMMITTED)
+ }
+ if (!force && curBlock.getBlockUCState() != BlockUCState.COMMITTED) {
throw new IOException(
"Cannot complete block: block has not been COMMITTED by the client");
- BlockInfoContiguous completeBlock = ucBlock.convertToCompleteBlock();
+ }
+
+ final BlockInfo completeBlock = BlockInfo.convertToCompleteBlock(curBlock);
// replace penultimate block in file
bc.setBlock(blkIndex, completeBlock);
@@ -675,10 +677,9 @@ public class BlockManager {
return blocksMap.replaceBlock(completeBlock);
}
- // TODO: support BlockInfoStrippedUC
private BlockInfo completeBlock(final BlockCollection bc,
final BlockInfo block, boolean force) throws IOException {
- BlockInfoContiguous[] fileBlocks = bc.getBlocks();
+ BlockInfo[] fileBlocks = bc.getBlocks();
for (int idx = 0; idx < fileBlocks.length; idx++) {
if (fileBlocks[idx] == block) {
return completeBlock(bc, idx, force);
@@ -694,6 +695,7 @@ public class BlockManager {
*/
public BlockInfo forceCompleteBlock(final BlockCollection bc,
final BlockInfoContiguousUnderConstruction block) throws IOException {
+ // TODO: support BlockInfoStripedUC for editlog
block.commitBlock(block);
return completeBlock(bc, block, true);
}
@@ -715,7 +717,7 @@ public class BlockManager {
*/
public LocatedBlock convertLastBlockToUnderConstruction(
BlockCollection bc, long bytesToRemove) throws IOException {
- BlockInfoContiguous oldBlock = bc.getLastBlock();
+ BlockInfo oldBlock = bc.getLastBlock();
if(oldBlock == null ||
bc.getPreferredBlockSize() == oldBlock.getNumBytes() - bytesToRemove)
return null;
@@ -724,8 +726,10 @@ public class BlockManager {
DatanodeStorageInfo[] targets = getStorages(oldBlock);
- BlockInfoContiguousUnderConstruction ucBlock = bc.setLastBlock(oldBlock,
- targets);
+ // convert the last block to UC
+ bc.convertLastBlockToUC(oldBlock, targets);
+ // get the new created uc block
+ BlockInfo ucBlock = bc.getLastBlock();
blocksMap.replaceBlock(ucBlock);
// Remove block from replication queue.
@@ -767,11 +771,10 @@ public class BlockManager {
return locations;
}
- private List<LocatedBlock> createLocatedBlockList(
- final BlockInfoContiguous[] blocks,
+ private List<LocatedBlock> createLocatedBlockList(final BlockInfo[] blocks,
final long offset, final long length, final int nrBlocksToReturn,
final AccessMode mode) throws IOException {
- int curBlk = 0;
+ int curBlk;
long curPos = 0, blkSize = 0;
int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
for (curBlk = 0; curBlk < nrBlocks; curBlk++) {
@@ -784,10 +787,10 @@ public class BlockManager {
}
if (nrBlocks > 0 && curBlk == nrBlocks) // offset >= end of file
- return Collections.<LocatedBlock>emptyList();
+ return Collections.emptyList();
long endOff = offset + length;
- List<LocatedBlock> results = new ArrayList<LocatedBlock>(blocks.length);
+ List<LocatedBlock> results = new ArrayList<>(blocks.length);
do {
results.add(createLocatedBlock(blocks[curBlk], curPos, mode));
curPos += blocks[curBlk].getNumBytes();
@@ -798,9 +801,9 @@ public class BlockManager {
return results;
}
- private LocatedBlock createLocatedBlock(final BlockInfoContiguous[] blocks,
+ private LocatedBlock createLocatedBlock(final BlockInfo[] blocks,
final long endPos, final AccessMode mode) throws IOException {
- int curBlk = 0;
+ int curBlk;
long curPos = 0;
int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
for (curBlk = 0; curBlk < nrBlocks; curBlk++) {
@@ -814,8 +817,9 @@ public class BlockManager {
return createLocatedBlock(blocks[curBlk], curPos, mode);
}
- private LocatedBlock createLocatedBlock(final BlockInfoContiguous blk, final long pos,
- final BlockTokenSecretManager.AccessMode mode) throws IOException {
+ private LocatedBlock createLocatedBlock(final BlockInfo blk,
+ final long pos, final BlockTokenSecretManager.AccessMode mode)
+ throws IOException {
final LocatedBlock lb = createLocatedBlock(blk, pos);
if (mode != null) {
setBlockToken(lb, mode);
@@ -824,8 +828,8 @@ public class BlockManager {
}
/** @return a LocatedBlock for the given block */
- private LocatedBlock createLocatedBlock(final BlockInfoContiguous blk, final long pos
- ) throws IOException {
+ private LocatedBlock createLocatedBlock(final BlockInfo blk,
+ final long pos) throws IOException {
if (blk instanceof BlockInfoContiguousUnderConstruction) {
if (blk.isComplete()) {
throw new IOException(
@@ -838,6 +842,7 @@ public class BlockManager {
final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
return new LocatedBlock(eb, storages, pos, false);
}
+ // TODO support BlockInfoStripedUC
// get block locations
final int numCorruptNodes = countNodes(blk).corruptReplicas();
@@ -872,7 +877,7 @@ public class BlockManager {
}
/** Create a LocatedBlocks. */
- public LocatedBlocks createLocatedBlocks(final BlockInfoContiguous[] blocks,
+ public LocatedBlocks createLocatedBlocks(final BlockInfo[] blocks,
final long fileSizeExcludeBlocksUnderConstruction,
final boolean isFileUnderConstruction, final long offset,
final long length, final boolean needBlockToken,
@@ -895,7 +900,7 @@ public class BlockManager {
final LocatedBlock lastlb;
final boolean isComplete;
if (!inSnapshot) {
- final BlockInfoContiguous last = blocks[blocks.length - 1];
+ final BlockInfo last = blocks[blocks.length - 1];
final long lastPos = last.isComplete()?
fileSizeExcludeBlocksUnderConstruction - last.getNumBytes()
: fileSizeExcludeBlocksUnderConstruction;
@@ -1708,12 +1713,15 @@ public class BlockManager {
* reported by the datanode in the block report.
*/
static class StatefulBlockInfo {
- final BlockInfoContiguousUnderConstruction storedBlock;
+ final BlockInfo storedBlock; // should be UC block
final Block reportedBlock;
final ReplicaState reportedState;
- StatefulBlockInfo(BlockInfoContiguousUnderConstruction storedBlock,
+ StatefulBlockInfo(BlockInfo storedBlock,
Block reportedBlock, ReplicaState reportedState) {
+ Preconditions.checkArgument(
+ storedBlock instanceof BlockInfoContiguousUnderConstruction ||
+ storedBlock instanceof BlockInfoStripedUnderConstruction);
this.storedBlock = storedBlock;
this.reportedBlock = reportedBlock;
this.reportedState = reportedState;
@@ -2059,15 +2067,12 @@ public class BlockManager {
// If block is under construction, add this replica to its list
if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
- ((BlockInfoContiguousUnderConstruction) storedBlock)
- .addReplicaIfNotPresent(storageInfo, iblk, reportedState);
+ BlockInfo.addReplica(storedBlock, storageInfo, iblk, reportedState);
// OpenFileBlocks only inside snapshots also will be added to safemode
// threshold. So we need to update such blocks to safemode
// refer HDFS-5283
- BlockInfoContiguousUnderConstruction blockUC =
- (BlockInfoContiguousUnderConstruction) storedBlock;
- if (namesystem.isInSnapshot(blockUC)) {
- int numOfReplicas = blockUC.getNumExpectedLocations();
+ if (namesystem.isInSnapshot(storedBlock.getBlockCollection())) {
+ int numOfReplicas = BlockInfo.getNumExpectedLocations(storedBlock);
namesystem.incrementSafeBlockCount(numOfReplicas);
}
//and fall through to next clause
@@ -2090,7 +2095,7 @@ public class BlockManager {
// place a delimiter in the list which separates blocks
// that have been reported from those that have not
Block delimiterBlock = new Block();
- BlockInfoContiguous delimiter = new BlockInfoContiguous(delimiterBlock,
+ BlockInfo delimiter = new BlockInfoContiguous(delimiterBlock,
(short) 1);
AddBlockResult result = storageInfo.addBlock(delimiter, delimiterBlock);
assert result == AddBlockResult.ADDED
@@ -2220,9 +2225,8 @@ public class BlockManager {
}
if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) {
- toUC.add(new StatefulBlockInfo(
- (BlockInfoContiguousUnderConstruction) storedBlock,
- new Block(block), reportedState));
+ toUC.add(new StatefulBlockInfo(storedBlock, new Block(block),
+ reportedState));
return storedBlock;
}
@@ -2406,9 +2410,8 @@ public class BlockManager {
void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock,
DatanodeStorageInfo storageInfo) throws IOException {
- BlockInfoContiguousUnderConstruction block = ucBlock.storedBlock;
- block.addReplicaIfNotPresent(
- storageInfo, ucBlock.reportedBlock, ucBlock.reportedState);
+ BlockInfo block = ucBlock.storedBlock;
+ BlockInfo.addReplica(block, storageInfo, ucBlock.reportedBlock, ucBlock.reportedState);
if (ucBlock.reportedState == ReplicaState.FINALIZED &&
!block.findDatanode(storageInfo.getDatanodeDescriptor())) {
@@ -2468,7 +2471,8 @@ public class BlockManager {
assert block != null && namesystem.hasWriteLock();
BlockInfo storedBlock;
DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
- if (block instanceof BlockInfoContiguousUnderConstruction) {
+ if (block instanceof BlockInfoContiguousUnderConstruction ||
+ block instanceof BlockInfoStripedUnderConstruction) {
//refresh our copy in case the block got completed in another thread
storedBlock = getStoredBlock(block);
} else {
@@ -2484,7 +2488,6 @@ public class BlockManager {
return block;
}
BlockCollection bc = storedBlock.getBlockCollection();
- assert bc != null : "Block must belong to a file";
// add block to the datanode
AddBlockResult result = storageInfo.addBlock(storedBlock, reportedBlock);
@@ -3443,7 +3446,7 @@ public class BlockManager {
if (!this.shouldCheckForEnoughRacks) {
return true;
}
- boolean enoughRacks = false;;
+ boolean enoughRacks = false;
Collection<DatanodeDescriptor> corruptNodes =
corruptReplicas.getNodes(b);
int numExpectedReplicas = getReplication(b);
@@ -3489,21 +3492,15 @@ public class BlockManager {
return this.neededReplications.getCorruptReplOneBlockSize();
}
- public BlockInfoContiguous addBlockCollection(BlockInfoContiguous block,
+ public BlockInfo addBlockCollection(BlockInfo block,
BlockCollection bc) {
- // TODO
- return (BlockInfoContiguous) blocksMap.addBlockCollection(block, bc);
+ return blocksMap.addBlockCollection(block, bc);
}
public BlockCollection getBlockCollection(Block b) {
return blocksMap.getBlockCollection(b);
}
- /** @return an iterator of the datanodes. */
- public Iterable<DatanodeStorageInfo> getStorages(final Block block) {
- return blocksMap.getStorages(block);
- }
-
public int numCorruptReplicas(Block block) {
return corruptReplicas.numCorruptReplicas(block);
}
@@ -3517,26 +3514,6 @@ public class BlockManager {
public int getCapacity() {
return blocksMap.getCapacity();
}
-
- /**
- * Return a range of corrupt replica block ids. Up to numExpectedBlocks
- * blocks starting at the next block after startingBlockId are returned
- * (fewer if numExpectedBlocks blocks are unavailable). If startingBlockId
- * is null, up to numExpectedBlocks blocks are returned from the beginning.
- * If startingBlockId cannot be found, null is returned.
- *
- * @param numExpectedBlocks Number of block ids to return.
- * 0 <= numExpectedBlocks <= 100
- * @param startingBlockId Block id from which to start. If null, start at
- * beginning.
- * @return Up to numExpectedBlocks blocks from startingBlockId if it exists
- *
- */
- public long[] getCorruptReplicaBlockIds(int numExpectedBlocks,
- Long startingBlockId) {
- return corruptReplicas.getCorruptReplicaBlockIds(numExpectedBlocks,
- startingBlockId);
- }
/**
* Return an iterator over the set of blocks for which there are no replicas.
@@ -3650,7 +3627,7 @@ public class BlockManager {
datanodeManager.clearPendingQueues();
postponedMisreplicatedBlocks.clear();
postponedMisreplicatedBlocksCount.set(0);
- };
+ }
private static class ReplicationWork {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f05e27ee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
index 79d7713..a1290a0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
@@ -369,7 +369,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
* @param file The file.
*/
private void rescanFile(CacheDirective directive, INodeFile file) {
- BlockInfoContiguous[] blockInfos = file.getBlocks();
+ BlockInfo[] blockInfos = file.getBlocks();
// Increment the "needed" statistics
directive.addFilesNeeded(1);
@@ -394,7 +394,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
}
long cachedTotal = 0;
- for (BlockInfoContiguous blockInfo : blockInfos) {
+ for (BlockInfo blockInfo : blockInfos) {
if (!blockInfo.getBlockUCState().equals(BlockUCState.COMPLETE)) {
// We don't try to cache blocks that are under construction.
LOG.trace("Directive {}: can't cache block {} because it is in state "
@@ -452,8 +452,8 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
file.getFullPathName(), cachedTotal, neededTotal);
}
- private String findReasonForNotCaching(CachedBlock cblock,
- BlockInfoContiguous blockInfo) {
+ private String findReasonForNotCaching(CachedBlock cblock,
+ BlockInfo blockInfo) {
if (blockInfo == null) {
// Somehow, a cache report with the block arrived, but the block
// reports from the DataNode haven't (yet?) described such a block.
@@ -513,7 +513,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
iter.remove();
}
}
- BlockInfoContiguous blockInfo = namesystem.getStoredBlock(new Block(cblock.getBlockId()));
+ BlockInfo blockInfo = namesystem.getStoredBlock(new Block(cblock.getBlockId()));
String reason = findReasonForNotCaching(cblock, blockInfo);
int neededCached = 0;
if (reason != null) {
@@ -627,7 +627,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
List<DatanodeDescriptor> pendingCached) {
// To figure out which replicas can be cached, we consult the
// blocksMap. We don't want to try to cache a corrupt replica, though.
- BlockInfoContiguous blockInfo = namesystem.getStoredBlock(new Block(cachedBlock.getBlockId()));
+ BlockInfo blockInfo = namesystem.getStoredBlock(new Block(cachedBlock.getBlockId()));
if (blockInfo == null) {
LOG.debug("Block {}: can't add new cached replicas," +
" because there is no record of this block " +
@@ -665,7 +665,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
Iterator<CachedBlock> it = datanode.getPendingCached().iterator();
while (it.hasNext()) {
CachedBlock cBlock = it.next();
- BlockInfoContiguous info =
+ BlockInfo info =
namesystem.getStoredBlock(new Block(cBlock.getBlockId()));
if (info != null) {
pendingBytes -= info.getNumBytes();
@@ -675,7 +675,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
// Add pending uncached blocks from effective capacity
while (it.hasNext()) {
CachedBlock cBlock = it.next();
- BlockInfoContiguous info =
+ BlockInfo info =
namesystem.getStoredBlock(new Block(cBlock.getBlockId()));
if (info != null) {
pendingBytes += info.getNumBytes();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f05e27ee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
index 31a6af7..576c0b0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
@@ -143,6 +143,7 @@ class FSDirConcatOp {
throw new HadoopIllegalArgumentException("concat: source file " + src
+ " is invalid or empty or underConstruction");
}
+
// source file's preferred block size cannot be greater than the target
// file
if (srcINodeFile.getPreferredBlockSize() >
@@ -152,6 +153,11 @@ class FSDirConcatOp {
+ " which is greater than the target file's preferred block size "
+ targetINode.getPreferredBlockSize());
}
+ // TODO currently we do not support concatenating EC files
+ if (srcINodeFile.isStriped()) {
+ throw new HadoopIllegalArgumentException("concat: the src file " + src
+ + " is with striped blocks");
+ }
si.add(srcINodeFile);
}
@@ -228,7 +234,7 @@ class FSDirConcatOp {
int count = 0;
for (INodeFile nodeToRemove : srcList) {
if(nodeToRemove != null) {
- nodeToRemove.setBlocks(null);
+ nodeToRemove.setContiguousBlocks(null);
nodeToRemove.getParent().removeChild(nodeToRemove);
fsd.getINodeMap().remove(nodeToRemove);
count++;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f05e27ee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 44b2b17..e9808d3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
@@ -1058,7 +1059,7 @@ public class FSDirectory implements Closeable {
unprotectedTruncate(iip, newLength, collectedBlocks, mtime, null);
if(! onBlockBoundary) {
- BlockInfoContiguous oldBlock = file.getLastBlock();
+ BlockInfo oldBlock = file.getLastBlock();
Block tBlk =
getFSNamesystem().prepareFileForTruncate(iip,
clientName, clientMachine, file.computeFileSize() - newLength,
@@ -1067,7 +1068,7 @@ public class FSDirectory implements Closeable {
tBlk.getNumBytes() == truncateBlock.getNumBytes() :
"Should be the same block.";
if(oldBlock.getBlockId() != tBlk.getBlockId() &&
- !file.isBlockInLatestSnapshot(oldBlock)) {
+ !file.isBlockInLatestSnapshot((BlockInfoContiguous) oldBlock)) {
getBlockManager().removeBlockFromMap(oldBlock);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f05e27ee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
index df9c585..392a670 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
@@ -45,7 +45,7 @@ import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.Storage.FormatConfirmable;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
@@ -773,10 +773,10 @@ public class FSEditLog implements LogsPurgeable {
public void logAddBlock(String path, INodeFile file) {
Preconditions.checkArgument(file.isUnderConstruction());
- BlockInfoContiguous[] blocks = file.getBlocks();
+ BlockInfo[] blocks = file.getBlocks();
Preconditions.checkState(blocks != null && blocks.length > 0);
- BlockInfoContiguous pBlock = blocks.length > 1 ? blocks[blocks.length - 2] : null;
- BlockInfoContiguous lastBlock = blocks[blocks.length - 1];
+ BlockInfo pBlock = blocks.length > 1 ? blocks[blocks.length - 2] : null;
+ BlockInfo lastBlock = blocks[blocks.length - 1];
AddBlockOp op = AddBlockOp.getInstance(cache.get()).setPath(path)
.setPenultimateBlock(pBlock).setLastBlock(lastBlock);
logEdit(op);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f05e27ee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index ad661ca..e0c823e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@@ -507,7 +508,7 @@ public class FSEditLogLoader {
}
INodeFile oldFile = INodeFile.valueOf(fsDir.getINode(path), path);
// add the new block to the INodeFile
- addNewBlock(fsDir, addBlockOp, oldFile);
+ addNewBlock(addBlockOp, oldFile);
break;
}
case OP_SET_REPLICATION: {
@@ -936,15 +937,15 @@ public class FSEditLogLoader {
/**
* Add a new block into the given INodeFile
+ * TODO support adding striped block
*/
- private void addNewBlock(FSDirectory fsDir, AddBlockOp op, INodeFile file)
- throws IOException {
- BlockInfoContiguous[] oldBlocks = file.getBlocks();
+ private void addNewBlock(AddBlockOp op, INodeFile file) throws IOException {
+ BlockInfo[] oldBlocks = file.getBlocks();
Block pBlock = op.getPenultimateBlock();
Block newBlock= op.getLastBlock();
if (pBlock != null) { // the penultimate block is not null
- Preconditions.checkState(oldBlocks != null && oldBlocks.length > 0);
+ assert oldBlocks != null && oldBlocks.length > 0;
// compare pBlock with the last block of oldBlocks
Block oldLastBlock = oldBlocks[oldBlocks.length - 1];
if (oldLastBlock.getBlockId() != pBlock.getBlockId()
@@ -974,12 +975,13 @@ public class FSEditLogLoader {
/**
* Update in-memory data structures with new block information.
+ * TODO support adding striped block
* @throws IOException
*/
private void updateBlocks(FSDirectory fsDir, BlockListUpdatingOp op,
INodesInPath iip, INodeFile file) throws IOException {
// Update its block list
- BlockInfoContiguous[] oldBlocks = file.getBlocks();
+ BlockInfo[] oldBlocks = file.getBlocks();
Block[] newBlocks = op.getBlocks();
String path = op.getPath();
@@ -988,7 +990,7 @@ public class FSEditLogLoader {
// First, update blocks in common
for (int i = 0; i < oldBlocks.length && i < newBlocks.length; i++) {
- BlockInfoContiguous oldBlock = oldBlocks[i];
+ BlockInfo oldBlock = oldBlocks[i];
Block newBlock = newBlocks[i];
boolean isLastBlock = i == newBlocks.length - 1;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f05e27ee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
index cce991f..d62b804 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LayoutFlags;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
@@ -684,7 +685,7 @@ public class FSImageFormat {
public void updateBlocksMap(INodeFile file) {
// Add file->block mapping
- final BlockInfoContiguous[] blocks = file.getBlocks();
+ final BlockInfo[] blocks = file.getBlocks();
if (blocks != null) {
final BlockManager bm = namesystem.getBlockManager();
for (int i = 0; i < blocks.length; i++) {
@@ -952,9 +953,9 @@ public class FSImageFormat {
FileUnderConstructionFeature uc = cons.getFileUnderConstructionFeature();
oldnode.toUnderConstruction(uc.getClientName(), uc.getClientMachine());
if (oldnode.numBlocks() > 0) {
- BlockInfoContiguous ucBlock = cons.getLastBlock();
+ BlockInfo ucBlock = cons.getLastBlock();
// we do not replace the inode, just replace the last block of oldnode
- BlockInfoContiguous info = namesystem.getBlockManager().addBlockCollection(
+ BlockInfo info = namesystem.getBlockManager().addBlockCollection(
ucBlock, oldnode);
oldnode.setBlock(oldnode.numBlocks() - 1, info);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f05e27ee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
index b758458..a025bb0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
@@ -41,9 +41,13 @@ import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StripedBlockProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.LoaderContext;
import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.SaverContext;
@@ -52,6 +56,7 @@ import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FilesUnderConstructio
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeDirectorySection;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.AclFeatureProto;
+import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.StripedBlocksFeature;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.XAttrCompactProto;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.XAttrFeatureProto;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.QuotaByStorageTypeEntryProto;
@@ -210,7 +215,7 @@ public final class FSImageFormatPBINode {
public static void updateBlocksMap(INodeFile file, BlockManager bm) {
// Add file->block mapping
- final BlockInfoContiguous[] blocks = file.getBlocks();
+ final BlockInfo[] blocks = file.getBlocks();
if (blocks != null) {
for (int i = 0; i < blocks.length; i++) {
file.setBlock(i, bm.addBlockCollection(blocks[i], file));
@@ -345,16 +350,30 @@ public final class FSImageFormatPBINode {
loadXAttrs(f.getXAttrs(), state.getStringTable())));
}
+ FileWithStripedBlocksFeature stripeFeature = null;
+ if (f.hasStripedBlocks()) {
+ StripedBlocksFeature sb = f.getStripedBlocks();
+ stripeFeature = file.addStripedBlocksFeature();
+ for (StripedBlockProto sp : sb.getBlocksList()) {
+ stripeFeature.addBlock(PBHelper.convert(sp));
+ }
+ }
+
// under-construction information
if (f.hasFileUC()) {
INodeSection.FileUnderConstructionFeature uc = f.getFileUC();
file.toUnderConstruction(uc.getClientName(), uc.getClientMachine());
- if (blocks.length > 0) {
- BlockInfoContiguous lastBlk = file.getLastBlock();
- // replace the last block of file
- file.setBlock(file.numBlocks() - 1, new BlockInfoContiguousUnderConstruction(
- lastBlk, replication));
+ BlockInfo lastBlk = file.getLastBlock();
+ // replace the last block of file
+ final BlockInfo ucBlk;
+ if (stripeFeature != null) {
+ BlockInfoStriped striped = (BlockInfoStriped) lastBlk;
+ ucBlk = new BlockInfoStripedUnderConstruction(striped,
+ striped.getDataBlockNum(), striped.getParityBlockNum());
+ } else {
+ ucBlk = new BlockInfoContiguousUnderConstruction(lastBlk, replication);
}
+ file.setBlock(file.numBlocks() - 1, ucBlk);
}
return file;
}
@@ -617,6 +636,19 @@ public final class FSImageFormatPBINode {
}
}
+ FileWithStripedBlocksFeature sb = n.getStripedBlocksFeature();
+ if (sb != null) {
+ StripedBlocksFeature.Builder builder =
+ StripedBlocksFeature.newBuilder();
+ BlockInfoStriped[] sblocks = sb.getBlocks();
+ if (sblocks != null) {
+ for (BlockInfoStriped sblk : sblocks) {
+ builder.addBlocks(PBHelper.convert(sblk));
+ }
+ }
+ b.setStripedBlocks(builder.build());
+ }
+
FileUnderConstructionFeature uc = n.getFileUnderConstructionFeature();
if (uc != null) {
INodeSection.FileUnderConstructionFeature f =
@@ -645,7 +677,7 @@ public final class FSImageFormatPBINode {
r.writeDelimitedTo(out);
}
- private final INodeSection.INode.Builder buildINodeCommon(INode n) {
+ private INodeSection.INode.Builder buildINodeCommon(INode n) {
return INodeSection.INode.newBuilder()
.setId(n.getId())
.setName(ByteString.copyFrom(n.getLocalNameBytes()));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f05e27ee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index ebdcfbf..88e946b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -203,8 +203,10 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretMan
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager.SecretManagerState;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
@@ -1961,6 +1963,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
final BlockStoragePolicy lpPolicy =
blockManager.getStoragePolicy("LAZY_PERSIST");
+ // not support truncating file with striped blocks
+ if (file.isStriped()) {
+ throw new UnsupportedOperationException(
+ "Cannot truncate file with striped block " + src);
+ }
if (lpPolicy != null &&
lpPolicy.getId() == file.getStoragePolicyID()) {
throw new UnsupportedOperationException(
@@ -2043,8 +2050,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
leaseManager.addLease(
file.getFileUnderConstructionFeature().getClientName(), src);
boolean shouldRecoverNow = (newBlock == null);
- BlockInfoContiguous oldBlock = file.getLastBlock();
- boolean shouldCopyOnTruncate = shouldCopyOnTruncate(file, oldBlock);
+
+ BlockInfo oldBlock = file.getLastBlock();
+ assert oldBlock instanceof BlockInfoContiguous;
+
+ boolean shouldCopyOnTruncate = shouldCopyOnTruncate(file,
+ (BlockInfoContiguous) oldBlock);
if(newBlock == null) {
newBlock = (shouldCopyOnTruncate) ? createNewBlock(file.isStriped()) :
new Block(oldBlock.getBlockId(), oldBlock.getNumBytes(),
@@ -2059,7 +2070,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
file.getBlockReplication());
truncatedBlockUC.setNumBytes(oldBlock.getNumBytes() - lastBlockDelta);
truncatedBlockUC.setTruncateBlock(oldBlock);
- file.setLastBlock(truncatedBlockUC, blockManager.getStorages(oldBlock));
+ file.convertLastBlockToUC(truncatedBlockUC,
+ blockManager.getStorages(oldBlock));
getBlockManager().addBlockCollection(truncatedBlockUC, file);
NameNode.stateChangeLog.info("BLOCK* prepareFileForTruncate: "
@@ -2640,6 +2652,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
+ src + " for client " + clientMachine);
}
INodeFile myFile = INodeFile.valueOf(inode, src, true);
+
+ // not support appending file with striped blocks
+ if (myFile.isStriped()) {
+ throw new UnsupportedOperationException(
+ "Cannot truncate file with striped block " + src);
+ }
+
final BlockStoragePolicy lpPolicy =
blockManager.getStoragePolicy("LAZY_PERSIST");
if (lpPolicy != null &&
@@ -2651,7 +2670,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
recoverLeaseInternal(RecoverLeaseOp.APPEND_FILE,
iip, src, holder, clientMachine, false);
- final BlockInfoContiguous lastBlock = myFile.getLastBlock();
+ final BlockInfoContiguous lastBlock =
+ (BlockInfoContiguous) myFile.getLastBlock();
// Check that the block has at least minimum replication.
if(lastBlock != null && lastBlock.isComplete() &&
!getBlockManager().isSufficientlyReplicated(lastBlock)) {
@@ -2707,7 +2727,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
}
} else {
- BlockInfoContiguous lastBlock = file.getLastBlock();
+ BlockInfo lastBlock = file.getLastBlock();
if (lastBlock != null) {
ExtendedBlock blk = new ExtendedBlock(this.getBlockPoolId(), lastBlock);
ret = new LocatedBlock(blk, new DatanodeInfo[0]);
@@ -2886,7 +2906,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
op.getExceptionMessage(src, holder, clientMachine,
"lease recovery is in progress. Try again later."));
} else {
- final BlockInfoContiguous lastBlock = file.getLastBlock();
+ final BlockInfo lastBlock = file.getLastBlock();
if (lastBlock != null
&& lastBlock.getBlockUCState() == BlockUCState.UNDER_RECOVERY) {
throw new RecoveryInProgressException(
@@ -3073,9 +3093,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
return onRetryBlock[0];
} else {
// add new chosen targets to already allocated block and return
- BlockInfoContiguous lastBlockInFile = pendingFile.getLastBlock();
- ((BlockInfoContiguousUnderConstruction) lastBlockInFile)
- .setExpectedLocations(targets);
+ BlockInfo lastBlockInFile = pendingFile.getLastBlock();
+ BlockInfo.setExpectedLocations(lastBlockInFile, targets);
offset = pendingFile.computeFileSize();
return makeLocatedBlock(lastBlockInFile, targets, offset);
}
@@ -3165,7 +3184,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
}
final INodeFile pendingFile = checkLease(src, clientName, inode, fileId);
- BlockInfoContiguous lastBlockInFile = pendingFile.getLastBlock();
+ BlockInfo lastBlockInFile = pendingFile.getLastBlock();
if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) {
// The block that the client claims is the current last block
// doesn't match up with what we think is the last block. There are
@@ -3193,7 +3212,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
// changed the namesystem state yet.
// We run this analysis again in Part II where case 4 is impossible.
- BlockInfoContiguous penultimateBlock = pendingFile.getPenultimateBlock();
+ BlockInfo penultimateBlock = pendingFile.getPenultimateBlock();
if (previous == null &&
lastBlockInFile != null &&
lastBlockInFile.getNumBytes() >= pendingFile.getPreferredBlockSize() &&
@@ -3220,8 +3239,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
src + ". Returning previously allocated block " + lastBlockInFile);
long offset = pendingFile.computeFileSize();
onRetryBlock[0] = makeLocatedBlock(lastBlockInFile,
- ((BlockInfoContiguousUnderConstruction)lastBlockInFile)
- .getExpectedStorageLocations(), offset);
+ BlockInfo.getExpectedStorageLocations(lastBlockInFile), offset);
return new FileState(pendingFile, src, iip);
} else {
// Case 3
@@ -3512,6 +3530,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* @param isStriped is the file under striping or contigunous layout?
* @throws QuotaExceededException If addition of block exceeds space quota
*/
+ // TODO: support striped block
BlockInfoContiguous saveAllocatedBlock(String src, INodesInPath inodesInPath,
Block newBlock, DatanodeStorageInfo[] targets, boolean isStriped)
throws IOException {
@@ -3545,16 +3564,16 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
try {
if (checkall) {
// check all blocks of the file.
- for (BlockInfoContiguous block: v.getBlocks()) {
- if (!isCompleteBlock(src, block, blockManager.minReplication)) {
+ for (BlockInfo block: v.getBlocks()) {
+ if (!isCompleteBlock(src, block)) {
return false;
}
}
} else {
// check the penultimate block of this file
- BlockInfoContiguous b = v.getPenultimateBlock();
+ BlockInfo b = v.getPenultimateBlock();
if (b != null
- && !isCompleteBlock(src, b, blockManager.minReplication)) {
+ && !isCompleteBlock(src, b)) {
return false;
}
}
@@ -3564,16 +3583,19 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
}
- private static boolean isCompleteBlock(String src, BlockInfoContiguous b,
- int minRepl) {
+ private boolean isCompleteBlock(String src, BlockInfo b) {
if (!b.isComplete()) {
- final BlockInfoContiguousUnderConstruction uc =
- (BlockInfoContiguousUnderConstruction) b;
final int numNodes = b.numNodes();
- LOG.info("BLOCK* " + b + " is not COMPLETE (ucState = "
- + uc.getBlockUCState() + ", replication# = " + numNodes
- + (numNodes < minRepl? " < ": " >= ")
- + " minimum = " + minRepl + ") in file " + src);
+ final int min;
+ final BlockUCState state = b.getBlockUCState();
+ if (b instanceof BlockInfoStripedUnderConstruction) {
+ min = ((BlockInfoStripedUnderConstruction) b).getDataBlockNum();
+ } else {
+ min = blockManager.minReplication;
+ }
+ LOG.info("BLOCK* " + b + " is not COMPLETE (ucState = " + state
+ + ", replication# = " + numNodes + (numNodes < min ? " < " : " >= ")
+ + " minimum = " + min + ") in file " + src);
return false;
}
return true;
@@ -3758,7 +3780,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
for (Block b : blocks.getToDeleteList()) {
if (trackBlockCounts) {
- BlockInfoContiguous bi = getStoredBlock(b);
+ BlockInfo bi = getStoredBlock(b);
if (bi.isComplete()) {
numRemovedComplete++;
if (bi.numNodes() >= blockManager.minReplication) {
@@ -3982,10 +4004,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
final INodeFile pendingFile = iip.getLastINode().asFile();
int nrBlocks = pendingFile.numBlocks();
- BlockInfoContiguous[] blocks = pendingFile.getBlocks();
+ BlockInfo[] blocks = pendingFile.getBlocks();
int nrCompleteBlocks;
- BlockInfoContiguous curBlock = null;
+ BlockInfo curBlock = null;
for(nrCompleteBlocks = 0; nrCompleteBlocks < nrBlocks; nrCompleteBlocks++) {
curBlock = blocks[nrCompleteBlocks];
if(!curBlock.isComplete())
@@ -4020,12 +4042,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
// The last block is not COMPLETE, and
// that the penultimate block if exists is either COMPLETE or COMMITTED
- final BlockInfoContiguous lastBlock = pendingFile.getLastBlock();
+ final BlockInfo lastBlock = pendingFile.getLastBlock();
BlockUCState lastBlockState = lastBlock.getBlockUCState();
- BlockInfoContiguous penultimateBlock = pendingFile.getPenultimateBlock();
+ BlockInfo penultimateBlock = pendingFile.getPenultimateBlock();
// If penultimate block doesn't exist then its minReplication is met
- boolean penultimateBlockMinReplication = penultimateBlock == null ? true :
+ boolean penultimateBlockMinReplication = penultimateBlock == null ||
blockManager.checkMinReplication(penultimateBlock);
switch(lastBlockState) {
@@ -4055,6 +4077,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
throw new AlreadyBeingCreatedException(message);
case UNDER_CONSTRUCTION:
case UNDER_RECOVERY:
+ // TODO support Striped block's recovery
final BlockInfoContiguousUnderConstruction uc =
(BlockInfoContiguousUnderConstruction)lastBlock;
// determine if last block was intended to be truncated
@@ -4166,14 +4189,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
blockManager.checkReplication(pendingFile);
}
- public BlockInfoContiguous getStoredBlock(Block block) {
- return (BlockInfoContiguous) blockManager.getStoredBlock(block);
+ public BlockInfo getStoredBlock(Block block) {
+ return blockManager.getStoredBlock(block);
}
@Override
- public boolean isInSnapshot(BlockInfoContiguousUnderConstruction blockUC) {
+ public boolean isInSnapshot(BlockCollection bc) {
assert hasReadLock();
- final BlockCollection bc = blockUC.getBlockCollection();
if (bc == null || !(bc instanceof INodeFile)
|| !bc.isUnderConstruction()) {
return false;
@@ -4224,7 +4246,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
checkNameNodeSafeMode(
"Cannot commitBlockSynchronization while in safe mode");
- final BlockInfoContiguous storedBlock = getStoredBlock(
+ final BlockInfo storedBlock = getStoredBlock(
ExtendedBlock.getLocalBlock(oldBlock));
if (storedBlock == null) {
if (deleteblock) {
@@ -4274,11 +4296,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
return;
}
- BlockInfoContiguousUnderConstruction truncatedBlock =
- (BlockInfoContiguousUnderConstruction) iFile.getLastBlock();
- long recoveryId = truncatedBlock.getBlockRecoveryId();
- boolean copyTruncate =
- truncatedBlock.getBlockId() != storedBlock.getBlockId();
+ BlockInfo ucBlock = iFile.getLastBlock();
+ long recoveryId = BlockInfo.getBlockRecoveryId(ucBlock);
+ boolean copyTruncate = ucBlock.getBlockId() != storedBlock.getBlockId();
if(recoveryId != newgenerationstamp) {
throw new IOException("The recovery id " + newgenerationstamp
+ " does not match current recovery id "
@@ -4291,8 +4311,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
if (remove) {
blockManager.removeBlock(storedBlock);
}
- }
- else {
+ } else {
// update last block
if(!copyTruncate) {
storedBlock.setGenerationStamp(newgenerationstamp);
@@ -4326,7 +4345,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
trimmedTargets.get(i).getStorageInfo(trimmedStorages.get(i));
if (storageInfo != null) {
if(copyTruncate) {
- storageInfo.addBlock(truncatedBlock, truncatedBlock);
+ storageInfo.addBlock(ucBlock, ucBlock);
} else {
storageInfo.addBlock(storedBlock, storedBlock);
}
@@ -4340,9 +4359,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
trimmedTargets.toArray(new DatanodeID[trimmedTargets.size()]),
trimmedStorages.toArray(new String[trimmedStorages.size()]));
if(copyTruncate) {
- iFile.setLastBlock(truncatedBlock, trimmedStorageInfos);
+ iFile.convertLastBlockToUC(ucBlock, trimmedStorageInfos);
} else {
- iFile.setLastBlock(storedBlock, trimmedStorageInfos);
+ iFile.convertLastBlockToUC(storedBlock, trimmedStorageInfos);
if (closeFile) {
blockManager.markBlockReplicasAsCorrupt(storedBlock,
oldGenerationStamp, oldNumBytes, trimmedStorageInfos);
@@ -4352,8 +4371,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
if (closeFile) {
if(copyTruncate) {
- src = closeFileCommitBlocks(iFile, truncatedBlock);
- if(!iFile.isBlockInLatestSnapshot(storedBlock)) {
+ src = closeFileCommitBlocks(iFile, ucBlock);
+ if(!iFile.isBlockInLatestSnapshot((BlockInfoContiguous) storedBlock)) {
blockManager.removeBlock(storedBlock);
}
} else {
@@ -4386,7 +4405,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* @throws IOException on error
*/
@VisibleForTesting
- String closeFileCommitBlocks(INodeFile pendingFile, BlockInfoContiguous storedBlock)
+ String closeFileCommitBlocks(INodeFile pendingFile, BlockInfo storedBlock)
throws IOException {
final INodesInPath iip = INodesInPath.fromINode(pendingFile);
final String src = iip.getPath();
@@ -4696,7 +4715,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
while (it.hasNext()) {
Block b = it.next();
- BlockInfoContiguous blockInfo = getStoredBlock(b);
+ BlockInfo blockInfo = getStoredBlock(b);
if (blockInfo.getBlockCollection().getStoragePolicyID() == lpPolicy.getId()) {
filesToDelete.add(blockInfo.getBlockCollection());
}
@@ -5647,7 +5666,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
SafeModeInfo safeMode = this.safeMode;
if (safeMode == null) // mostly true
return;
- BlockInfoContiguous storedBlock = getStoredBlock(b);
+ BlockInfo storedBlock = getStoredBlock(b);
if (storedBlock.isComplete()) {
safeMode.decrementSafeBlockCount((short)blockManager.countNodes(b).liveReplicas());
}
@@ -6192,7 +6211,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
+ "access token for block " + block);
// check stored block state
- BlockInfoContiguous storedBlock = getStoredBlock(ExtendedBlock.getLocalBlock(block));
+ BlockInfo storedBlock = getStoredBlock(ExtendedBlock.getLocalBlock(block));
if (storedBlock == null ||
storedBlock.getBlockUCState() != BlockUCState.UNDER_CONSTRUCTION) {
throw new IOException(block +
@@ -6321,8 +6340,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
assert hasWriteLock();
// check the vadility of the block and lease holder name
final INodeFile pendingFile = checkUCBlock(oldBlock, clientName);
- final BlockInfoContiguousUnderConstruction blockinfo
- = (BlockInfoContiguousUnderConstruction)pendingFile.getLastBlock();
+ final BlockInfo lastBlock = pendingFile.getLastBlock();
+ // when updating pipeline, the last block must be contiguous block
+ assert lastBlock instanceof BlockInfoContiguousUnderConstruction;
+ BlockInfoContiguousUnderConstruction blockinfo =
+ (BlockInfoContiguousUnderConstruction) lastBlock;
// check new GS & length: this is not expected
if (newBlock.getGenerationStamp() <= blockinfo.getGenerationStamp() ||
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f05e27ee/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileUnderConstructionFeature.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileUnderConstructionFeature.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileUnderConstructionFeature.java
index 1ebdde6..a8e2e00 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileUnderConstructionFeature.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileUnderConstructionFeature.java
@@ -20,8 +20,10 @@ package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
/**
@@ -58,12 +60,12 @@ public class FileUnderConstructionFeature implements INode.Feature {
*/
void updateLengthOfLastBlock(INodeFile f, long lastBlockLength)
throws IOException {
- BlockInfoContiguous lastBlock = f.getLastBlock();
+ BlockInfo lastBlock = f.getLastBlock();
assert (lastBlock != null) : "The last block for path "
+ f.getFullPathName() + " is null when updating its length";
- assert (lastBlock instanceof BlockInfoContiguousUnderConstruction)
+ assert !lastBlock.isComplete()
: "The last block for path " + f.getFullPathName()
- + " is not a BlockInfoUnderConstruction when updating its length";
+ + " is not a BlockInfoUnderConstruction when updating its length";
lastBlock.setNumBytes(lastBlockLength);
}
@@ -74,11 +76,10 @@ public class FileUnderConstructionFeature implements INode.Feature {
*/
void cleanZeroSizeBlock(final INodeFile f,
final BlocksMapUpdateInfo collectedBlocks) {
- final BlockInfoContiguous[] blocks = f.getBlocks();
+ final BlockInfo[] blocks = f.getBlocks();
if (blocks != null && blocks.length > 0
- && blocks[blocks.length - 1] instanceof BlockInfoContiguousUnderConstruction) {
- BlockInfoContiguousUnderConstruction lastUC =
- (BlockInfoContiguousUnderConstruction) blocks[blocks.length - 1];
+ && !blocks[blocks.length - 1].isComplete()) {
+ BlockInfo lastUC = blocks[blocks.length - 1];
if (lastUC.getNumBytes() == 0) {
// this is a 0-sized block. do not need check its UC state here
collectedBlocks.addDeleteBlock(lastUC);