Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2015/01/25 02:37:29 UTC
[04/10] hadoop git commit: HDFS-7056. Snapshot support for truncate. Contributed by Konstantin Shvachko and Plamen Jeliazkov.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/00a7ebab
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/00a7ebab
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/00a7ebab
Branch: refs/heads/branch-2
Commit: 00a7ebab223f2d3f566c2a431b8fc39eadfb643b
Parents: 6ff9bde
Author: Konstantin V Shvachko <sh...@apache.org>
Authored: Sat Jan 24 16:06:41 2015 -0800
Committer: Konstantin V Shvachko <sh...@apache.org>
Committed: Sat Jan 24 16:06:41 2015 -0800
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 +
.../hadoop/hdfs/protocol/ClientProtocol.java | 2 +-
...rDatanodeProtocolServerSideTranslatorPB.java | 6 +-
.../InterDatanodeProtocolTranslatorPB.java | 5 +-
.../apache/hadoop/hdfs/protocolPB/PBHelper.java | 13 +-
.../BlockInfoUnderConstruction.java | 20 +-
.../server/blockmanagement/BlockManager.java | 5 +-
.../server/blockmanagement/DatanodeManager.java | 32 +-
.../hdfs/server/common/HdfsServerConstants.java | 7 -
.../hadoop/hdfs/server/datanode/DataNode.java | 37 +-
.../server/datanode/fsdataset/FsDatasetSpi.java | 4 +-
.../datanode/fsdataset/impl/FsDatasetImpl.java | 82 ++-
.../server/namenode/FSDirStatAndListingOp.java | 2 +-
.../hdfs/server/namenode/FSDirectory.java | 36 +-
.../hadoop/hdfs/server/namenode/FSEditLog.java | 6 +-
.../hdfs/server/namenode/FSEditLogLoader.java | 3 +-
.../hdfs/server/namenode/FSEditLogOp.java | 22 +
.../hdfs/server/namenode/FSNamesystem.java | 203 +++++--
.../hadoop/hdfs/server/namenode/INode.java | 15 +-
.../hadoop/hdfs/server/namenode/INodeFile.java | 165 ++++--
.../server/namenode/NameNodeLayoutVersion.java | 5 +-
.../snapshot/AbstractINodeDiffList.java | 15 +-
.../snapshot/FSImageFormatPBSnapshot.java | 23 +
.../hdfs/server/namenode/snapshot/FileDiff.java | 42 +-
.../server/namenode/snapshot/FileDiffList.java | 98 ++++
.../snapshot/FileWithSnapshotFeature.java | 24 +-
.../server/protocol/BlockRecoveryCommand.java | 21 +-
.../server/protocol/InterDatanodeProtocol.java | 3 +-
.../src/main/proto/InterDatanodeProtocol.proto | 2 +
.../hadoop-hdfs/src/main/proto/fsimage.proto | 1 +
.../hadoop-hdfs/src/main/proto/hdfs.proto | 6 +-
.../server/datanode/SimulatedFSDataset.java | 1 +
.../hdfs/server/datanode/TestBlockRecovery.java | 53 +-
.../impl/TestInterDatanodeProtocol.java | 9 +-
.../TestCommitBlockSynchronization.java | 3 +
.../hdfs/server/namenode/TestFileTruncate.java | 525 +++++++++++++++++--
.../hadoop-hdfs/src/test/resources/editsStored | Bin 5791 -> 5586 bytes
.../src/test/resources/editsStored.xml | 238 ++++-----
38 files changed, 1348 insertions(+), 388 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/00a7ebab/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 889a799..3c33e36 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -21,6 +21,8 @@ Release 2.7.0 - UNRELEASED
HDFS-3107. Introduce truncate. (Plamen Jeliazkov via shv)
+ HDFS-7056. Snapshot support for truncate. (Plamen Jeliazkov and shv)
+
IMPROVEMENTS
HDFS-7055. Add tracing to DFSInputStream (cmccabe)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/00a7ebab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
index 749f387..cfd1c67 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
@@ -537,7 +537,7 @@ public interface ClientProtocol {
* @param src existing file
* @param newLength the target size
*
- * @return true if and client does not need to wait for block recovery,
+ * @return true if client does not need to wait for block recovery,
* false if client needs to wait for block recovery.
*
* @throws AccessControlException If access is denied
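
A reader-side note on this contract: a minimal client sketch, assuming Hadoop 2.7's FileSystem#truncate (added by HDFS-3107) and DistributedFileSystem#isFileClosed; the path, target length, and poll interval below are made up for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class TruncateWait {
  public static void main(String[] args) throws Exception {
    DistributedFileSystem dfs =
        (DistributedFileSystem) FileSystem.get(new Configuration());
    Path src = new Path("/tmp/example.txt"); // hypothetical file
    long newLength = 1024L;                  // hypothetical target size
    // true: cut fell on a block boundary, done immediately;
    // false: last block is under recovery, so wait for the file to close
    boolean done = dfs.truncate(src, newLength);
    while (!done) {
      Thread.sleep(1000L);
      done = dfs.isFileClosed(src);
    }
  }
}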
http://git-wip-us.apache.org/repos/asf/hadoop/blob/00a7ebab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolServerSideTranslatorPB.java
index 087c697..ba0a8fc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolServerSideTranslatorPB.java
@@ -76,12 +76,12 @@ public class InterDatanodeProtocolServerSideTranslatorPB implements
final String storageID;
try {
storageID = impl.updateReplicaUnderRecovery(
- PBHelper.convert(request.getBlock()),
- request.getRecoveryId(), request.getNewLength());
+ PBHelper.convert(request.getBlock()), request.getRecoveryId(),
+ request.getNewBlockId(), request.getNewLength());
} catch (IOException e) {
throw new ServiceException(e);
}
return UpdateReplicaUnderRecoveryResponseProto.newBuilder()
.setStorageUuid(storageID).build();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/00a7ebab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java
index 5174d86..fee62a4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java
@@ -102,11 +102,12 @@ public class InterDatanodeProtocolTranslatorPB implements
@Override
public String updateReplicaUnderRecovery(ExtendedBlock oldBlock,
- long recoveryId, long newLength) throws IOException {
+ long recoveryId, long newBlockId, long newLength) throws IOException {
UpdateReplicaUnderRecoveryRequestProto req =
UpdateReplicaUnderRecoveryRequestProto.newBuilder()
.setBlock(PBHelper.convert(oldBlock))
- .setNewLength(newLength).setRecoveryId(recoveryId).build();
+ .setNewLength(newLength).setNewBlockId(newBlockId)
+ .setRecoveryId(recoveryId).build();
try {
return rpcProxy.updateReplicaUnderRecovery(NULL_CONTROLLER, req
).getStorageUuid();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/00a7ebab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 2a88803..7c705cf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -608,16 +608,19 @@ public class PBHelper {
return null;
}
LocatedBlockProto lb = PBHelper.convert((LocatedBlock)b);
- return RecoveringBlockProto.newBuilder().setBlock(lb)
- .setNewGenStamp(b.getNewGenerationStamp())
- .setTruncateFlag(b.getTruncateFlag()).build();
+ RecoveringBlockProto.Builder builder = RecoveringBlockProto.newBuilder();
+ builder.setBlock(lb).setNewGenStamp(b.getNewGenerationStamp());
+ if(b.getNewBlock() != null)
+ builder.setTruncateBlock(PBHelper.convert(b.getNewBlock()));
+ return builder.build();
}
public static RecoveringBlock convert(RecoveringBlockProto b) {
ExtendedBlock block = convert(b.getBlock().getB());
DatanodeInfo[] locs = convert(b.getBlock().getLocsList());
- return new RecoveringBlock(block, locs, b.getNewGenStamp(),
- b.getTruncateFlag());
+ return (b.hasTruncateBlock()) ?
+ new RecoveringBlock(block, locs, PBHelper.convert(b.getTruncateBlock())) :
+ new RecoveringBlock(block, locs, b.getNewGenStamp());
}
public static DatanodeInfoProto.AdminState convert(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/00a7ebab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
index 28b179d..8a811ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
@@ -55,6 +55,11 @@ public class BlockInfoUnderConstruction extends BlockInfo {
private long blockRecoveryId = 0;
/**
+ * The block source to use in the event of copy-on-write truncate.
+ */
+ private Block truncateBlock;
+
+ /**
* ReplicaUnderConstruction contains information about replicas while
* they are under construction.
* The GS, the length and the state of the replica is as reported by
@@ -229,6 +234,15 @@ public class BlockInfoUnderConstruction extends BlockInfo {
return blockRecoveryId;
}
+ /** Get the block to be used for truncate recovery. */
+ public Block getTruncateBlock() {
+ return truncateBlock;
+ }
+
+ public void setTruncateBlock(Block recoveryBlock) {
+ this.truncateBlock = recoveryBlock;
+ }
+
/**
* Process the recorded replicas. When about to commit or finish the
* pipeline recovery sort out bad replicas.
@@ -273,11 +287,7 @@ public class BlockInfoUnderConstruction extends BlockInfo {
* make it primary.
*/
public void initializeBlockRecovery(long recoveryId) {
- initializeBlockRecovery(BlockUCState.UNDER_RECOVERY, recoveryId);
- }
-
- public void initializeBlockRecovery(BlockUCState s, long recoveryId) {
- setBlockUCState(s);
+ setBlockUCState(BlockUCState.UNDER_RECOVERY);
blockRecoveryId = recoveryId;
if (replicas.size() == 0) {
NameNode.blockStateChangeLog.warn("BLOCK*"
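
The new truncateBlock field doubles as the discriminator for the two recovery flavors introduced by this patch; later hunks (DatanodeManager, FSNamesystem) compare block ids to tell them apart. A standalone sketch of that classification, using a stand-in Blk type rather than the HDFS Block class:

class Blk {
  final long id;
  Blk(long id) { this.id = id; }
}

class RecoveryKind {
  static String classify(Blk ucBlock, Blk truncateBlock) {
    if (truncateBlock == null) {
      return "ordinary recovery";          // field unset: no truncate involved
    }
    return truncateBlock.id == ucBlock.id
        ? "in-place truncate recovery"     // same block, just shorter
        : "copy-on-truncate recovery";     // new id; old block kept for snapshot
  }
}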
http://git-wip-us.apache.org/repos/asf/hadoop/blob/00a7ebab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 19b3ce7..4ac04ec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -700,13 +700,14 @@ public class BlockManager {
* The client is supposed to allocate a new block with the next call.
*
* @param bc file
* @param bytesToRemove number of bytes to remove from the last block
* @return the last block locations if the block is partial or null otherwise
*/
public LocatedBlock convertLastBlockToUnderConstruction(
- BlockCollection bc) throws IOException {
+ BlockCollection bc, long bytesToRemove) throws IOException {
BlockInfo oldBlock = bc.getLastBlock();
if(oldBlock == null ||
- bc.getPreferredBlockSize() == oldBlock.getNumBytes())
+ bc.getPreferredBlockSize() == oldBlock.getNumBytes() - bytesToRemove)
return null;
assert oldBlock == getStoredBlock(oldBlock) :
"last block of the file is not in blocksMap";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/00a7ebab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index fb9a5d6..dfb9cae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -1432,26 +1431,37 @@ public class DatanodeManager {
recoveryLocations.add(storages[i]);
}
}
+ // If we are performing a truncate recovery, then set the recovery fields
+ // to the old block.
+ boolean truncateRecovery = b.getTruncateBlock() != null;
+ boolean copyOnTruncateRecovery = truncateRecovery &&
+ b.getTruncateBlock().getBlockId() != b.getBlockId();
+ ExtendedBlock primaryBlock = (copyOnTruncateRecovery) ?
+ new ExtendedBlock(blockPoolId, b.getTruncateBlock()) :
+ new ExtendedBlock(blockPoolId, b);
// If we only get 1 replica after eliminating stale nodes, then choose all
// replicas for recovery and let the primary data node handle failures.
+ DatanodeInfo[] recoveryInfos;
if (recoveryLocations.size() > 1) {
if (recoveryLocations.size() != storages.length) {
LOG.info("Skipped stale nodes for recovery : " +
(storages.length - recoveryLocations.size()));
}
- boolean isTruncate = b.getBlockUCState().equals(
- HdfsServerConstants.BlockUCState.BEING_TRUNCATED);
- brCommand.add(new RecoveringBlock(
- new ExtendedBlock(blockPoolId, b),
- DatanodeStorageInfo.toDatanodeInfos(recoveryLocations),
- b.getBlockRecoveryId(), isTruncate));
+ recoveryInfos =
+ DatanodeStorageInfo.toDatanodeInfos(recoveryLocations);
} else {
// If too many replicas are stale, then choose all replicas to participate
// in block recovery.
- brCommand.add(new RecoveringBlock(
- new ExtendedBlock(blockPoolId, b),
- DatanodeStorageInfo.toDatanodeInfos(storages),
- b.getBlockRecoveryId()));
+ recoveryInfos = DatanodeStorageInfo.toDatanodeInfos(storages);
+ }
+ if(truncateRecovery) {
+ Block recoveryBlock = (copyOnTruncateRecovery) ? b :
+ b.getTruncateBlock();
+ brCommand.add(new RecoveringBlock(primaryBlock, recoveryInfos,
+ recoveryBlock));
+ } else {
+ brCommand.add(new RecoveringBlock(primaryBlock, recoveryInfos,
+ b.getBlockRecoveryId()));
}
}
return new DatanodeCommand[] { brCommand };
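
To restate the role swap above: under copy-on-truncate the primary block handed to the datanode is the old source block, while the recovery target carries the new id; for in-place truncate it is the reverse. A condensed sketch (truncate case only, plain ids as stand-ins, not the HDFS code):

class RecoveryRoles {
  // primaryId: the block whose replicas are read and synchronized;
  // targetId: the id the recovered replica ends up under.
  static long[] roles(long ucBlockId, long truncateBlockId) {
    boolean copyOnTruncate = truncateBlockId != ucBlockId;
    long primaryId = copyOnTruncate ? truncateBlockId : ucBlockId;
    long targetId  = copyOnTruncate ? ucBlockId : truncateBlockId;
    return new long[] { primaryId, targetId };
  }
}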
http://git-wip-us.apache.org/repos/asf/hadoop/blob/00a7ebab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
index f2e7ff4..9bba2c9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
@@ -300,13 +300,6 @@ public final class HdfsServerConstants {
*/
UNDER_RECOVERY,
/**
- * The block is being truncated.<br>
- * When a file is truncated its last block may need to be truncated
- * and needs to go through a recovery procedure,
- * which synchronizes the existing replicas contents.
- */
- BEING_TRUNCATED,
- /**
* The block is committed.<br>
* The client reported that all bytes are written to data-nodes
* with the given generation stamp and block length, but no
http://git-wip-us.apache.org/repos/asf/hadoop/blob/00a7ebab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 46b6e2c..dc6734c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -2537,14 +2537,16 @@ public class DataNode extends ReconfigurableBase
*/
@Override // InterDatanodeProtocol
public String updateReplicaUnderRecovery(final ExtendedBlock oldBlock,
- final long recoveryId, final long newLength) throws IOException {
+ final long recoveryId, final long newBlockId, final long newLength)
+ throws IOException {
final String storageID = data.updateReplicaUnderRecovery(oldBlock,
- recoveryId, newLength);
+ recoveryId, newBlockId, newLength);
// Notify the namenode of the updated block info. This is important
// for HA, since otherwise the standby node may lose track of the
// block locations until the next block report.
ExtendedBlock newBlock = new ExtendedBlock(oldBlock);
newBlock.setGenerationStamp(recoveryId);
+ newBlock.setBlockId(newBlockId);
newBlock.setNumBytes(newLength);
notifyNamenodeReceivedBlock(newBlock, "", storageID);
return storageID;
@@ -2566,10 +2568,12 @@ public class DataNode extends ReconfigurableBase
this.rInfo = rInfo;
}
- void updateReplicaUnderRecovery(String bpid, long recoveryId, long newLength
- ) throws IOException {
+ void updateReplicaUnderRecovery(String bpid, long recoveryId,
+ long newBlockId, long newLength)
+ throws IOException {
final ExtendedBlock b = new ExtendedBlock(bpid, rInfo);
- storageID = datanode.updateReplicaUnderRecovery(b, recoveryId, newLength);
+ storageID = datanode.updateReplicaUnderRecovery(b, recoveryId, newBlockId,
+ newLength);
}
@Override
@@ -2651,8 +2655,12 @@ public class DataNode extends ReconfigurableBase
final String bpid = block.getBlockPoolId();
DatanodeProtocolClientSideTranslatorPB nn =
getActiveNamenodeForBP(block.getBlockPoolId());
-
+
long recoveryId = rBlock.getNewGenerationStamp();
+ boolean isTruncateRecovery = rBlock.getNewBlock() != null;
+ long blockId = (isTruncateRecovery) ?
+ rBlock.getNewBlock().getBlockId() : block.getBlockId();
+
if (LOG.isDebugEnabled()) {
LOG.debug("block=" + block + ", (length=" + block.getNumBytes()
+ "), syncList=" + syncList);
@@ -2686,7 +2694,7 @@ public class DataNode extends ReconfigurableBase
// Calculate list of nodes that will participate in the recovery
// and the new block size
List<BlockRecord> participatingList = new ArrayList<BlockRecord>();
- final ExtendedBlock newBlock = new ExtendedBlock(bpid, block.getBlockId(),
+ final ExtendedBlock newBlock = new ExtendedBlock(bpid, blockId,
-1, recoveryId);
switch(bestState) {
case FINALIZED:
@@ -2698,10 +2706,7 @@ public class DataNode extends ReconfigurableBase
r.rInfo.getNumBytes() == finalizedLength)
participatingList.add(r);
}
- if(rBlock.getTruncateFlag())
- newBlock.setNumBytes(rBlock.getBlock().getNumBytes());
- else
- newBlock.setNumBytes(finalizedLength);
+ newBlock.setNumBytes(finalizedLength);
break;
case RBW:
case RWR:
@@ -2713,21 +2718,21 @@ public class DataNode extends ReconfigurableBase
participatingList.add(r);
}
}
- if(rBlock.getTruncateFlag())
- newBlock.setNumBytes(rBlock.getBlock().getNumBytes());
- else
- newBlock.setNumBytes(minLength);
+ newBlock.setNumBytes(minLength);
break;
case RUR:
case TEMPORARY:
assert false : "bad replica state: " + bestState;
}
+ if(isTruncateRecovery)
+ newBlock.setNumBytes(rBlock.getNewBlock().getNumBytes());
List<DatanodeID> failedList = new ArrayList<DatanodeID>();
final List<BlockRecord> successList = new ArrayList<BlockRecord>();
for(BlockRecord r : participatingList) {
try {
- r.updateReplicaUnderRecovery(bpid, recoveryId, newBlock.getNumBytes());
+ r.updateReplicaUnderRecovery(bpid, recoveryId, blockId,
+ newBlock.getNumBytes());
successList.add(r);
} catch (IOException e) {
InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
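
A condensed, simplified restatement of the length choice in this hunk (not the actual DataNode code): the participating replicas first agree on a length via the switch above, and truncate recovery then overrides it with the target length carried in the RecoveringBlock's new block.

import java.util.stream.LongStream;

class RecoveredLength {
  // candidateLengths: participating replicas' lengths (all equal in the
  // FINALIZED case; the minimum wins for RBW/RWR). truncateTarget is
  // non-null only for truncate recovery and then takes precedence.
  static long choose(long[] candidateLengths, Long truncateTarget) {
    long agreed = LongStream.of(candidateLengths).min().orElse(0L);
    return truncateTarget != null ? truncateTarget : agreed;
  }
}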
http://git-wip-us.apache.org/repos/asf/hadoop/blob/00a7ebab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index d218146..92a99b5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -419,7 +419,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
* @return the ID of storage that stores the block
*/
public String updateReplicaUnderRecovery(ExtendedBlock oldBlock,
- long recoveryId, long newLength) throws IOException;
+ long recoveryId, long newBlockId, long newLength) throws IOException;
/**
* add new block pool ID
@@ -516,4 +516,4 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
*/
public ReplicaInfo moveBlockAcrossStorage(final ExtendedBlock block,
StorageType targetStorageType) throws IOException;
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/00a7ebab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 70e084f..ff10e1e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -687,6 +687,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId);
final File dstFile = new File(destDir, srcFile.getName());
final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp);
+ return copyBlockFiles(srcMeta, srcFile, dstMeta, dstFile, calculateChecksum);
+ }
+
+ static File[] copyBlockFiles(File srcMeta, File srcFile, File dstMeta,
+ File dstFile, boolean calculateChecksum)
+ throws IOException {
if (calculateChecksum) {
computeChecksum(srcMeta, dstMeta, srcFile);
} else {
@@ -2217,6 +2223,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
public synchronized String updateReplicaUnderRecovery(
final ExtendedBlock oldBlock,
final long recoveryId,
+ final long newBlockId,
final long newlength) throws IOException {
//get replica
final String bpid = oldBlock.getBlockPoolId();
@@ -2249,13 +2256,26 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
//update replica
final FinalizedReplica finalized = updateReplicaUnderRecovery(oldBlock
- .getBlockPoolId(), (ReplicaUnderRecovery) replica, recoveryId, newlength);
- assert finalized.getBlockId() == oldBlock.getBlockId()
- && finalized.getGenerationStamp() == recoveryId
- && finalized.getNumBytes() == newlength
- : "Replica information mismatched: oldBlock=" + oldBlock
- + ", recoveryId=" + recoveryId + ", newlength=" + newlength
- + ", finalized=" + finalized;
+ .getBlockPoolId(), (ReplicaUnderRecovery) replica, recoveryId,
+ newBlockId, newlength);
+
+ boolean copyTruncate = newBlockId != oldBlock.getBlockId();
+ if(!copyTruncate) {
+ assert finalized.getBlockId() == oldBlock.getBlockId()
+ && finalized.getGenerationStamp() == recoveryId
+ && finalized.getNumBytes() == newlength
+ : "Replica information mismatched: oldBlock=" + oldBlock
+ + ", recoveryId=" + recoveryId + ", newlength=" + newlength
+ + ", newBlockId=" + newBlockId + ", finalized=" + finalized;
+ } else {
+ assert finalized.getBlockId() == oldBlock.getBlockId()
+ && finalized.getGenerationStamp() == oldBlock.getGenerationStamp()
+ && finalized.getNumBytes() == oldBlock.getNumBytes()
+ : "Finalized and old information mismatched: oldBlock=" + oldBlock
+ + ", genStamp=" + oldBlock.getGenerationStamp()
+ + ", len=" + oldBlock.getNumBytes()
+ + ", finalized=" + finalized;
+ }
//check replica files after update
checkReplicaFiles(finalized);
@@ -2268,6 +2288,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
String bpid,
ReplicaUnderRecovery rur,
long recoveryId,
+ long newBlockId,
long newlength) throws IOException {
//check recovery id
if (rur.getRecoveryID() != recoveryId) {
@@ -2275,26 +2296,63 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
+ ", rur=" + rur);
}
+ boolean copyOnTruncate = newBlockId > 0L && rur.getBlockId() != newBlockId;
+ File blockFile;
+ File metaFile;
// bump rur's GS to be recovery id
- bumpReplicaGS(rur, recoveryId);
+ if(!copyOnTruncate) {
+ bumpReplicaGS(rur, recoveryId);
+ blockFile = rur.getBlockFile();
+ metaFile = rur.getMetaFile();
+ } else {
+ File[] copiedReplicaFiles =
+ copyReplicaWithNewBlockIdAndGS(rur, bpid, newBlockId, recoveryId);
+ blockFile = copiedReplicaFiles[1];
+ metaFile = copiedReplicaFiles[0];
+ }
//update length
- final File replicafile = rur.getBlockFile();
if (rur.getNumBytes() < newlength) {
throw new IOException("rur.getNumBytes() < newlength = " + newlength
+ ", rur=" + rur);
}
if (rur.getNumBytes() > newlength) {
rur.unlinkBlock(1);
- truncateBlock(replicafile, rur.getMetaFile(), rur.getNumBytes(), newlength);
- // update RUR with the new length
- rur.setNumBytes(newlength);
+ truncateBlock(blockFile, metaFile, rur.getNumBytes(), newlength);
+ if(!copyOnTruncate) {
+ // update RUR with the new length
+ rur.setNumBytes(newlength);
+ } else {
+ // Copying block to a new block with new blockId.
+ // Not truncating original block.
+ ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(
+ newBlockId, recoveryId, rur.getVolume(), blockFile.getParentFile(),
+ newlength);
+ newReplicaInfo.setNumBytes(newlength);
+ volumeMap.add(bpid, newReplicaInfo);
+ finalizeReplica(bpid, newReplicaInfo);
+ }
}
// finalize the block
return finalizeReplica(bpid, rur);
}
+ private File[] copyReplicaWithNewBlockIdAndGS(
+ ReplicaUnderRecovery replicaInfo, String bpid, long newBlkId, long newGS)
+ throws IOException {
+ String blockFileName = Block.BLOCK_FILE_PREFIX + newBlkId;
+ FsVolumeReference v = volumes.getNextVolume(
+ replicaInfo.getVolume().getStorageType(), replicaInfo.getNumBytes());
+ final File tmpDir = ((FsVolumeImpl) v.getVolume())
+ .getBlockPoolSlice(bpid).getTmpDir();
+ final File destDir = DatanodeUtil.idToBlockDir(tmpDir, newBlkId);
+ final File dstBlockFile = new File(destDir, blockFileName);
+ final File dstMetaFile = FsDatasetUtil.getMetaFile(dstBlockFile, newGS);
+ return copyBlockFiles(replicaInfo.getMetaFile(), replicaInfo.getBlockFile(),
+ dstMetaFile, dstBlockFile, true);
+ }
+
@Override // FsDatasetSpi
public synchronized long getReplicaVisibleLength(final ExtendedBlock block)
throws IOException {
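
copyReplicaWithNewBlockIdAndGS derives the destination names from the new block id and the recovery generation stamp before copying. A small sketch of that naming scheme, assuming the standard HDFS layout of blk_<id> block files and blk_<id>_<genstamp>.meta meta files; the directory and ids are hypothetical:

import java.io.File;

class ReplicaNaming {
  static File blockFile(File dir, long blockId) {
    return new File(dir, "blk_" + blockId);
  }
  static File metaFile(File blockFile, long genStamp) {
    return new File(blockFile.getParentFile(),
        blockFile.getName() + "_" + genStamp + ".meta");
  }
  public static void main(String[] args) {
    File dst = blockFile(new File("/data/tmp"), 1073741825L); // hypothetical id
    System.out.println(dst);                  // /data/tmp/blk_1073741825
    System.out.println(metaFile(dst, 1001L)); // /data/tmp/blk_1073741825_1001.meta
  }
}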
http://git-wip-us.apache.org/repos/asf/hadoop/blob/00a7ebab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
index dc0fe1f..cb3da19 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java
@@ -424,7 +424,7 @@ class FSDirStatAndListingOp {
fileNode.computeFileSizeNotIncludingLastUcBlock() : size;
loc = fsd.getFSNamesystem().getBlockManager().createLocatedBlocks(
- fileNode.getBlocks(), fileSize, isUc, 0L, size, false,
+ fileNode.getBlocks(snapshot), fileSize, isUc, 0L, size, false,
inSnapshot, feInfo);
if (loc == null) {
loc = new LocatedBlocks();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/00a7ebab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index d4d1ecb..c171448 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -957,18 +957,31 @@ public class FSDirectory implements Closeable {
* Unlike FSNamesystem.truncate, this will not schedule block recovery.
*/
void unprotectedTruncate(String src, String clientName, String clientMachine,
- long newLength, long mtime)
+ long newLength, long mtime, Block truncateBlock)
throws UnresolvedLinkException, QuotaExceededException,
SnapshotAccessControlException, IOException {
INodesInPath iip = getINodesInPath(src, true);
+ INodeFile file = iip.getLastINode().asFile();
BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
boolean onBlockBoundary =
unprotectedTruncate(iip, newLength, collectedBlocks, mtime);
if(! onBlockBoundary) {
- getFSNamesystem().prepareFileForWrite(src,
- iip, clientName, clientMachine, false, false);
+ BlockInfo oldBlock = file.getLastBlock();
+ Block tBlk =
+ getFSNamesystem().prepareFileForTruncate(iip,
+ clientName, clientMachine, file.computeFileSize() - newLength,
+ truncateBlock);
+ assert Block.matchingIdAndGenStamp(tBlk, truncateBlock) &&
+ tBlk.getNumBytes() == truncateBlock.getNumBytes() :
+ "Should be the same block.";
+ if(oldBlock.getBlockId() != tBlk.getBlockId() &&
+ !file.isBlockInLatestSnapshot(oldBlock)) {
+ getBlockManager().removeBlockFromMap(oldBlock);
+ }
}
+ assert onBlockBoundary == (truncateBlock == null) :
+ "truncateBlock is null iff on block boundary: " + truncateBlock;
getFSNamesystem().removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
}
@@ -987,7 +1000,8 @@ public class FSDirectory implements Closeable {
/**
* Truncate has the following properties:
* 1.) Any block deletions occur now.
- * 2.) INode length is truncated now – clients can only read up to new length.
+ * 2.) INode length is truncated now – new clients can only read up to
+ * the truncated length.
* 3.) INode will be set to UC and lastBlock set to UNDER_RECOVERY.
* 4.) NN will trigger DN truncation recovery and waits for DNs to report.
* 5.) File is considered UNDER_RECOVERY until truncation recovery completes.
@@ -1000,20 +1014,16 @@ public class FSDirectory implements Closeable {
long mtime) throws IOException {
assert hasWriteLock();
INodeFile file = iip.getLastINode().asFile();
+ int latestSnapshot = iip.getLatestSnapshotId();
+ file.recordModification(latestSnapshot, true);
long oldDiskspace = file.diskspaceConsumed();
long remainingLength =
file.collectBlocksBeyondMax(newLength, collectedBlocks);
+ file.excludeSnapshotBlocks(latestSnapshot, collectedBlocks);
file.setModificationTime(mtime);
updateCount(iip, 0, file.diskspaceConsumed() - oldDiskspace, true);
- // If on block boundary, then return
- long lastBlockDelta = remainingLength - newLength;
- if(lastBlockDelta == 0)
- return true;
- // Set new last block length
- BlockInfo lastBlock = file.getLastBlock();
- assert lastBlock.getNumBytes() - lastBlockDelta > 0 : "wrong block size";
- lastBlock.setNumBytes(lastBlock.getNumBytes() - lastBlockDelta);
- return false;
+ // return whether on a block boundary
+ return (remainingLength - newLength) == 0;
}
/**
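
The simplified return at the end of unprotectedTruncate leans on collectBlocksBeyondMax: it collects whole blocks past newLength and reports the length that remains, which equals newLength exactly when the cut lands on a block boundary. A standalone model of that test (not the HDFS implementation; block sizes are hypothetical):

import java.util.Arrays;
import java.util.List;

class BoundaryCheck {
  static boolean onBlockBoundary(List<Long> blockSizes, long newLength) {
    long remaining = 0;
    for (long size : blockSizes) {
      if (remaining >= newLength) break; // blocks past this point are collected
      remaining += size;
    }
    return remaining == newLength;
  }
  public static void main(String[] args) {
    List<Long> blocks = Arrays.asList(128L, 128L, 64L);
    System.out.println(onBlockBoundary(blocks, 256L)); // true: exact boundary
    System.out.println(onBlockBoundary(blocks, 200L)); // false: mid-block cut
  }
}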
http://git-wip-us.apache.org/repos/asf/hadoop/blob/00a7ebab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
index 955a291..00a52ea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -902,13 +903,14 @@ public class FSEditLog implements LogsPurgeable {
* Add truncate file record to edit log
*/
void logTruncate(String src, String clientName, String clientMachine,
- long size, long timestamp) {
+ long size, long timestamp, Block truncateBlock) {
TruncateOp op = TruncateOp.getInstance(cache.get())
.setPath(src)
.setClientName(clientName)
.setClientMachine(clientMachine)
.setNewLength(size)
- .setTimestamp(timestamp);
+ .setTimestamp(timestamp)
+ .setTruncateBlock(truncateBlock);
logEdit(op);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/00a7ebab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index 49122ff..91fa765 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -861,7 +861,8 @@ public class FSEditLogLoader {
case OP_TRUNCATE: {
TruncateOp truncateOp = (TruncateOp) op;
fsDir.unprotectedTruncate(truncateOp.src, truncateOp.clientName,
- truncateOp.clientMachine, truncateOp.newLength, truncateOp.timestamp);
+ truncateOp.clientMachine, truncateOp.newLength, truncateOp.timestamp,
+ truncateOp.truncateBlock);
break;
}
case OP_SET_STORAGE_POLICY: {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/00a7ebab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
index 6988180..d645a26 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
@@ -2613,6 +2613,7 @@ public abstract class FSEditLogOp {
String clientMachine;
long newLength;
long timestamp;
+ Block truncateBlock;
private TruncateOp() {
super(OP_TRUNCATE);
@@ -2656,6 +2657,11 @@ public abstract class FSEditLogOp {
return this;
}
+ TruncateOp setTruncateBlock(Block truncateBlock) {
+ this.truncateBlock = truncateBlock;
+ return this;
+ }
+
@Override
void readFields(DataInputStream in, int logVersion) throws IOException {
src = FSImageSerialization.readString(in);
@@ -2663,6 +2669,10 @@ public abstract class FSEditLogOp {
clientMachine = FSImageSerialization.readString(in);
newLength = FSImageSerialization.readLong(in);
timestamp = FSImageSerialization.readLong(in);
+ Block[] blocks =
+ FSImageSerialization.readCompactBlockArray(in, logVersion);
+ assert blocks.length <= 1 : "Truncate op should have 1 or 0 blocks";
+ truncateBlock = (blocks.length == 0) ? null : blocks[0];
}
@Override
@@ -2672,6 +2682,12 @@ public abstract class FSEditLogOp {
FSImageSerialization.writeString(clientMachine, out);
FSImageSerialization.writeLong(newLength, out);
FSImageSerialization.writeLong(timestamp, out);
+ int size = truncateBlock != null ? 1 : 0;
+ Block[] blocks = new Block[size];
+ if (truncateBlock != null) {
+ blocks[0] = truncateBlock;
+ }
+ FSImageSerialization.writeCompactBlockArray(blocks, out);
}
@Override
@@ -2683,6 +2699,8 @@ public abstract class FSEditLogOp {
Long.toString(newLength));
XMLUtils.addSaxString(contentHandler, "TIMESTAMP",
Long.toString(timestamp));
+ if(truncateBlock != null)
+ FSEditLogOp.blockToXml(contentHandler, truncateBlock);
}
@Override
@@ -2692,6 +2710,8 @@ public abstract class FSEditLogOp {
this.clientMachine = st.getValue("CLIENTMACHINE");
this.newLength = Long.parseLong(st.getValue("NEWLENGTH"));
this.timestamp = Long.parseLong(st.getValue("TIMESTAMP"));
+ if (st.hasChildren("BLOCK"))
+ this.truncateBlock = FSEditLogOp.blockFromXml(st);
}
@Override
@@ -2707,6 +2727,8 @@ public abstract class FSEditLogOp {
builder.append(newLength);
builder.append(", timestamp=");
builder.append(timestamp);
+ builder.append(", truncateBlock=");
+ builder.append(truncateBlock);
builder.append(", opCode=");
builder.append(opCode);
builder.append(", txid=");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/00a7ebab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 7ddab52..2539c95 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -1800,8 +1800,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
: dir.getFileEncryptionInfo(inode, iip.getPathSnapshotId(), iip);
final LocatedBlocks blocks = blockManager.createLocatedBlocks(
- inode.getBlocks(), fileSize, isUc, offset, length, needBlockToken,
- iip.isSnapshot(), feInfo);
+ inode.getBlocks(iip.getPathSnapshotId()), fileSize,
+ isUc, offset, length, needBlockToken, iip.isSnapshot(), feInfo);
// Set caching information for the located blocks.
for (LocatedBlock lb : blocks.getLocatedBlocks()) {
@@ -1939,7 +1939,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* Truncation at block boundary is atomic, otherwise it requires
* block recovery to truncate the last block of the file.
*
- * @return true if and client does not need to wait for block recovery,
+ * @return true if client does not need to wait for block recovery,
* false if client needs to wait for block recovery.
*/
boolean truncate(String src, long newLength,
@@ -2001,44 +2001,119 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
dir.checkPathAccess(pc, iip, FsAction.WRITE);
}
INodeFile file = iip.getLastINode().asFile();
- // Data will be lost after truncate occurs so it cannot support snapshots.
- if(file.isInLatestSnapshot(iip.getLatestSnapshotId()))
- throw new HadoopIllegalArgumentException(
- "Cannot truncate file with snapshot.");
// Opening an existing file for write. May need lease recovery.
recoverLeaseInternal(iip, src, clientName, clientMachine, false);
- // Refresh INode as the file could have been closed
- iip = dir.getINodesInPath4Write(src, true);
file = INodeFile.valueOf(iip.getLastINode(), src);
// Truncate length check.
long oldLength = file.computeFileSize();
- if(oldLength == newLength)
+ if(oldLength == newLength) {
return true;
- if(oldLength < newLength)
+ }
+ if(oldLength < newLength) {
throw new HadoopIllegalArgumentException(
"Cannot truncate to a larger file size. Current size: " + oldLength +
", truncate size: " + newLength + ".");
+ }
// Perform INodeFile truncation.
BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
boolean onBlockBoundary = dir.truncate(iip, newLength,
collectedBlocks, mtime);
-
+ Block truncateBlock = null;
if(! onBlockBoundary) {
// Open file for write, but don't log into edits
- prepareFileForWrite(src, iip, clientName, clientMachine, false, false);
- file = INodeFile.valueOf(dir.getINode4Write(src), src);
- initializeBlockRecovery(file);
+ long lastBlockDelta = file.computeFileSize() - newLength;
+ assert lastBlockDelta > 0 : "delta is 0 only if on block boundary";
+ truncateBlock = prepareFileForTruncate(iip, clientName, clientMachine,
+ lastBlockDelta, null);
}
- getEditLog().logTruncate(src, clientName, clientMachine, newLength, mtime);
+ getEditLog().logTruncate(src, clientName, clientMachine, newLength, mtime,
+ truncateBlock);
removeBlocks(collectedBlocks);
return onBlockBoundary;
}
- void initializeBlockRecovery(INodeFile inodeFile) throws IOException {
- BlockInfo lastBlock = inodeFile.getLastBlock();
- long recoveryId = nextGenerationStamp(blockIdManager.isLegacyBlock(lastBlock));
- ((BlockInfoUnderConstruction)lastBlock).initializeBlockRecovery(
- BlockUCState.BEING_TRUNCATED, recoveryId);
+ /**
+ * Convert current INode to UnderConstruction.
+ * Recreate lease.
+ * Create new block for the truncated copy.
+ * Schedule truncation of the replicas.
+ *
+ * @return the new or updated last block, which is written to the edit log
+ * and passed back into this method when the edit is replayed.
+ */
+ Block prepareFileForTruncate(INodesInPath iip,
+ String leaseHolder,
+ String clientMachine,
+ long lastBlockDelta,
+ Block newBlock)
+ throws IOException {
+ INodeFile file = iip.getLastINode().asFile();
+ String src = iip.getPath();
+ file.recordModification(iip.getLatestSnapshotId());
+ file.toUnderConstruction(leaseHolder, clientMachine);
+ assert file.isUnderConstruction() : "inode should be under construction.";
+ leaseManager.addLease(
+ file.getFileUnderConstructionFeature().getClientName(), src);
+ boolean shouldRecoverNow = (newBlock == null);
+ BlockInfo oldBlock = file.getLastBlock();
+ boolean shouldCopyOnTruncate = shouldCopyOnTruncate(file, oldBlock);
+ if(newBlock == null) {
+ newBlock = (shouldCopyOnTruncate) ? createNewBlock() :
+ new Block(oldBlock.getBlockId(), oldBlock.getNumBytes(),
+ nextGenerationStamp(blockIdManager.isLegacyBlock(oldBlock)));
+ }
+
+ BlockInfoUnderConstruction truncatedBlockUC;
+ if(shouldCopyOnTruncate) {
+ // Add new truncateBlock into blocksMap and
+ // use oldBlock as a source for copy-on-truncate recovery
+ truncatedBlockUC = new BlockInfoUnderConstruction(newBlock,
+ file.getBlockReplication());
+ truncatedBlockUC.setNumBytes(oldBlock.getNumBytes() - lastBlockDelta);
+ truncatedBlockUC.setTruncateBlock(oldBlock);
+ file.setLastBlock(truncatedBlockUC, blockManager.getStorages(oldBlock));
+ getBlockManager().addBlockCollection(truncatedBlockUC, file);
+
+ NameNode.stateChangeLog.info("BLOCK* prepareFileForTruncate: "
+ + "Scheduling copy-on-truncate to new size "
+ + truncatedBlockUC.getNumBytes() + " new block " + newBlock
+ + " old block " + truncatedBlockUC.getTruncateBlock());
+ } else {
+ // Use new generation stamp for in-place truncate recovery
+ blockManager.convertLastBlockToUnderConstruction(file, lastBlockDelta);
+ oldBlock = file.getLastBlock();
+ assert !oldBlock.isComplete() : "oldBlock should be under construction";
+ truncatedBlockUC = (BlockInfoUnderConstruction) oldBlock;
+ truncatedBlockUC.setTruncateBlock(new Block(oldBlock));
+ truncatedBlockUC.getTruncateBlock().setNumBytes(
+ oldBlock.getNumBytes() - lastBlockDelta);
+ truncatedBlockUC.getTruncateBlock().setGenerationStamp(
+ newBlock.getGenerationStamp());
+
+ NameNode.stateChangeLog.debug("BLOCK* prepareFileForTruncate: "
+ + "Scheduling in-place block truncate to new size "
+ + truncatedBlockUC.getTruncateBlock().getNumBytes()
+ + " block=" + truncatedBlockUC);
+ }
+ if(shouldRecoverNow)
+ truncatedBlockUC.initializeBlockRecovery(newBlock.getGenerationStamp());
+
+ // update the quota: use the preferred block size for UC block
+ final long diff =
+ file.getPreferredBlockSize() - truncatedBlockUC.getNumBytes();
+ dir.updateSpaceConsumed(iip, 0, diff * file.getBlockReplication());
+ return newBlock;
+ }
+
+ /**
+ * Defines if a replica needs to be copied on truncate or
+ * can be truncated in place.
+ */
+ boolean shouldCopyOnTruncate(INodeFile file, BlockInfo blk) {
+ if(!isUpgradeFinalized()) {
+ return true;
+ }
+ return file.isBlockInLatestSnapshot(blk);
}
/**
@@ -2565,7 +2640,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
leaseManager.addLease(
file.getFileUnderConstructionFeature().getClientName(), src);
- LocatedBlock ret = blockManager.convertLastBlockToUnderConstruction(file);
+ LocatedBlock ret =
+ blockManager.convertLastBlockToUnderConstruction(file, 0);
if (ret != null) {
// update the quota: use the preferred block size for UC block
final long diff = file.getPreferredBlockSize() - ret.getBlockSize();
@@ -2628,7 +2704,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
return false;
}
- private void recoverLeaseInternal(INodesInPath iip,
+ void recoverLeaseInternal(INodesInPath iip,
String src, String holder, String clientMachine, boolean force)
throws IOException {
assert hasWriteLock();
@@ -2690,8 +2766,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
} else {
final BlockInfo lastBlock = file.getLastBlock();
if (lastBlock != null
- && (lastBlock.getBlockUCState() == BlockUCState.UNDER_RECOVERY ||
- lastBlock.getBlockUCState() == BlockUCState.BEING_TRUNCATED)) {
+ && lastBlock.getBlockUCState() == BlockUCState.UNDER_RECOVERY) {
throw new RecoveryInProgressException("Recovery in progress, file ["
+ src + "], " + "lease owner [" + lease.getHolder() + "]");
} else {
@@ -3845,8 +3920,18 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
throw new AlreadyBeingCreatedException(message);
case UNDER_CONSTRUCTION:
case UNDER_RECOVERY:
- case BEING_TRUNCATED:
final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)lastBlock;
+ // determine if last block was intended to be truncated
+ Block recoveryBlock = uc.getTruncateBlock();
+ boolean truncateRecovery = recoveryBlock != null;
+ boolean copyOnTruncate = truncateRecovery &&
+ recoveryBlock.getBlockId() != uc.getBlockId();
+ assert !copyOnTruncate ||
+ recoveryBlock.getBlockId() < uc.getBlockId() &&
+ recoveryBlock.getGenerationStamp() < uc.getGenerationStamp() &&
+ recoveryBlock.getNumBytes() > uc.getNumBytes() :
+ "wrong recoveryBlock";
+
// setup the last block locations from the blockManager if not known
if (uc.getNumExpectedLocations() == 0) {
uc.setExpectedLocations(blockManager.getStorages(lastBlock));
@@ -3867,9 +3952,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
// start recovery of the last block for this file
long blockRecoveryId = nextGenerationStamp(blockIdManager.isLegacyBlock(uc));
lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
- if (uc.getBlockUCState() != BlockUCState.BEING_TRUNCATED) {
- uc.initializeBlockRecovery(blockRecoveryId);
+ if(copyOnTruncate) {
+ uc.setGenerationStamp(blockRecoveryId);
+ } else if(truncateRecovery) {
+ recoveryBlock.setGenerationStamp(blockRecoveryId);
}
+ uc.initializeBlockRecovery(blockRecoveryId);
leaseManager.renewLease(lease);
// Cannot close file right now, since the last block requires recovery.
// This may potentially cause infinite loop in lease recovery
@@ -3979,11 +4067,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
return true;
}
- void commitBlockSynchronization(ExtendedBlock lastblock,
+ void commitBlockSynchronization(ExtendedBlock oldBlock,
long newgenerationstamp, long newlength,
boolean closeFile, boolean deleteblock, DatanodeID[] newtargets,
String[] newtargetstorages) throws IOException {
- LOG.info("commitBlockSynchronization(lastblock=" + lastblock
+ LOG.info("commitBlockSynchronization(oldBlock=" + oldBlock
+ ", newgenerationstamp=" + newgenerationstamp
+ ", newlength=" + newlength
+ ", newtargets=" + Arrays.asList(newtargets)
@@ -4002,17 +4090,17 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
checkNameNodeSafeMode(
"Cannot commitBlockSynchronization while in safe mode");
final BlockInfo storedBlock = getStoredBlock(
- ExtendedBlock.getLocalBlock(lastblock));
+ ExtendedBlock.getLocalBlock(oldBlock));
if (storedBlock == null) {
if (deleteblock) {
// This may be a retry attempt so ignore the failure
// to locate the block.
if (LOG.isDebugEnabled()) {
- LOG.debug("Block (=" + lastblock + ") not found");
+ LOG.debug("Block (=" + oldBlock + ") not found");
}
return;
} else {
- throw new IOException("Block (=" + lastblock + ") not found");
+ throw new IOException("Block (=" + oldBlock + ") not found");
}
}
//
@@ -4039,34 +4127,40 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
+ iFile.getFullPathName() + ", likely due to delayed block"
+ " removal");
}
- if (!iFile.isUnderConstruction() || storedBlock.isComplete()) {
+ if ((!iFile.isUnderConstruction() || storedBlock.isComplete()) &&
+ iFile.getLastBlock().isComplete()) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Unexpected block (=" + lastblock
+ LOG.debug("Unexpected block (=" + oldBlock
+ ") since the file (=" + iFile.getLocalName()
+ ") is not under construction");
}
return;
}
- long recoveryId =
- ((BlockInfoUnderConstruction)storedBlock).getBlockRecoveryId();
+ BlockInfoUnderConstruction truncatedBlock =
+ (BlockInfoUnderConstruction) iFile.getLastBlock();
+ long recoveryId = truncatedBlock.getBlockRecoveryId();
+ boolean copyTruncate =
+ truncatedBlock.getBlockId() != storedBlock.getBlockId();
if(recoveryId != newgenerationstamp) {
throw new IOException("The recovery id " + newgenerationstamp
+ " does not match current recovery id "
- + recoveryId + " for block " + lastblock);
+ + recoveryId + " for block " + oldBlock);
}
if (deleteblock) {
- Block blockToDel = ExtendedBlock.getLocalBlock(lastblock);
+ Block blockToDel = ExtendedBlock.getLocalBlock(oldBlock);
boolean remove = iFile.removeLastBlock(blockToDel);
if (remove) {
- blockManager.removeBlockFromMap(storedBlock);
+ blockManager.removeBlock(storedBlock);
}
}
else {
// update last block
- storedBlock.setGenerationStamp(newgenerationstamp);
- storedBlock.setNumBytes(newlength);
+ if(!copyTruncate) {
+ storedBlock.setGenerationStamp(newgenerationstamp);
+ storedBlock.setNumBytes(newlength);
+ }
// find the DatanodeDescriptor objects
// There should be no locations in the blockManager till now because the
@@ -4096,7 +4190,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
DatanodeStorageInfo storageInfo =
trimmedTargets.get(i).getStorageInfo(trimmedStorages.get(i));
if (storageInfo != null) {
- storageInfo.addBlock(storedBlock);
+ if(copyTruncate) {
+ storageInfo.addBlock(truncatedBlock);
+ } else {
+ storageInfo.addBlock(storedBlock);
+ }
}
}
}
@@ -4106,11 +4204,22 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
blockManager.getDatanodeManager().getDatanodeStorageInfos(
trimmedTargets.toArray(new DatanodeID[trimmedTargets.size()]),
trimmedStorages.toArray(new String[trimmedStorages.size()]));
- iFile.setLastBlock(storedBlock, trimmedStorageInfos);
+ if(copyTruncate) {
+ iFile.setLastBlock(truncatedBlock, trimmedStorageInfos);
+ } else {
+ iFile.setLastBlock(storedBlock, trimmedStorageInfos);
+ }
}
if (closeFile) {
- src = closeFileCommitBlocks(iFile, storedBlock);
+ if(copyTruncate) {
+ src = closeFileCommitBlocks(iFile, truncatedBlock);
+ if(!iFile.isBlockInLatestSnapshot(storedBlock)) {
+ blockManager.removeBlock(storedBlock);
+ }
+ } else {
+ src = closeFileCommitBlocks(iFile, storedBlock);
+ }
} else {
// If this commit does not want to close the file, persist blocks
src = iFile.getFullPathName();
@@ -4121,13 +4230,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
getEditLog().logSync();
if (closeFile) {
- LOG.info("commitBlockSynchronization(newblock=" + lastblock
+ LOG.info("commitBlockSynchronization(oldBlock=" + oldBlock
+ ", file=" + src
+ ", newgenerationstamp=" + newgenerationstamp
+ ", newlength=" + newlength
+ ", newtargets=" + Arrays.asList(newtargets) + ") successful");
} else {
- LOG.info("commitBlockSynchronization(" + lastblock + ") successful");
+ LOG.info("commitBlockSynchronization(" + oldBlock + ") successful");
}
}
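
Among the FSNamesystem changes, shouldCopyOnTruncate is the policy knob: the old replica bytes must survive whenever a rollback (upgrade not finalized) or the latest snapshot may still need them. A one-method restatement of that decision, as a hedged sketch:

class TruncatePolicy {
  // Mirrors shouldCopyOnTruncate above: copy when the old bytes may still
  // be needed after truncation, otherwise shorten the last block in place.
  static boolean shouldCopyOnTruncate(boolean upgradeFinalized,
      boolean blockInLatestSnapshot) {
    return !upgradeFinalized || blockInLatestSnapshot;
  }
}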
http://git-wip-us.apache.org/repos/asf/hadoop/blob/00a7ebab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
index 41b2391..58ef536 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
@@ -228,7 +228,8 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {
/** Is this inode in the latest snapshot? */
public final boolean isInLatestSnapshot(final int latestSnapshotId) {
- if (latestSnapshotId == Snapshot.CURRENT_STATE_ID) {
+ if (latestSnapshotId == Snapshot.CURRENT_STATE_ID ||
+ latestSnapshotId == Snapshot.NO_SNAPSHOT_ID) {
return false;
}
// if parent is a reference node, parent must be a renamed node. We can
@@ -817,11 +818,15 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {
* @param toDelete the to-be-deleted block
*/
public void addDeleteBlock(Block toDelete) {
- if (toDelete != null) {
- toDeleteList.add(toDelete);
- }
+ assert toDelete != null : "toDelete is null";
+ toDeleteList.add(toDelete);
}
-
+
+ public void removeDeleteBlock(Block block) {
+ assert block != null : "block is null";
+ toDeleteList.remove(block);
+ }
+
/**
* Clear {@link BlocksMapUpdateInfo#toDeleteList}
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/00a7ebab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
index 92a6fec..0bbfd72 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
@@ -24,7 +24,9 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.permission.PermissionStatus;
@@ -304,6 +306,11 @@ public class INodeFile extends INodeWithAdditionalFields
@Override
public void recordModification(final int latestSnapshotId)
throws QuotaExceededException {
+ recordModification(latestSnapshotId, false);
+ }
+
+ public void recordModification(final int latestSnapshotId, boolean withBlocks)
+ throws QuotaExceededException {
if (isInLatestSnapshot(latestSnapshotId)
&& !shouldRecordInSrcSnapshot(latestSnapshotId)) {
// the file is in snapshot, create a snapshot feature if it does not have
@@ -312,10 +319,10 @@ public class INodeFile extends INodeWithAdditionalFields
sf = addSnapshotFeature(null);
}
// record self in the diff list if necessary
- sf.getDiffs().saveSelf2Snapshot(latestSnapshotId, this, null);
+ sf.getDiffs().saveSelf2Snapshot(latestSnapshotId, this, null, withBlocks);
}
}
-
+
public FileDiffList getDiffs() {
FileWithSnapshotFeature sf = this.getFileWithSnapshotFeature();
if (sf != null) {
@@ -415,6 +422,20 @@ public class INodeFile extends INodeWithAdditionalFields
return this.blocks;
}
+ /** @return blocks of the file corresponding to the snapshot. */
+ public BlockInfo[] getBlocks(int snapshot) {
+ if(snapshot == CURRENT_STATE_ID || getDiffs() == null)
+ return getBlocks();
+ FileDiff diff = getDiffs().getDiffById(snapshot);
+ BlockInfo[] snapshotBlocks = diff == null ? getBlocks() : diff.getBlocks();
+ if(snapshotBlocks != null)
+ return snapshotBlocks;
+ // Blocks are not in the current snapshot
+ // Find next snapshot with blocks present or return current file blocks
+ snapshotBlocks = getDiffs().findLaterSnapshotBlocks(snapshot);
+ return (snapshotBlocks == null) ? getBlocks() : snapshotBlocks;
+ }
+
void updateBlockCollection() {
if (blocks != null) {
for(BlockInfo b : blocks) {
@@ -509,13 +530,13 @@ public class INodeFile extends INodeWithAdditionalFields
}
clear();
removedINodes.add(this);
-
FileWithSnapshotFeature sf = getFileWithSnapshotFeature();
if (sf != null) {
+ sf.getDiffs().destroyAndCollectSnapshotBlocks(collectedBlocks);
sf.clearDiffs();
}
}
-
+
@Override
public String getName() {
// Get the full path name of this inode.
@@ -554,39 +575,23 @@ public class INodeFile extends INodeWithAdditionalFields
@Override
public final ContentSummaryComputationContext computeContentSummary(
final ContentSummaryComputationContext summary) {
- computeContentSummary4Snapshot(summary.getCounts());
- computeContentSummary4Current(summary.getCounts());
- return summary;
- }
-
- private void computeContentSummary4Snapshot(final Content.Counts counts) {
- // file length and diskspace only counted for the latest state of the file
- // i.e. either the current state or the last snapshot
+ final Content.Counts counts = summary.getCounts();
FileWithSnapshotFeature sf = getFileWithSnapshotFeature();
- if (sf != null) {
+ if (sf == null) {
+ counts.add(Content.LENGTH, computeFileSize());
+ counts.add(Content.FILE, 1);
+ } else {
final FileDiffList diffs = sf.getDiffs();
final int n = diffs.asList().size();
counts.add(Content.FILE, n);
if (n > 0 && sf.isCurrentFileDeleted()) {
counts.add(Content.LENGTH, diffs.getLast().getFileSize());
- }
-
- if (sf.isCurrentFileDeleted()) {
- final long lastFileSize = diffs.getLast().getFileSize();
- counts.add(Content.DISKSPACE, lastFileSize * getBlockReplication());
+ } else {
+ counts.add(Content.LENGTH, computeFileSize());
}
}
- }
-
- private void computeContentSummary4Current(final Content.Counts counts) {
- FileWithSnapshotFeature sf = this.getFileWithSnapshotFeature();
- if (sf != null && sf.isCurrentFileDeleted()) {
- return;
- }
-
- counts.add(Content.LENGTH, computeFileSize());
- counts.add(Content.FILE, 1);
counts.add(Content.DISKSPACE, diskspaceConsumed());
+ return summary;
}
/** The same as computeFileSize(null). */
@@ -651,9 +656,36 @@ public class INodeFile extends INodeWithAdditionalFields
return size;
}
+ /**
+ * Compute size consumed by all blocks of the current file,
+ * including blocks in its snapshots.
+ * Use preferred block size for the last block if it is under construction.
+ */
public final long diskspaceConsumed() {
- // use preferred block size for the last block if it is under construction
- return computeFileSize(true, true) * getBlockReplication();
+ FileWithSnapshotFeature sf = getFileWithSnapshotFeature();
+ if(sf == null) {
+ return computeFileSize(true, true) * getBlockReplication();
+ }
+
+ // Collect all distinct blocks
+ long size = 0;
+ Set<Block> allBlocks = new HashSet<Block>(Arrays.asList(getBlocks()));
+ List<FileDiff> diffs = sf.getDiffs().asList();
+ for(FileDiff diff : diffs) {
+ BlockInfo[] diffBlocks = diff.getBlocks();
+ if (diffBlocks != null) {
+ allBlocks.addAll(Arrays.asList(diffBlocks));
+ }
+ }
+ for(Block block : allBlocks) {
+ size += block.getNumBytes();
+ }
+ // check if the last block is under construction
+ BlockInfo lastBlock = getLastBlock();
+ if(lastBlock instanceof BlockInfoUnderConstruction) {
+ size += getPreferredBlockSize() - lastBlock.getNumBytes();
+ }
+ return size * getBlockReplication();
}
public final long diskspaceConsumed(int lastSnapshotId) {
@@ -706,7 +738,7 @@ public class INodeFile extends INodeWithAdditionalFields
final BlockInfo[] oldBlocks = getBlocks();
if (oldBlocks == null)
return 0;
- //find the minimum n such that the size of the first n blocks > max
+ // find the minimum n such that the size of the first n blocks > max
int n = 0;
long size = 0;
for(; n < oldBlocks.length && max > size; n++) {
@@ -716,23 +748,78 @@ public class INodeFile extends INodeWithAdditionalFields
return size;
// starting from block n, the data is beyond max.
- // resize the array.
+ // resize the array.
+ truncateBlocksTo(n);
+
+ // collect the blocks beyond max
+ if (collectedBlocks != null) {
+ for(; n < oldBlocks.length; n++) {
+ collectedBlocks.addDeleteBlock(oldBlocks[n]);
+ }
+ }
+ return size;
+ }
+
+ void truncateBlocksTo(int n) {
final BlockInfo[] newBlocks;
if (n == 0) {
newBlocks = BlockInfo.EMPTY_ARRAY;
} else {
newBlocks = new BlockInfo[n];
- System.arraycopy(oldBlocks, 0, newBlocks, 0, n);
+ System.arraycopy(getBlocks(), 0, newBlocks, 0, n);
}
// set new blocks
setBlocks(newBlocks);
+ }
- // collect the blocks beyond max
- if (collectedBlocks != null) {
- for(; n < oldBlocks.length; n++) {
- collectedBlocks.addDeleteBlock(oldBlocks[n]);
- }
+ public void collectBlocksBeyondSnapshot(BlockInfo[] snapshotBlocks,
+ BlocksMapUpdateInfo collectedBlocks) {
+ BlockInfo[] oldBlocks = getBlocks();
+ if(snapshotBlocks == null || oldBlocks == null)
+ return;
+ // Skip blocks in common between the file and the snapshot
+ int n = 0;
+ while(n < oldBlocks.length && n < snapshotBlocks.length &&
+ oldBlocks[n] == snapshotBlocks[n]) {
+ n++;
}
- return size;
+ truncateBlocksTo(n);
+ // Collect the remaining blocks of the file
+ while(n < oldBlocks.length) {
+ collectedBlocks.addDeleteBlock(oldBlocks[n++]);
+ }
+ }
+
+ /** Exclude blocks collected for deletion that belong to a snapshot. */
+ void excludeSnapshotBlocks(int snapshotId,
+ BlocksMapUpdateInfo collectedBlocks) {
+ if(collectedBlocks == null || collectedBlocks.getToDeleteList().isEmpty())
+ return;
+ FileWithSnapshotFeature sf = getFileWithSnapshotFeature();
+ if(sf == null)
+ return;
+ BlockInfo[] snapshotBlocks =
+ getDiffs().findEarlierSnapshotBlocks(snapshotId);
+ if(snapshotBlocks == null)
+ return;
+ List<Block> toDelete = collectedBlocks.getToDeleteList();
+ for(Block blk : snapshotBlocks) {
+ if(toDelete.contains(blk))
+ collectedBlocks.removeDeleteBlock(blk);
+ }
+ }
+
+ /**
+ * @return true if the block is contained in the latest snapshot; false otherwise.
+ */
+ boolean isBlockInLatestSnapshot(BlockInfo block) {
+ FileWithSnapshotFeature sf = this.getFileWithSnapshotFeature();
+ if (sf == null || sf.getDiffs() == null)
+ return false;
+ BlockInfo[] snapshotBlocks =
+ getDiffs().findEarlierSnapshotBlocks(getDiffs().getLastSnapshotId());
+ if(snapshotBlocks == null)
+ return false;
+ return Arrays.asList(snapshotBlocks).contains(block);
}
}
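
The snapshot-aware getBlocks(int snapshot) above resolves a block list through a fallback chain: the diff recorded at the snapshot, else the nearest later diff that stored blocks, else the file's current blocks. A self-contained model of that chain, with a TreeMap of snapshot id to (possibly absent) block list standing in for FileDiffList:

import java.util.TreeMap;

public class SnapshotBlocksLookup {
  static final int CURRENT_STATE_ID = Integer.MAX_VALUE;
  // snapshotId -> blocks stored by that diff; null models a diff that
  // recorded no blocks (no truncate happened at that point).
  static TreeMap<Integer, long[]> diffs = new TreeMap<>();
  static long[] currentBlocks = {1, 2, 3};

  static long[] getBlocks(int snapshot) {
    if (snapshot == CURRENT_STATE_ID || diffs.isEmpty()
        || !diffs.containsKey(snapshot)) {
      return currentBlocks;
    }
    long[] blocks = diffs.get(snapshot);
    if (blocks != null) {
      return blocks;
    }
    // Diff stored no blocks: fall through to the first later diff that did.
    for (long[] later : diffs.tailMap(snapshot, false).values()) {
      if (later != null) {
        return later;
      }
    }
    return currentBlocks;
  }

  public static void main(String[] args) {
    diffs.put(10, null);             // snapshot 10: blocks unchanged since
    diffs.put(20, new long[]{1, 2}); // snapshot 20: blocks recorded here
    System.out.println(getBlocks(10).length); // 2, resolved via snapshot 20
    System.out.println(getBlocks(CURRENT_STATE_ID).length); // 3
  }
}
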
http://git-wip-us.apache.org/repos/asf/hadoop/blob/00a7ebab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java
index 512913b..d742c6d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java
@@ -69,8 +69,9 @@ public class NameNodeLayoutVersion {
CREATE_OVERWRITE(-58, "Use single editlog record for " +
"creating file with overwrite"),
XATTRS_NAMESPACE_EXT(-59, "Increase number of xattr namespaces"),
- BLOCK_STORAGE_POLICY(-60, "Block Storage policy");
-
+ BLOCK_STORAGE_POLICY(-60, "Block Storage policy"),
+ TRUNCATE(-61, "Truncate");
+
private final FeatureInfo info;
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/00a7ebab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java
index d918495..b330215 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java
@@ -163,9 +163,12 @@ abstract class AbstractINodeDiffList<N extends INode,
* id, otherwise <=.
* @return The id of the latest snapshot before the given snapshot.
*/
- private final int getPrior(int anchorId, boolean exclusive) {
+ public final int getPrior(int anchorId, boolean exclusive) {
if (anchorId == Snapshot.CURRENT_STATE_ID) {
- return getLastSnapshotId();
+ int last = getLastSnapshotId();
+ if(exclusive && last == anchorId)
+ return Snapshot.NO_SNAPSHOT_ID;
+ return last;
}
final int i = Collections.binarySearch(diffs, anchorId);
if (exclusive) { // must be the one before
@@ -290,10 +293,11 @@ abstract class AbstractINodeDiffList<N extends INode,
}
/** Save the snapshot copy to the latest snapshot. */
- public void saveSelf2Snapshot(int latestSnapshotId, N currentINode,
+ public D saveSelf2Snapshot(int latestSnapshotId, N currentINode,
A snapshotCopy) throws QuotaExceededException {
+ D diff = null;
if (latestSnapshotId != Snapshot.CURRENT_STATE_ID) {
- D diff = checkAndAddLatestSnapshotDiff(latestSnapshotId, currentINode);
+ diff = checkAndAddLatestSnapshotDiff(latestSnapshotId, currentINode);
if (diff.snapshotINode == null) {
if (snapshotCopy == null) {
snapshotCopy = createSnapshotCopy(currentINode);
@@ -301,6 +305,7 @@ abstract class AbstractINodeDiffList<N extends INode,
diff.saveSnapshotCopy(snapshotCopy);
}
}
+ return diff;
}
@Override
@@ -312,4 +317,4 @@ abstract class AbstractINodeDiffList<N extends INode,
public String toString() {
return getClass().getSimpleName() + ": " + diffs;
}
-}
\ No newline at end of file
+}
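
getPrior and the new find*SnapshotBlocks methods in FileDiffList all lean on Collections.binarySearch over the sorted diff list, where a miss returns -(insertionPoint)-1. The index arithmetic in isolation, assuming only the JDK contract:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class PriorLookup {
  static final int NO_SNAPSHOT_ID = -1;

  // Latest snapshot id strictly before anchorId, or NO_SNAPSHOT_ID.
  static int prior(List<Integer> ids, int anchorId) {
    int i = Collections.binarySearch(ids, anchorId);
    // hit: the previous element is i-1; miss: insertionPoint-1 == -i-2
    int j = i >= 0 ? i - 1 : -i - 2;
    return j < 0 ? NO_SNAPSHOT_ID : ids.get(j);
  }

  public static void main(String[] args) {
    List<Integer> ids = Arrays.asList(3, 7, 12);
    System.out.println(prior(ids, 7));  // 3
    System.out.println(prior(ids, 10)); // 7 (miss; insertion point is 2)
    System.out.println(prior(ids, 3));  // -1, i.e. NO_SNAPSHOT_ID
  }
}
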
http://git-wip-us.apache.org/repos/asf/hadoop/blob/00a7ebab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
index 9de9c6d..7ff16b8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
@@ -36,6 +36,10 @@ import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.namenode.AclEntryStatusFormat;
import org.apache.hadoop.hdfs.server.namenode.AclFeature;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
@@ -230,6 +234,20 @@ public class FSImageFormatPBSnapshot {
FileDiff diff = new FileDiff(pbf.getSnapshotId(), copy, null,
pbf.getFileSize());
+ List<BlockProto> bpl = pbf.getBlocksList();
+ BlockInfo[] blocks = new BlockInfo[bpl.size()];
+ for(int j = 0, e = bpl.size(); j < e; ++j) {
+ Block blk = PBHelper.convert(bpl.get(j));
+ BlockInfo storedBlock = fsn.getBlockManager().getStoredBlock(blk);
+ if(storedBlock == null) {
+ storedBlock = fsn.getBlockManager().addBlockCollection(
+ new BlockInfo(blk, copy.getFileReplication()), file);
+ }
+ blocks[j] = storedBlock;
+ }
+ if(blocks.length > 0) {
+ diff.setBlocks(blocks);
+ }
diffs.addFirst(diff);
}
file.addSnapshotFeature(diffs);
@@ -473,6 +491,11 @@ public class FSImageFormatPBSnapshot {
SnapshotDiffSection.FileDiff.Builder fb = SnapshotDiffSection.FileDiff
.newBuilder().setSnapshotId(diff.getSnapshotId())
.setFileSize(diff.getFileSize());
+ if(diff.getBlocks() != null) {
+ for(Block block : diff.getBlocks()) {
+ fb.addBlocks(PBHelper.convert(block));
+ }
+ }
INodeFileAttributes copy = diff.snapshotINode;
if (copy != null) {
fb.setName(ByteString.copyFrom(copy.getLocalNameBytes()))
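
On load, each serialized block in a FileDiff is resolved against the blocks map before being stored, so a block shared by the file and several snapshots stays one object; the reference-equality comparisons in INodeFile and FileDiffList depend on this. A compact model of the look-up-or-register pattern, with a plain HashMap standing in for the BlockManager:

import java.util.HashMap;
import java.util.Map;

public class BlockDedup {
  static class BlockInfo {
    final long id;
    BlockInfo(long id) { this.id = id; }
  }

  static Map<Long, BlockInfo> blocksMap = new HashMap<>();

  // Per serialized id: reuse the stored instance, create only on first
  // sight (getStoredBlock / addBlockCollection in the real load path).
  static BlockInfo[] loadDiffBlocks(long[] serializedIds) {
    BlockInfo[] blocks = new BlockInfo[serializedIds.length];
    for (int j = 0; j < serializedIds.length; j++) {
      blocks[j] = blocksMap.computeIfAbsent(serializedIds[j], BlockInfo::new);
    }
    return blocks;
  }

  public static void main(String[] args) {
    BlockInfo[] a = loadDiffBlocks(new long[]{1, 2});
    BlockInfo[] b = loadDiffBlocks(new long[]{2, 3});
    System.out.println(a[1] == b[0]); // true: one instance per block id
  }
}
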
http://git-wip-us.apache.org/repos/asf/hadoop/blob/00a7ebab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiff.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiff.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiff.java
index 919ab56..7b52dc9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiff.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiff.java
@@ -19,8 +19,10 @@ package org.apache.hadoop.hdfs.server.namenode.snapshot;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.Arrays;
import java.util.List;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.namenode.FSImageSerialization;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
@@ -37,10 +39,13 @@ public class FileDiff extends
/** The file size at snapshot creation time. */
private final long fileSize;
+ /** A copy of the INodeFile block list. Used in truncate. */
+ private BlockInfo[] blocks;
FileDiff(int snapshotId, INodeFile file) {
super(snapshotId, null, null);
fileSize = file.computeFileSize();
+ blocks = null;
}
/** Constructor used by FSImage loading */
@@ -48,20 +53,40 @@ public class FileDiff extends
FileDiff posteriorDiff, long fileSize) {
super(snapshotId, snapshotINode, posteriorDiff);
this.fileSize = fileSize;
+ blocks = null;
}
/** @return the file size in the snapshot. */
public long getFileSize() {
return fileSize;
}
-
+
+ /**
+ * Copy block references into the snapshot
+ * up to the current {@link #fileSize}.
+ * Should be done only once.
+ */
+ public void setBlocks(BlockInfo[] blocks) {
+ if(this.blocks != null)
+ return;
+ int numBlocks = 0;
+ for(long s = 0; numBlocks < blocks.length && s < fileSize; numBlocks++)
+ s += blocks[numBlocks].getNumBytes();
+ this.blocks = Arrays.copyOf(blocks, numBlocks);
+ }
+
+ public BlockInfo[] getBlocks() {
+ return blocks;
+ }
+
@Override
Quota.Counts combinePosteriorAndCollectBlocks(INodeFile currentINode,
FileDiff posterior, BlocksMapUpdateInfo collectedBlocks,
final List<INode> removedINodes) {
- return currentINode.getFileWithSnapshotFeature()
- .updateQuotaAndCollectBlocks(currentINode, posterior, collectedBlocks,
- removedINodes);
+ FileWithSnapshotFeature sf = currentINode.getFileWithSnapshotFeature();
+ assert sf != null : "FileWithSnapshotFeature is null";
+ return sf.updateQuotaAndCollectBlocks(
+ currentINode, posterior, collectedBlocks, removedINodes);
}
@Override
@@ -91,4 +116,13 @@ public class FileDiff extends
.updateQuotaAndCollectBlocks(currentINode, this, collectedBlocks,
removedINodes);
}
+
+ public void destroyAndCollectSnapshotBlocks(
+ BlocksMapUpdateInfo collectedBlocks) {
+ if(blocks == null || collectedBlocks == null)
+ return;
+ for(BlockInfo blk : blocks)
+ collectedBlocks.addDeleteBlock(blk);
+ blocks = null;
+ }
}
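
setBlocks above keeps only the shortest block prefix whose total length covers the snapshot's fileSize, so a diff never references data past the length recorded at snapshot time. The prefix computation on its own, over plain block sizes:

import java.util.Arrays;

public class DiffBlockPrefix {
  static long[] blocksUpTo(long[] blockSizes, long fileSize) {
    int numBlocks = 0;
    for (long s = 0; numBlocks < blockSizes.length && s < fileSize;
        numBlocks++) {
      s += blockSizes[numBlocks];
    }
    return Arrays.copyOf(blockSizes, numBlocks);
  }

  public static void main(String[] args) {
    long[] sizes = {128, 128, 128};
    System.out.println(blocksUpTo(sizes, 200).length); // 2
    System.out.println(blocksUpTo(sizes, 128).length); // 1
    System.out.println(blocksUpTo(sizes, 0).length);   // 0
  }
}
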
http://git-wip-us.apache.org/repos/asf/hadoop/blob/00a7ebab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiffList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiffList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiffList.java
index b0a973d..07652f4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiffList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiffList.java
@@ -17,6 +17,13 @@
*/
package org.apache.hadoop.hdfs.server.namenode.snapshot;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.INodeFileAttributes;
@@ -33,4 +40,95 @@ public class FileDiffList extends
INodeFileAttributes createSnapshotCopy(INodeFile currentINode) {
return new INodeFileAttributes.SnapshotCopy(currentINode);
}
+
+ public void destroyAndCollectSnapshotBlocks(
+ BlocksMapUpdateInfo collectedBlocks) {
+ for(FileDiff d : asList())
+ d.destroyAndCollectSnapshotBlocks(collectedBlocks);
+ }
+
+ public void saveSelf2Snapshot(int latestSnapshotId, INodeFile iNodeFile,
+ INodeFileAttributes snapshotCopy, boolean withBlocks)
+ throws QuotaExceededException {
+ final FileDiff diff =
+ super.saveSelf2Snapshot(latestSnapshotId, iNodeFile, snapshotCopy);
+ if(withBlocks) // Store blocks if this is the first update
+ diff.setBlocks(iNodeFile.getBlocks());
+ }
+
+ public BlockInfo[] findEarlierSnapshotBlocks(int snapshotId) {
+ assert snapshotId != Snapshot.NO_SNAPSHOT_ID : "Wrong snapshot id";
+ if(snapshotId == Snapshot.CURRENT_STATE_ID) {
+ return null;
+ }
+ List<FileDiff> diffs = this.asList();
+ int i = Collections.binarySearch(diffs, snapshotId);
+ BlockInfo[] blocks = null;
+ // binarySearch returns -(insertionPoint)-1 on a miss; walk backwards
+ for(i = i >= 0 ? i : -i-2; i >= 0; i--) {
+ blocks = diffs.get(i).getBlocks();
+ if(blocks != null) {
+ break;
+ }
+ }
+ return blocks;
+ }
+
+ public BlockInfo[] findLaterSnapshotBlocks(int snapshotId) {
+ assert snapshotId != Snapshot.NO_SNAPSHOT_ID : "Wrong snapshot id";
+ if(snapshotId == Snapshot.CURRENT_STATE_ID) {
+ return null;
+ }
+ List<FileDiff> diffs = this.asList();
+ int i = Collections.binarySearch(diffs, snapshotId);
+ BlockInfo[] blocks = null;
+ for(i = i >= 0 ? i+1 : -i-1; i < diffs.size(); i++) {
+ blocks = diffs.get(i).getBlocks();
+ if(blocks != null) {
+ break;
+ }
+ }
+ return blocks;
+ }
+
+ /**
+ * Copy blocks from the removed snapshot into the previous snapshot
+ * up to the file length of the latter.
+ * Collect unused blocks of the removed snapshot.
+ */
+ void combineAndCollectSnapshotBlocks(INodeFile file,
+ FileDiff removed,
+ BlocksMapUpdateInfo collectedBlocks,
+ List<INode> removedINodes) {
+ BlockInfo[] removedBlocks = removed.getBlocks();
+ if(removedBlocks == null) {
+ FileWithSnapshotFeature sf = file.getFileWithSnapshotFeature();
+ assert sf != null : "FileWithSnapshotFeature is null";
+ if(sf.isCurrentFileDeleted())
+ sf.collectBlocksAndClear(file, collectedBlocks, removedINodes);
+ return;
+ }
+ int p = getPrior(removed.getSnapshotId(), true);
+ FileDiff earlierDiff = p == Snapshot.NO_SNAPSHOT_ID ? null : getDiffById(p);
+ // Copy blocks to the previous snapshot if not set already
+ if(earlierDiff != null)
+ earlierDiff.setBlocks(removedBlocks);
+ BlockInfo[] earlierBlocks =
+ (earlierDiff == null ? new BlockInfo[]{} : earlierDiff.getBlocks());
+ // Find later snapshot (or file itself) with blocks
+ BlockInfo[] laterBlocks = findLaterSnapshotBlocks(removed.getSnapshotId());
+ laterBlocks = (laterBlocks==null) ? file.getBlocks() : laterBlocks;
+ // Skip blocks that belong to either the earlier or the later list
+ int i = 0;
+ for(; i < removedBlocks.length; i++) {
+ if(i < earlierBlocks.length && removedBlocks[i] == earlierBlocks[i])
+ continue;
+ if(i < laterBlocks.length && removedBlocks[i] == laterBlocks[i])
+ continue;
+ break;
+ }
+ // Collect the remaining blocks of the file
+ while(i < removedBlocks.length) {
+ collectedBlocks.addDeleteBlock(removedBlocks[i++]);
+ }
+ }
}
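
combineAndCollectSnapshotBlocks can get away with a positional walk because block lists of adjacent file states only ever diverge by truncation or append, so shared blocks always form a common prefix. A standalone rendering of the skip-then-collect step; long ids model the BlockInfo references the real code compares by identity:

import java.util.ArrayList;
import java.util.List;

public class CombineSnapshotBlocks {
  // Blocks of the removed snapshot that are reclaimable: shared
  // (positionally) with neither the earlier nor the later block list.
  static List<Long> collect(long[] removed, long[] earlier, long[] later) {
    int i = 0;
    for (; i < removed.length; i++) {
      if (i < earlier.length && removed[i] == earlier[i]) continue;
      if (i < later.length && removed[i] == later[i]) continue;
      break; // first block owned solely by the removed snapshot
    }
    List<Long> collected = new ArrayList<>();
    while (i < removed.length) {
      collected.add(removed[i++]);
    }
    return collected;
  }

  public static void main(String[] args) {
    // Removed snapshot saw blocks 1,2,3; the earlier snapshot shares 1,2;
    // the file was later truncated back to block 1 alone.
    long[] removed = {1, 2, 3}, earlier = {1, 2}, later = {1};
    System.out.println(collect(removed, earlier, later)); // [3]
  }
}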