Posted to hdfs-commits@hadoop.apache.org by ar...@apache.org on 2013/11/15 00:57:00 UTC
svn commit: r1542125 [1/2] - in
/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project:
hadoop-hdfs-httpfs/src/main/libexec/ hadoop-hdfs/ hadoop-hdfs/src/main/java/
hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/
hadoop-hdfs/src/main/java/org/apache/h...
Author: arp
Date: Thu Nov 14 23:56:56 2013
New Revision: 1542125
URL: http://svn.apache.org/r1542125
Log:
Merging r1541618 through r1542122 from trunk to branch HDFS-2832
Added:
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ContentSummaryComputationContext.java
- copied unchanged from r1542122, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ContentSummaryComputationContext.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/TokenAspect.java
- copied unchanged from r1542122, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/TokenAspect.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestTokenAspect.java
- copied unchanged from r1542122, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestTokenAspect.java
Modified:
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/ (props changed)
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/libexec/httpfs-config.sh
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/ (props changed)
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HftpFileSystem.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/URLConnectionFactory.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/ (props changed)
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.html
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.jsp
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/index.html
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestQuota.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCorruptFilesJsp.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotDeletion.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHftpDelegationToken.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestDelegationTokenRemoteFetcher.java
Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1541618-1542122
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/libexec/httpfs-config.sh
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/libexec/httpfs-config.sh?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/libexec/httpfs-config.sh (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/libexec/httpfs-config.sh Thu Nov 14 23:56:56 2013
@@ -55,7 +55,7 @@ print "Setting HTTPFS_HOME: ${H
#
if [ -e "${HTTPFS_HOME}/bin/httpfs-env.sh" ]; then
print "Sourcing: ${HTTPFS_HOME}/bin/httpfs-env.sh"
- source ${HTTPFS_HOME}/bin/HTTPFS-env.sh
+ source ${HTTPFS_HOME}/bin/httpfs-env.sh
grep "^ *export " ${HTTPFS_HOME}/bin/httpfs-env.sh | sed 's/ *export/ setting/'
fi
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Nov 14 23:56:56 2013
@@ -13,6 +13,9 @@ Trunk (Unreleased)
HDFS-3125. Add JournalService to enable Journal Daemon. (suresh)
+ HDFS-5444. Choose default web UI based on browser capabilities. (Haohui Mai
+ via jing9)
+
IMPROVEMENTS
HDFS-4665. Move TestNetworkTopologyWithNodeGroup to common.
@@ -194,6 +197,8 @@ Trunk (Unreleased)
HDFS-5485. Add command-line support for modifyDirective. (cmccabe)
+ HDFS-5366. recaching improvements (cmccabe)
+
OPTIMIZATIONS
HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)
@@ -475,6 +480,20 @@ Release 2.3.0 - UNRELEASED
HDFS-5495. Remove further JUnit3 usages from HDFS.
(Jarek Jarcec Cecho via wang)
+ HDFS-5325. Remove WebHdfsFileSystem#ConnRunner. (Haohui Mai via jing9)
+
+ HDFS-5488. Clean up TestHftpURLTimeout. (Haohui Mai via jing9)
+
+ HDFS-5440. Extract the logic of handling delegation tokens in HftpFileSystem
+ to the TokenAspect class. (Haohui Mai via jing9)
+
+ HDFS-5487. Introduce unit test for TokenAspect. (Haohui Mai via jing9)
+
+ HDFS-4995. Make getContentSummary less expensive. (kihwal)
+
+ HDFS-5506. Use URLConnectionFactory in DelegationTokenFetcher. (Haohui Mai
+ via jing9)
+
OPTIMIZATIONS
HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn)
@@ -529,13 +548,20 @@ Release 2.3.0 - UNRELEASED
HDFS-5476. Snapshot: clean the blocks/files/directories under a renamed
file/directory while deletion. (jing9)
- HDFS-5325. Remove WebHdfsFileSystem#ConnRunner. (Haohui Mai via jing9)
-
- HDFS-5488. Clean up TestHftpURLTimeout. (Haohui Mai via jing9)
-
HDFS-5425. Renaming an under-construction file with snapshots can cause NN failure
on restart. (jing9 and Vinay)
+ HDFS-5474. DeleteSnapshot can leave the NameNode in safemode on NN restarts.
+ (Sathish via jing9)
+
+ HDFS-5075. httpfs-config.sh calls out incorrect env script name
+ (Timothy St. Clair via stevel)
+
+ HDFS-5504. In HA mode, OP_DELETE_SNAPSHOT is not decrementing the safemode
+ threshold, which leads to NN safemode. (Vinay via jing9)
+
+ HDFS-5438. Flaws in block report processing can cause data loss. (kihwal)
+
Release 2.2.1 - UNRELEASED
INCOMPATIBLE CHANGES
Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1541618-1542122
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java Thu Nov 14 23:56:56 2013
@@ -42,4 +42,8 @@ public class DFSClientFaultInjector {
public boolean uncorruptPacket() {
return false;
}
+
+ public boolean failPacket() {
+ return false;
+ }
}
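The new failPacket() hook lets a test force a pipeline recovery on the last packet of a block (see the DFSOutputStream change below). A minimal sketch of a test-side injector, assuming the usual HDFS pattern where the injector's static instance field is writable by tests; FailLastPacketInjector is an illustrative name, not part of the patch:

  import java.util.concurrent.atomic.AtomicBoolean;
  import org.apache.hadoop.hdfs.DFSClientFaultInjector;

  public class FailLastPacketInjector extends DFSClientFaultInjector {
    // Fire exactly once so the retried write can complete normally.
    private final AtomicBoolean armed = new AtomicBoolean(true);

    @Override
    public boolean failPacket() {
      return armed.getAndSet(false);
    }
  }

A test would install this before writing, flush a full block, and assert that the file still closes successfully after the recovery.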
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Thu Nov 14 23:56:56 2013
@@ -198,6 +198,8 @@ public class DFSConfigKeys extends Commo
public static final String DFS_LIST_LIMIT = "dfs.ls.limit";
public static final int DFS_LIST_LIMIT_DEFAULT = 1000;
+ public static final String DFS_CONTENT_SUMMARY_LIMIT_KEY = "dfs.content-summary.limit";
+ public static final int DFS_CONTENT_SUMMARY_LIMIT_DEFAULT = 0;
public static final String DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY = "dfs.datanode.failed.volumes.tolerated";
public static final int DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT = 0;
public static final String DFS_DATANODE_SYNCONCLOSE_KEY = "dfs.datanode.synconclose";
@@ -342,6 +344,8 @@ public class DFSConfigKeys extends Commo
public static final boolean DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT = true;
public static final String DFS_HEARTBEAT_INTERVAL_KEY = "dfs.heartbeat.interval";
public static final long DFS_HEARTBEAT_INTERVAL_DEFAULT = 3;
+ public static final String DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS = "dfs.namenode.path.based.cache.retry.interval.ms";
+ public static final long DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS_DEFAULT = 60000L;
public static final String DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY = "dfs.namenode.decommission.interval";
public static final int DFS_NAMENODE_DECOMMISSION_INTERVAL_DEFAULT = 30;
public static final String DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_KEY = "dfs.namenode.decommission.nodes.per.interval";
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Thu Nov 14 23:56:56 2013
@@ -151,6 +151,7 @@ public class DFSOutputStream extends FSO
private final short blockReplication; // replication factor of file
private boolean shouldSyncBlock = false; // force blocks to disk upon close
private CachingStrategy cachingStrategy;
+ private boolean failPacket = false;
private static class Packet {
private static final long HEART_BEAT_SEQNO = -1L;
@@ -752,6 +753,16 @@ public class DFSOutputStream extends FSO
one.seqno + " but received " + seqno);
}
isLastPacketInBlock = one.lastPacketInBlock;
+
+ // Fail the packet write for testing in order to force a
+ // pipeline recovery.
+ if (DFSClientFaultInjector.get().failPacket() &&
+ isLastPacketInBlock) {
+ failPacket = true;
+ throw new IOException(
+ "Failing the last packet for testing.");
+ }
+
// update bytesAcked
block.setNumBytes(one.getLastByteOffsetBlock());
@@ -1044,7 +1055,18 @@ public class DFSOutputStream extends FSO
accessToken = lb.getBlockToken();
// set up the pipeline again with the remaining nodes
- success = createBlockOutputStream(nodes, newGS, isRecovery);
+ if (failPacket) { // for testing
+ success = createBlockOutputStream(nodes, newGS-1, isRecovery);
+ failPacket = false;
+ try {
+ // Give DNs time to send in bad reports. In real situations,
+ // good reports should follow bad ones, if client committed
+ // with those nodes.
+ Thread.sleep(2000);
+ } catch (InterruptedException ie) {}
+ } else {
+ success = createBlockOutputStream(nodes, newGS, isRecovery);
+ }
}
if (success) {
@@ -1904,7 +1926,9 @@ public class DFSOutputStream extends FSO
// be called during unit tests
private void completeFile(ExtendedBlock last) throws IOException {
long localstart = Time.now();
+ long localTimeout = 400;
boolean fileComplete = false;
+ int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
while (!fileComplete) {
fileComplete =
dfsClient.namenode.complete(src, dfsClient.clientName, last, fileId);
@@ -1920,7 +1944,13 @@ public class DFSOutputStream extends FSO
throw new IOException(msg);
}
try {
- Thread.sleep(400);
+ Thread.sleep(localTimeout);
+ if (retries == 0) {
+ throw new IOException("Unable to close file because the last block"
+ + " does not have enough number of replicas.");
+ }
+ retries--;
+ localTimeout *= 2;
if (Time.now() - localstart > 5000) {
DFSClient.LOG.info("Could not complete " + src + " retrying...");
}
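The completeFile() change above replaces an unbounded fixed 400 ms poll with a bounded, exponentially backed-off retry whose budget comes from nBlockWriteLocateFollowingRetry. A self-contained sketch of the same policy, with illustrative names (BoundedBackoff and Check are not part of the patch):

  import java.io.IOException;

  public final class BoundedBackoff {
    public interface Check {
      boolean done() throws IOException;
    }

    // Poll until check.done(), sleeping between attempts and doubling
    // the sleep each time; give up once the retry budget is spent.
    public static void awaitComplete(Check check, int retries,
        long initialSleepMs) throws IOException, InterruptedException {
      long sleepMs = initialSleepMs;   // 400 ms in the patch
      while (!check.done()) {
        if (retries-- == 0) {
          throw new IOException("Giving up: condition not met in time.");
        }
        Thread.sleep(sleepMs);
        sleepMs *= 2;                  // exponential backoff
      }
    }
  }

The practical effect is that a close() waiting on minimum replication now fails with a clear exception instead of spinning forever.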
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java Thu Nov 14 23:56:56 2013
@@ -230,6 +230,29 @@ public class BlockInfoUnderConstruction
}
/**
+ * Process the recorded replicas. When about to commit or finish the
+ * pipeline recovery, sort out bad replicas.
+ * @param genStamp The final generation stamp for the block.
+ */
+ public void setGenerationStampAndVerifyReplicas(long genStamp) {
+ if (replicas == null)
+ return;
+
+ // Remove the replicas with wrong gen stamp.
+ // The replica list is unchanged.
+ for (ReplicaUnderConstruction r : replicas) {
+ if (genStamp != r.getGenerationStamp()) {
+ r.getExpectedStorageLocation().removeBlock(this);
+ NameNode.blockStateChangeLog.info("BLOCK* Removing stale replica "
+ + "from location: " + r);
+ }
+ }
+
+ // Set the generation stamp for the block.
+ setGenerationStamp(genStamp);
+ }
+
+ /**
* Commit block's length and generation stamp as reported by the client.
* Set block state to {@link BlockUCState#COMMITTED}.
* @param block - contains client reported block length and generation
@@ -301,6 +324,8 @@ public class BlockInfoUnderConstruction
while (it.hasNext()) {
ReplicaUnderConstruction r = it.next();
if(r.getExpectedStorageLocation() == storage) {
+ // Record the gen stamp from the report
+ r.setGenerationStamp(block.getGenerationStamp());
return;
} else if (r.getExpectedStorageLocation().getDatanodeDescriptor() ==
storage.getDatanodeDescriptor()) {
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Thu Nov 14 23:56:56 2013
@@ -59,6 +59,7 @@ import org.apache.hadoop.hdfs.security.t
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@@ -1051,7 +1052,8 @@ public class BlockManager {
+ blk + " not found");
return;
}
- markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, reason), dn, storageID);
+ markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, reason,
+ Reason.CORRUPTION_REPORTED), dn, storageID);
}
private void markBlockAsCorrupt(BlockToMarkCorrupt b,
@@ -1074,7 +1076,8 @@ public class BlockManager {
node.addBlock(storageID, b.stored);
// Add this replica to corruptReplicas Map
- corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason);
+ corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason,
+ b.reasonCode);
if (countNodes(b.stored).liveReplicas() >= bc.getBlockReplication()) {
// the block is over-replicated so invalidate the replicas immediately
invalidateBlock(b, node);
@@ -1574,22 +1577,27 @@ public class BlockManager {
final BlockInfo stored;
/** The reason to mark corrupt. */
final String reason;
-
- BlockToMarkCorrupt(BlockInfo corrupted, BlockInfo stored, String reason) {
+ /** The reason code to be stored */
+ final Reason reasonCode;
+
+ BlockToMarkCorrupt(BlockInfo corrupted, BlockInfo stored, String reason,
+ Reason reasonCode) {
Preconditions.checkNotNull(corrupted, "corrupted is null");
Preconditions.checkNotNull(stored, "stored is null");
this.corrupted = corrupted;
this.stored = stored;
this.reason = reason;
+ this.reasonCode = reasonCode;
}
- BlockToMarkCorrupt(BlockInfo stored, String reason) {
- this(stored, stored, reason);
+ BlockToMarkCorrupt(BlockInfo stored, String reason, Reason reasonCode) {
+ this(stored, stored, reason, reasonCode);
}
- BlockToMarkCorrupt(BlockInfo stored, long gs, String reason) {
- this(new BlockInfo(stored), stored, reason);
+ BlockToMarkCorrupt(BlockInfo stored, long gs, String reason,
+ Reason reasonCode) {
+ this(new BlockInfo(stored), stored, reason, reasonCode);
//the corrupted block in datanode has a different generation stamp
corrupted.setGenerationStamp(gs);
}
@@ -1946,9 +1954,11 @@ assert storedBlock.findDatanode(dn) < 0
return storedBlock;
}
- //add replica if appropriate
+ // Add replica if appropriate. If the replica was previously corrupt
+ // but now okay, it might need to be updated.
if (reportedState == ReplicaState.FINALIZED
- && storedBlock.findDatanode(dn) < 0) {
+ && (storedBlock.findDatanode(dn) < 0
+ || corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
toAdd.add(storedBlock);
}
return storedBlock;
@@ -2039,12 +2049,13 @@ assert storedBlock.findDatanode(dn) < 0
return new BlockToMarkCorrupt(storedBlock, reportedGS,
"block is " + ucState + " and reported genstamp " + reportedGS
+ " does not match genstamp in block map "
- + storedBlock.getGenerationStamp());
+ + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
} else if (storedBlock.getNumBytes() != reported.getNumBytes()) {
return new BlockToMarkCorrupt(storedBlock,
"block is " + ucState + " and reported length " +
reported.getNumBytes() + " does not match " +
- "length in block map " + storedBlock.getNumBytes());
+ "length in block map " + storedBlock.getNumBytes(),
+ Reason.SIZE_MISMATCH);
} else {
return null; // not corrupt
}
@@ -2060,7 +2071,7 @@ assert storedBlock.findDatanode(dn) < 0
return new BlockToMarkCorrupt(storedBlock, reportedGS,
"reported " + reportedState + " replica with genstamp " + reportedGS
+ " does not match COMPLETE block's genstamp in block map "
- + storedBlock.getGenerationStamp());
+ + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
} else { // COMPLETE block, same genstamp
if (reportedState == ReplicaState.RBW) {
// If it's a RBW report for a COMPLETE block, it may just be that
@@ -2073,7 +2084,8 @@ assert storedBlock.findDatanode(dn) < 0
return null;
} else {
return new BlockToMarkCorrupt(storedBlock,
- "reported replica has invalid state " + reportedState);
+ "reported replica has invalid state " + reportedState,
+ Reason.INVALID_STATE);
}
}
case RUR: // should not be reported
@@ -2084,7 +2096,7 @@ assert storedBlock.findDatanode(dn) < 0
" on " + dn + " size " + storedBlock.getNumBytes();
// log here at WARN level since this is really a broken HDFS invariant
LOG.warn(msg);
- return new BlockToMarkCorrupt(storedBlock, msg);
+ return new BlockToMarkCorrupt(storedBlock, msg, Reason.INVALID_STATE);
}
}
@@ -2201,6 +2213,11 @@ assert storedBlock.findDatanode(dn) < 0
logAddStoredBlock(storedBlock, node);
}
} else {
+ // if the same block is added again and the replica was corrupt
+ // previously because of a wrong gen stamp, remove it from the
+ // corrupt block list.
+ corruptReplicas.removeFromCorruptReplicasMap(block, node,
+ Reason.GENSTAMP_MISMATCH);
curReplicaDelta = 0;
blockLog.warn("BLOCK* addStoredBlock: "
+ "Redundant addStoredBlock request received for " + storedBlock
@@ -2297,7 +2314,8 @@ assert storedBlock.findDatanode(dn) < 0
DatanodeDescriptor[] nodesCopy = nodes.toArray(new DatanodeDescriptor[0]);
for (DatanodeDescriptor node : nodesCopy) {
try {
- if (!invalidateBlock(new BlockToMarkCorrupt(blk, null), node)) {
+ if (!invalidateBlock(new BlockToMarkCorrupt(blk, null,
+ Reason.ANY), node)) {
removedFromBlocksMap = false;
}
} catch (IOException e) {
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java Thu Nov 14 23:56:56 2013
@@ -204,6 +204,7 @@ public class CacheReplicationMonitor ext
namesystem.writeLock();
try {
rescanCachedBlockMap();
+ blockManager.getDatanodeManager().resetLastCachingDirectiveSentTime();
} finally {
namesystem.writeUnlock();
}
@@ -316,17 +317,21 @@ public class CacheReplicationMonitor ext
int numCached = cached.size();
if (numCached >= neededCached) {
// If we have enough replicas, drop all pending cached.
- for (DatanodeDescriptor datanode : pendingCached) {
+ for (Iterator<DatanodeDescriptor> iter = pendingCached.iterator();
+ iter.hasNext(); ) {
+ DatanodeDescriptor datanode = iter.next();
datanode.getPendingCached().remove(cblock);
+ iter.remove();
}
- pendingCached.clear();
}
if (numCached < neededCached) {
// If we don't have enough replicas, drop all pending uncached.
- for (DatanodeDescriptor datanode : pendingUncached) {
+ for (Iterator<DatanodeDescriptor> iter = pendingUncached.iterator();
+ iter.hasNext(); ) {
+ DatanodeDescriptor datanode = iter.next();
datanode.getPendingUncached().remove(cblock);
+ iter.remove();
}
- pendingUncached.clear();
}
int neededUncached = numCached -
(pendingUncached.size() + neededCached);
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java Thu Nov 14 23:56:56 2013
@@ -36,8 +36,18 @@ import java.util.*;
@InterfaceAudience.Private
public class CorruptReplicasMap{
- private SortedMap<Block, Collection<DatanodeDescriptor>> corruptReplicasMap =
- new TreeMap<Block, Collection<DatanodeDescriptor>>();
+ /** The corruption reason code */
+ public static enum Reason {
+ NONE, // not specified.
+ ANY, // wildcard reason
+ GENSTAMP_MISMATCH, // mismatch in generation stamps
+ SIZE_MISMATCH, // mismatch in sizes
+ INVALID_STATE, // invalid state
+ CORRUPTION_REPORTED // client or datanode reported the corruption
+ }
+
+ private SortedMap<Block, Map<DatanodeDescriptor, Reason>> corruptReplicasMap =
+ new TreeMap<Block, Map<DatanodeDescriptor, Reason>>();
/**
* Mark the block belonging to datanode as corrupt.
@@ -48,9 +58,22 @@ public class CorruptReplicasMap{
*/
public void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
String reason) {
- Collection<DatanodeDescriptor> nodes = getNodes(blk);
+ addToCorruptReplicasMap(blk, dn, reason, Reason.NONE);
+ }
+
+ /**
+ * Mark the block belonging to datanode as corrupt.
+ *
+ * @param blk Block to be added to CorruptReplicasMap
+ * @param dn DatanodeDescriptor which holds the corrupt replica
+ * @param reason a textual reason (for logging purposes)
+ * @param reasonCode the enum representation of the reason
+ */
+ public void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
+ String reason, Reason reasonCode) {
+ Map <DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk);
if (nodes == null) {
- nodes = new TreeSet<DatanodeDescriptor>();
+ nodes = new HashMap<DatanodeDescriptor, Reason>();
corruptReplicasMap.put(blk, nodes);
}
@@ -61,8 +84,7 @@ public class CorruptReplicasMap{
reasonText = "";
}
- if (!nodes.contains(dn)) {
- nodes.add(dn);
+ if (!nodes.keySet().contains(dn)) {
NameNode.blockStateChangeLog.info("BLOCK NameSystem.addToCorruptReplicasMap: "+
blk.getBlockName() +
" added as corrupt on " + dn +
@@ -76,6 +98,8 @@ public class CorruptReplicasMap{
" by " + Server.getRemoteIp() +
reasonText);
}
+ // Add the node or update the reason.
+ nodes.put(dn, reasonCode);
}
/**
@@ -97,10 +121,24 @@ public class CorruptReplicasMap{
false if the replica is not in the map
*/
boolean removeFromCorruptReplicasMap(Block blk, DatanodeDescriptor datanode) {
- Collection<DatanodeDescriptor> datanodes = corruptReplicasMap.get(blk);
+ return removeFromCorruptReplicasMap(blk, datanode, Reason.ANY);
+ }
+
+ boolean removeFromCorruptReplicasMap(Block blk, DatanodeDescriptor datanode,
+ Reason reason) {
+ Map <DatanodeDescriptor, Reason> datanodes = corruptReplicasMap.get(blk);
+ boolean removed = false;
if (datanodes==null)
return false;
- if (datanodes.remove(datanode)) { // remove the replicas
+
+ // if reasons can be compared but don't match, return false.
+ Reason storedReason = datanodes.get(datanode);
+ if (reason != Reason.ANY && storedReason != null &&
+ reason != storedReason) {
+ return false;
+ }
+
+ if (datanodes.remove(datanode) != null) { // remove the replicas
if (datanodes.isEmpty()) {
// remove the block if there is no more corrupted replicas
corruptReplicasMap.remove(blk);
@@ -118,7 +156,10 @@ public class CorruptReplicasMap{
* @return collection of nodes. Null if it does not exist
*/
Collection<DatanodeDescriptor> getNodes(Block blk) {
- return corruptReplicasMap.get(blk);
+ Map <DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk);
+ if (nodes == null)
+ return null;
+ return nodes.keySet();
}
/**
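The map now stores a Reason per datanode instead of a bare node set, and removal is reason-aware. A toy sketch of the removal semantics, assuming nothing beyond what the diff shows (ReasonMap is an illustrative name):

  import java.util.HashMap;
  import java.util.Map;

  enum Reason { NONE, ANY, GENSTAMP_MISMATCH, SIZE_MISMATCH,
                INVALID_STATE, CORRUPTION_REPORTED }

  final class ReasonMap<N> {
    private final Map<N, Reason> nodes = new HashMap<N, Reason>();

    void add(N node, Reason reason) {
      nodes.put(node, reason);       // insert, or update the reason
    }

    boolean remove(N node, Reason reason) {
      Reason stored = nodes.get(node);
      if (stored == null) {
        return false;                // not marked corrupt at all
      }
      if (reason != Reason.ANY && reason != stored) {
        return false;                // reasons comparable but different
      }
      return nodes.remove(node) != null;
    }
  }

This is what lets addStoredBlock clear only GENSTAMP_MISMATCH entries when a replica is re-reported with the right stamp, while leaving replicas marked corrupt for other reasons untouched.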
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java Thu Nov 14 23:56:56 2013
@@ -170,6 +170,21 @@ public class DatanodeDescriptor extends
return pendingUncached;
}
+ /**
+ * The time when the last batch of caching directives was sent, in
+ * monotonic milliseconds.
+ */
+ private long lastCachingDirectiveSentTimeMs;
+
+ /**
+ * Head of the list of blocks on the datanode
+ */
+ private volatile BlockInfo blockList = null;
+ /**
+ * Number of blocks on the datanode
+ */
+ private int numBlocks = 0;
+
// isAlive == heartbeats.contains(this)
// This is an optimization, because contains takes O(n) time on Arraylist
public boolean isAlive = false;
@@ -661,5 +676,21 @@ public class DatanodeDescriptor extends
return storage;
}
}
+
+ /**
+ * @return The time at which we last sent caching directives to this
+ * DataNode, in monotonic milliseconds.
+ */
+ public long getLastCachingDirectiveSentTimeMs() {
+ return this.lastCachingDirectiveSentTimeMs;
+ }
+
+ /**
+ * @param time The time at which we last sent caching directives to this
+ * DataNode, in monotonic milliseconds.
+ */
+ public void setLastCachingDirectiveSentTimeMs(long time) {
+ this.lastCachingDirectiveSentTimeMs = time;
+ }
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Thu Nov 14 23:56:56 2013
@@ -149,7 +149,7 @@ public class DatanodeManager {
* Whether we should tell datanodes what to cache in replies to
* heartbeat messages.
*/
- private boolean sendCachingCommands = false;
+ private boolean shouldSendCachingCommands = false;
/**
* The number of datanodes for each software version. This list should change
@@ -159,6 +159,16 @@ public class DatanodeManager {
private HashMap<String, Integer> datanodesSoftwareVersions =
new HashMap<String, Integer>(4, 0.75f);
+ /**
+ * The minimum time between resending caching directives to Datanodes,
+ * in milliseconds.
+ *
+ * Note that when a rescan happens, we will send the new directives
+ * as soon as possible. This timeout only applies to resending
+ * directives that we've already sent.
+ */
+ private final long timeBetweenResendingCachingDirectivesMs;
+
DatanodeManager(final BlockManager blockManager, final Namesystem namesystem,
final Configuration conf) throws IOException {
this.namesystem = namesystem;
@@ -241,6 +251,9 @@ public class DatanodeManager {
DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY +
" = '" + ratioUseStaleDataNodesForWrite + "' is invalid. " +
"It should be a positive non-zero float value, not greater than 1.0f.");
+ this.timeBetweenResendingCachingDirectivesMs = conf.getLong(
+ DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS,
+ DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS_DEFAULT);
}
private static long getStaleIntervalFromConf(Configuration conf,
@@ -1297,17 +1310,28 @@ public class DatanodeManager {
cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE,
blockPoolId, blks));
}
- DatanodeCommand pendingCacheCommand =
- getCacheCommand(nodeinfo.getPendingCached(), nodeinfo,
- DatanodeProtocol.DNA_CACHE, blockPoolId);
- if (pendingCacheCommand != null) {
- cmds.add(pendingCacheCommand);
- }
- DatanodeCommand pendingUncacheCommand =
- getCacheCommand(nodeinfo.getPendingUncached(), nodeinfo,
- DatanodeProtocol.DNA_UNCACHE, blockPoolId);
- if (pendingUncacheCommand != null) {
- cmds.add(pendingUncacheCommand);
+ boolean sendingCachingCommands = false;
+ long nowMs = Time.monotonicNow();
+ if (shouldSendCachingCommands &&
+ ((nowMs - nodeinfo.getLastCachingDirectiveSentTimeMs()) >=
+ timeBetweenResendingCachingDirectivesMs)) {
+ DatanodeCommand pendingCacheCommand =
+ getCacheCommand(nodeinfo.getPendingCached(), nodeinfo,
+ DatanodeProtocol.DNA_CACHE, blockPoolId);
+ if (pendingCacheCommand != null) {
+ cmds.add(pendingCacheCommand);
+ sendingCachingCommands = true;
+ }
+ DatanodeCommand pendingUncacheCommand =
+ getCacheCommand(nodeinfo.getPendingUncached(), nodeinfo,
+ DatanodeProtocol.DNA_UNCACHE, blockPoolId);
+ if (pendingUncacheCommand != null) {
+ cmds.add(pendingUncacheCommand);
+ sendingCachingCommands = true;
+ }
+ if (sendingCachingCommands) {
+ nodeinfo.setLastCachingDirectiveSentTimeMs(nowMs);
+ }
}
blockManager.addKeyUpdateCommand(cmds, nodeinfo);
@@ -1345,19 +1369,13 @@ public class DatanodeManager {
if (length == 0) {
return null;
}
- // Read and clear the existing cache commands.
+ // Read the existing cache commands.
long[] blockIds = new long[length];
int i = 0;
for (Iterator<CachedBlock> iter = list.iterator();
iter.hasNext(); ) {
CachedBlock cachedBlock = iter.next();
blockIds[i++] = cachedBlock.getBlockId();
- iter.remove();
- }
- if (!sendCachingCommands) {
- // Do not send caching commands unless the FSNamesystem told us we
- // should.
- return null;
}
return new BlockIdCommand(action, poolId, blockIds);
}
@@ -1408,13 +1426,25 @@ public class DatanodeManager {
}
}
+ /**
+ * Reset the lastCachingDirectiveSentTimeMs field of all the DataNodes we
+ * know about.
+ */
+ public void resetLastCachingDirectiveSentTime() {
+ synchronized (datanodeMap) {
+ for (DatanodeDescriptor dn : datanodeMap.values()) {
+ dn.setLastCachingDirectiveSentTimeMs(0L);
+ }
+ }
+ }
+
@Override
public String toString() {
return getClass().getSimpleName() + ": " + host2DatanodeMap;
}
- public void setSendCachingCommands(boolean sendCachingCommands) {
- this.sendCachingCommands = sendCachingCommands;
+ public void setShouldSendCachingCommands(boolean shouldSendCachingCommands) {
+ this.shouldSendCachingCommands = shouldSendCachingCommands;
}
}
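Heartbeat replies now resend caching directives at most once per configured interval, measured on a monotonic clock, and a rescan resets every DataNode's timestamp to force an immediate resend. A compact sketch of that throttle, with illustrative names:

  final class DirectiveThrottle {
    private final long intervalMs;  // dfs.namenode.path.based.cache.retry.interval.ms
    private long lastSentMs = 0;    // monotonic ms of the last send

    DirectiveThrottle(long intervalMs) {
      this.intervalMs = intervalMs;
    }

    // Called from the heartbeat path with Time.monotonicNow().
    boolean shouldSend(long nowMs) {
      return nowMs - lastSentMs >= intervalMs;
    }

    void markSent(long nowMs) {
      lastSentMs = nowMs;
    }

    // What resetLastCachingDirectiveSentTime() does for each DataNode
    // after a rescan: make the next heartbeat send immediately.
    void reset() {
      lastSentMs = 0;
    }
  }

Note that getCacheCommand() no longer clears the pending lists either, so unacknowledged directives survive to be resent on a later heartbeat.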
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java Thu Nov 14 23:56:56 2013
@@ -289,6 +289,10 @@ public class FsDatasetCache {
mappableBlockMap.put(key, new Value(null, State.CACHING));
volumeExecutor.execute(
new CachingTask(key, blockFileName, length, genstamp));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Initiating caching for Block with id " + blockId +
+ ", pool " + bpid);
+ }
}
synchronized void uncacheBlock(String bpid, long blockId) {
@@ -427,6 +431,10 @@ public class FsDatasetCache {
mappableBlock.close();
}
numBlocksFailedToCache.incrementAndGet();
+
+ synchronized (FsDatasetCache.this) {
+ mappableBlockMap.remove(key);
+ }
}
}
}
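The new synchronized block makes a failed CachingTask remove its mappableBlockMap entry, so the block can be re-cached later instead of staying stuck in the CACHING state. A generic sketch of the cleanup-on-failure pattern, assuming only what the diff shows:

  import java.util.HashMap;
  import java.util.Map;

  final class CacheRegistry<K, V extends AutoCloseable> {
    private final Map<K, V> map = new HashMap<K, V>();

    synchronized void put(K key, V value) {
      map.put(key, value);
    }

    // On failure, release the resource and drop the entry so the key
    // is not left in a half-cached state forever.
    synchronized void fail(K key) {
      V value = map.remove(key);
      if (value != null) {
        try {
          value.close();
        } catch (Exception ignored) {
          // best-effort cleanup
        }
      }
    }
  }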
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java Thu Nov 14 23:56:56 2013
@@ -44,20 +44,6 @@ import com.google.common.base.Preconditi
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class MappableBlock implements Closeable {
- public static interface Mlocker {
- void mlock(MappedByteBuffer mmap, long length) throws IOException;
- }
-
- private static class PosixMlocker implements Mlocker {
- public void mlock(MappedByteBuffer mmap, long length)
- throws IOException {
- NativeIO.POSIX.mlock(mmap, length);
- }
- }
-
- @VisibleForTesting
- public static Mlocker mlocker = new PosixMlocker();
-
private MappedByteBuffer mmap;
private final long length;
@@ -96,7 +82,7 @@ public class MappableBlock implements Cl
throw new IOException("Block InputStream has no FileChannel.");
}
mmap = blockChannel.map(MapMode.READ_ONLY, 0, length);
- mlocker.mlock(mmap, length);
+ NativeIO.POSIX.cacheManipulator.mlock(blockFileName, mmap, length);
verifyChecksum(length, metaIn, blockChannel, blockFileName);
mappableBlock = new MappableBlock(mmap, length);
} finally {
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Thu Nov 14 23:56:56 2013
@@ -114,7 +114,9 @@ public class FSDirectory implements Clos
private final int maxComponentLength;
private final int maxDirItems;
private final int lsLimit; // max list limit
+ private final int contentCountLimit; // max content summary counts per run
private final INodeMap inodeMap; // Synchronized by dirLock
+ private long yieldCount = 0; // keep track of lock yield count.
// lock to protect the directory and BlockMap
private ReentrantReadWriteLock dirLock;
@@ -145,6 +147,14 @@ public class FSDirectory implements Clos
return this.dirLock.getReadHoldCount() > 0;
}
+ public int getReadHoldCount() {
+ return this.dirLock.getReadHoldCount();
+ }
+
+ public int getWriteHoldCount() {
+ return this.dirLock.getWriteHoldCount();
+ }
+
/**
* Caches frequently used file names used in {@link INode} to reuse
* byte[] objects and reduce heap usage.
@@ -161,6 +171,10 @@ public class FSDirectory implements Clos
DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
this.lsLimit = configuredLimit>0 ?
configuredLimit : DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT;
+
+ this.contentCountLimit = conf.getInt(
+ DFSConfigKeys.DFS_CONTENT_SUMMARY_LIMIT_KEY,
+ DFSConfigKeys.DFS_CONTENT_SUMMARY_LIMIT_DEFAULT);
// filesystem limits
this.maxComponentLength = conf.getInt(
@@ -2296,13 +2310,26 @@ public class FSDirectory implements Clos
throw new FileNotFoundException("File does not exist: " + srcs);
}
else {
- return targetNode.computeContentSummary();
+ // Make it relinquish locks every time contentCountLimit entries are
+ // processed. 0 means disabled, i.e. blocking for the entire duration.
+ ContentSummaryComputationContext cscc =
+ new ContentSummaryComputationContext(this, getFSNamesystem(),
+ contentCountLimit);
+ ContentSummary cs = targetNode.computeAndConvertContentSummary(cscc);
+ yieldCount += cscc.getYieldCount();
+ return cs;
}
} finally {
readUnlock();
}
}
+ @VisibleForTesting
+ public long getYieldCount() {
+ return yieldCount;
+ }
+
public INodeMap getINodeMap() {
return inodeMap;
}
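getContentSummary() now runs under a ContentSummaryComputationContext that periodically relinquishes and reacquires the locks, counting each yield so callers and tests can observe it. A simplified sketch of the idea, assuming a single read lock rather than the directory-plus-namesystem pair the real context manages:

  import java.util.concurrent.locks.ReentrantReadWriteLock;

  final class YieldingCounter {
    private final ReentrantReadWriteLock lock;  // caller holds the read lock
    private final long limit;                   // 0 disables, as in the patch
    private long sinceYield = 0;
    private long yieldCount = 0;

    YieldingCounter(ReentrantReadWriteLock lock, long limit) {
      this.lock = lock;
      this.limit = limit;
    }

    // Call once per processed entry; yields the read lock when due.
    void tick() throws InterruptedException {
      if (limit <= 0 || ++sinceYield < limit) {
        return;
      }
      sinceYield = 0;
      yieldCount++;
      lock.readLock().unlock();   // let writers in
      Thread.sleep(1);            // brief pause before resuming
      lock.readLock().lock();
    }

    long getYieldCount() { return yieldCount; }
  }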
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Thu Nov 14 23:56:56 2013
@@ -593,7 +593,7 @@ public class FSEditLogLoader {
fsNamesys.getSnapshotManager().deleteSnapshot(
deleteSnapshotOp.snapshotRoot, deleteSnapshotOp.snapshotName,
collectedBlocks, removedINodes);
- fsNamesys.removeBlocks(collectedBlocks);
+ fsNamesys.removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
collectedBlocks.clear();
fsNamesys.dir.removeFromInodeMap(removedINodes);
removedINodes.clear();
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Thu Nov 14 23:56:56 2013
@@ -1009,7 +1009,7 @@ public class FSNamesystem implements Nam
nnEditLogRoller.start();
cacheManager.activate();
- blockManager.getDatanodeManager().setSendCachingCommands(true);
+ blockManager.getDatanodeManager().setShouldSendCachingCommands(true);
} finally {
writeUnlock();
startingActiveService = false;
@@ -1060,7 +1060,7 @@ public class FSNamesystem implements Nam
dir.fsImage.updateLastAppliedTxIdFromWritten();
}
cacheManager.deactivate();
- blockManager.getDatanodeManager().setSendCachingCommands(false);
+ blockManager.getDatanodeManager().setShouldSendCachingCommands(false);
} finally {
writeUnlock();
}
@@ -1297,6 +1297,14 @@ public class FSNamesystem implements Nam
return hasReadLock() || hasWriteLock();
}
+ public int getReadHoldCount() {
+ return this.fsLock.getReadHoldCount();
+ }
+
+ public int getWriteHoldCount() {
+ return this.fsLock.getWriteHoldCount();
+ }
+
NamespaceInfo getNamespaceInfo() {
readLock();
try {
@@ -3305,6 +3313,18 @@ public class FSNamesystem implements Nam
return;
}
+ removeBlocksAndUpdateSafemodeTotal(blocks);
+ }
+
+ /**
+ * Removes the blocks from blocksmap and updates the safemode blocks total
+ *
+ * @param blocks
+ * An instance of {@link BlocksMapUpdateInfo} which contains a list
+ * of blocks that need to be removed from blocksMap
+ */
+ void removeBlocksAndUpdateSafemodeTotal(BlocksMapUpdateInfo blocks) {
+ assert hasWriteLock();
// In the case that we are a Standby tailing edits from the
// active while in safe-mode, we need to track the total number
// of blocks and safe blocks in the system.
@@ -3325,9 +3345,9 @@ public class FSNamesystem implements Nam
}
if (trackBlockCounts) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Adjusting safe-mode totals for deletion of " + src + ":" +
- "decreasing safeBlocks by " + numRemovedSafe +
- ", totalBlocks by " + numRemovedComplete);
+ LOG.debug("Adjusting safe-mode totals for deletion."
+ + "decreasing safeBlocks by " + numRemovedSafe
+ + ", totalBlocks by " + numRemovedComplete);
}
adjustSafeModeBlockTotals(-numRemovedSafe, -numRemovedComplete);
}
@@ -5883,8 +5903,8 @@ public class FSNamesystem implements Nam
}
// Update old block with the new generation stamp and new length
- blockinfo.setGenerationStamp(newBlock.getGenerationStamp());
blockinfo.setNumBytes(newBlock.getNumBytes());
+ blockinfo.setGenerationStampAndVerifyReplicas(newBlock.getGenerationStamp());
// find the DatanodeDescriptor objects
final DatanodeStorageInfo[] storages = blockManager.getDatanodeManager()
@@ -6953,6 +6973,7 @@ public class FSNamesystem implements Nam
return; // Return previous response
}
boolean success = false;
+ BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@@ -6961,7 +6982,6 @@ public class FSNamesystem implements Nam
checkOwner(pc, snapshotRoot);
}
- BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
List<INode> removedINodes = new ChunkedArrayList<INode>();
dir.writeLock();
try {
@@ -6972,8 +6992,6 @@ public class FSNamesystem implements Nam
dir.writeUnlock();
}
removedINodes.clear();
- this.removeBlocks(collectedBlocks);
- collectedBlocks.clear();
getEditLog().logDeleteSnapshot(snapshotRoot, snapshotName,
cacheEntry != null);
success = true;
@@ -6982,7 +7000,10 @@ public class FSNamesystem implements Nam
RetryCache.setState(cacheEntry, success);
}
getEditLog().logSync();
-
+
+ removeBlocks(collectedBlocks);
+ collectedBlocks.clear();
+
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
String rootPath = Snapshot.getSnapshotPath(snapshotRoot, snapshotName);
logAuditEvent(true, "deleteSnapshot", rootPath, null, null);
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java Thu Nov 14 23:56:56 2013
@@ -371,10 +371,18 @@ public abstract class INode implements I
public abstract void destroyAndCollectBlocks(
BlocksMapUpdateInfo collectedBlocks, List<INode> removedINodes);
- /** Compute {@link ContentSummary}. */
+ /** Compute {@link ContentSummary}. Blocking call */
public final ContentSummary computeContentSummary() {
- final Content.Counts counts = computeContentSummary(
- Content.Counts.newInstance());
+ return computeAndConvertContentSummary(
+ new ContentSummaryComputationContext());
+ }
+
+ /**
+ * Compute {@link ContentSummary}.
+ */
+ public final ContentSummary computeAndConvertContentSummary(
+ ContentSummaryComputationContext summary) {
+ Content.Counts counts = computeContentSummary(summary).getCounts();
return new ContentSummary(counts.get(Content.LENGTH),
counts.get(Content.FILE) + counts.get(Content.SYMLINK),
counts.get(Content.DIRECTORY), getNsQuota(),
@@ -384,10 +392,12 @@ public abstract class INode implements I
/**
* Count subtree content summary with a {@link Content.Counts}.
*
- * @param counts The subtree counts for returning.
- * @return The same objects as the counts parameter.
+ * @param summary the context object holding counts for the subtree.
+ * @return The same objects as summary.
*/
- public abstract Content.Counts computeContentSummary(Content.Counts counts);
+ public abstract ContentSummaryComputationContext computeContentSummary(
+ ContentSummaryComputationContext summary);
+
/**
* Check and add namespace/diskspace consumed to itself and the ancestors.
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java Thu Nov 14 23:56:56 2013
@@ -466,12 +466,45 @@ public class INodeDirectory extends INod
}
@Override
- public Content.Counts computeContentSummary(final Content.Counts counts) {
- for (INode child : getChildrenList(null)) {
- child.computeContentSummary(counts);
+ public ContentSummaryComputationContext computeContentSummary(
+ ContentSummaryComputationContext summary) {
+ ReadOnlyList<INode> childrenList = getChildrenList(null);
+ // Explicit traversing is done to enable repositioning after relinquishing
+ // and reacquiring locks.
+ for (int i = 0; i < childrenList.size(); i++) {
+ INode child = childrenList.get(i);
+ byte[] childName = child.getLocalNameBytes();
+
+ long lastYieldCount = summary.getYieldCount();
+ child.computeContentSummary(summary);
+
+ // Check whether the computation was paused in the subtree.
+ // The counts may be off, but traversing the rest of the
+ // children must still be safe.
+ if (lastYieldCount == summary.getYieldCount()) {
+ continue;
+ }
+
+ // The locks were released and reacquired. Check parent first.
+ if (getParent() == null) {
+ // Stop further counting and return whatever we have so far.
+ break;
+ }
+
+ // Obtain the children list again since it may have been modified.
+ childrenList = getChildrenList(null);
+ // Reposition in case the children list is changed. Decrement by 1
+ // since the loop will increment it.
+ i = nextChild(childrenList, childName) - 1;
}
- counts.add(Content.DIRECTORY, 1);
- return counts;
+
+ // Increment the directory count for this directory.
+ summary.getCounts().add(Content.DIRECTORY, 1);
+
+ // Relinquish and reacquire locks if necessary.
+ summary.yield();
+
+ return summary;
}
/**
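The traversal above must survive the children list changing while the locks were yielded, so it repositions by the last visited child's name rather than trusting a saved index. The core of that repositioning, sketched over a sorted list of names (nextIndex is an illustrative stand-in for INodeDirectory.nextChild):

  import java.util.Collections;
  import java.util.List;

  final class ResumableScan {
    // Where to resume a scan of a sorted name list after it may have
    // changed; `last` is the name of the child just processed.
    static int nextIndex(List<String> sortedNames, String last) {
      int i = Collections.binarySearch(sortedNames, last);
      return i >= 0
          ? i + 1      // still present: continue with the next child
          : -i - 1;    // removed: the insertion point is the next child
    }
  }

Children added or removed behind the cursor are simply missed or double-skipped, which is why the counts are documented as best-effort after a yield.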
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java Thu Nov 14 23:56:56 2013
@@ -107,12 +107,16 @@ public class INodeDirectoryWithQuota ext
}
@Override
- public Content.Counts computeContentSummary(
- final Content.Counts counts) {
- final long original = counts.get(Content.DISKSPACE);
- super.computeContentSummary(counts);
- checkDiskspace(counts.get(Content.DISKSPACE) - original);
- return counts;
+ public ContentSummaryComputationContext computeContentSummary(
+ final ContentSummaryComputationContext summary) {
+ final long original = summary.getCounts().get(Content.DISKSPACE);
+ long oldYieldCount = summary.getYieldCount();
+ super.computeContentSummary(summary);
+ // Verify the diskspace delta only if the computation did not yield
+ // in the middle; otherwise the counts may be inconsistent.
+ if (oldYieldCount == summary.getYieldCount()) {
+ checkDiskspace(summary.getCounts().get(Content.DISKSPACE) - original);
+ }
+ return summary;
}
private void checkDiskspace(final long computed) {
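Note the design choice in the guard above: when a yield occurred somewhere in the subtree, checkDiskspace() is skipped entirely rather than reporting a spurious mismatch, because the cached quota usage and the freshly computed counts can legitimately diverge while the locks are dropped. The yield count thus doubles as a cheap consistency token for any derived validation.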
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Thu Nov 14 23:56:56 2013
@@ -342,11 +342,11 @@ public class INodeFile extends INodeWith
}
@Override
- public final Content.Counts computeContentSummary(
- final Content.Counts counts) {
- computeContentSummary4Snapshot(counts);
- computeContentSummary4Current(counts);
- return counts;
+ public final ContentSummaryComputationContext computeContentSummary(
+ final ContentSummaryComputationContext summary) {
+ computeContentSummary4Snapshot(summary.getCounts());
+ computeContentSummary4Current(summary.getCounts());
+ return summary;
}
private void computeContentSummary4Snapshot(final Content.Counts counts) {
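Leaf inodes never yield, which is why INodeFile (and INodeSymlink below) only touch summary.getCounts() and ignore the rest of the context; the yield bookkeeping matters only on the directory traversal path.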
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java Thu Nov 14 23:56:56 2013
@@ -107,7 +107,8 @@ public class INodeMap {
}
@Override
- public Content.Counts computeContentSummary(Content.Counts counts) {
+ public ContentSummaryComputationContext computeContentSummary(
+ ContentSummaryComputationContext summary) {
return null;
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java Thu Nov 14 23:56:56 2013
@@ -278,8 +278,9 @@ public abstract class INodeReference ext
}
@Override
- public Content.Counts computeContentSummary(Content.Counts counts) {
- return referred.computeContentSummary(counts);
+ public ContentSummaryComputationContext computeContentSummary(
+ ContentSummaryComputationContext summary) {
+ return referred.computeContentSummary(summary);
}
@Override
@@ -444,12 +445,13 @@ public abstract class INodeReference ext
}
@Override
- public final Content.Counts computeContentSummary(Content.Counts counts) {
+ public final ContentSummaryComputationContext computeContentSummary(
+ ContentSummaryComputationContext summary) {
//only count diskspace for WithName
final Quota.Counts q = Quota.Counts.newInstance();
computeQuotaUsage(q, false, lastSnapshotId);
- counts.add(Content.DISKSPACE, q.get(Quota.DISKSPACE));
- return counts;
+ summary.getCounts().add(Content.DISKSPACE, q.get(Quota.DISKSPACE));
+ return summary;
}
@Override
@@ -688,4 +690,4 @@ public abstract class INodeReference ext
}
}
}
-}
\ No newline at end of file
+}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java Thu Nov 14 23:56:56 2013
@@ -98,9 +98,10 @@ public class INodeSymlink extends INodeW
}
@Override
- public Content.Counts computeContentSummary(final Content.Counts counts) {
- counts.add(Content.SYMLINK, 1);
- return counts;
+ public ContentSummaryComputationContext computeContentSummary(
+ final ContentSummaryComputationContext summary) {
+ summary.getCounts().add(Content.SYMLINK, 1);
+ return summary;
}
@Override
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java Thu Nov 14 23:56:56 2013
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.S
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType;
import org.apache.hadoop.hdfs.server.namenode.Content;
+import org.apache.hadoop.hdfs.server.namenode.ContentSummaryComputationContext;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
@@ -342,11 +343,12 @@ public class INodeDirectorySnapshottable
}
@Override
- public Content.Counts computeContentSummary(final Content.Counts counts) {
- super.computeContentSummary(counts);
- counts.add(Content.SNAPSHOT, snapshotsByNames.size());
- counts.add(Content.SNAPSHOTTABLE_DIRECTORY, 1);
- return counts;
+ public ContentSummaryComputationContext computeContentSummary(
+ final ContentSummaryComputationContext summary) {
+ super.computeContentSummary(summary);
+ summary.getCounts().add(Content.SNAPSHOT, snapshotsByNames.size());
+ summary.getCounts().add(Content.SNAPSHOTTABLE_DIRECTORY, 1);
+ return summary;
}
/**
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java Thu Nov 14 23:56:56 2013
@@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.protocol.Q
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType;
import org.apache.hadoop.hdfs.server.namenode.Content;
+import org.apache.hadoop.hdfs.server.namenode.ContentSummaryComputationContext;
import org.apache.hadoop.hdfs.server.namenode.FSImageSerialization;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
@@ -883,18 +884,27 @@ public class INodeDirectoryWithSnapshot
}
@Override
- public Content.Counts computeContentSummary(final Content.Counts counts) {
- super.computeContentSummary(counts);
- computeContentSummary4Snapshot(counts);
- return counts;
+ public ContentSummaryComputationContext computeContentSummary(
+ final ContentSummaryComputationContext summary) {
+ // The snapshot summary computation does not relinquish locks in the
+ // middle. Do it first, then hand off to the parent class.
+ computeContentSummary4Snapshot(summary.getCounts());
+ super.computeContentSummary(summary);
+ return summary;
}
private void computeContentSummary4Snapshot(final Content.Counts counts) {
+ // Use a fresh, blank summary context so that processing the deleted
+ // subtrees can never yield.
+ ContentSummaryComputationContext summary =
+ new ContentSummaryComputationContext();
for(DirectoryDiff d : diffs) {
for(INode deleted : d.getChildrenDiff().getList(ListType.DELETED)) {
- deleted.computeContentSummary(counts);
+ deleted.computeContentSummary(summary);
}
}
+ // Add the counts from deleted trees.
+ counts.add(summary.getCounts());
+ // Add the deleted directory count.
counts.add(Content.DIRECTORY, diffs.asList().size());
}
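The blank context is what keeps the snapshot accounting atomic here. A hedged sketch of the flow, assuming (as the code above suggests) that a context built with the no-argument constructor carries no lock state and therefore never yields:

    // Sum deleted subtrees without ever yielding, then merge the totals
    // into the caller's counts in a single step. Names are illustrative.
    private static void addDeletedSubtrees(Iterable<INode> deletedNodes,
        Content.Counts counts) {
      ContentSummaryComputationContext scratch =
          new ContentSummaryComputationContext();  // blank => cannot yield
      for (INode deleted : deletedNodes) {
        deleted.computeContentSummary(scratch);
      }
      counts.add(scratch.getCounts());             // one consistent merge
    }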
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java?rev=1542125&r1=1542124&r2=1542125&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java Thu Nov 14 23:56:56 2013
@@ -26,8 +26,8 @@ import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
+import java.net.URI;
import java.net.URL;
-import java.net.URLConnection;
import java.security.PrivilegedExceptionAction;
import java.util.Collection;
import java.util.Date;
@@ -47,11 +47,14 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.GetDelegationTokenServlet;
import org.apache.hadoop.hdfs.server.namenode.RenewDelegationTokenServlet;
import org.apache.hadoop.hdfs.web.HftpFileSystem;
+import org.apache.hadoop.hdfs.web.HsftpFileSystem;
+import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.GenericOptionsParser;
@@ -142,11 +145,11 @@ public class DelegationTokenFetcher {
// default to using the local file system
FileSystem local = FileSystem.getLocal(conf);
final Path tokenFile = new Path(local.getWorkingDirectory(), remaining[0]);
+ final URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
// Login the current user
UserGroupInformation.getCurrentUser().doAs(
new PrivilegedExceptionAction<Object>() {
- @SuppressWarnings("unchecked")
@Override
public Object run() throws Exception {
@@ -182,7 +185,8 @@ public class DelegationTokenFetcher {
} else {
// otherwise we are fetching
if (webUrl != null) {
- Credentials creds = getDTfromRemote(webUrl, renewer);
+ Credentials creds = getDTfromRemote(connectionFactory, new URI(webUrl),
+ renewer);
creds.writeTokenStorageFile(tokenFile, conf);
for (Token<?> token : creds.getAllTokens()) {
if(LOG.isDebugEnabled()) {
@@ -208,32 +212,31 @@ public class DelegationTokenFetcher {
});
}
- static public Credentials getDTfromRemote(String nnAddr,
- String renewer) throws IOException {
+ static public Credentials getDTfromRemote(URLConnectionFactory factory,
+ URI nnUri, String renewer) throws IOException {
+ StringBuilder buf = new StringBuilder(nnUri.toString())
+ .append(GetDelegationTokenServlet.PATH_SPEC);
+ if (renewer != null) {
+ buf.append("?").append(GetDelegationTokenServlet.RENEWER).append("=")
+ .append(renewer);
+ }
+
+ HttpURLConnection conn = null;
DataInputStream dis = null;
- InetSocketAddress serviceAddr = NetUtils.createSocketAddr(nnAddr);
-
+ InetSocketAddress serviceAddr = NetUtils.createSocketAddr(nnUri
+ .getAuthority());
+
try {
- StringBuffer url = new StringBuffer();
- if (renewer != null) {
- url.append(nnAddr).append(GetDelegationTokenServlet.PATH_SPEC)
- .append("?").append(GetDelegationTokenServlet.RENEWER).append("=")
- .append(renewer);
- } else {
- url.append(nnAddr).append(GetDelegationTokenServlet.PATH_SPEC);
- }
-
if(LOG.isDebugEnabled()) {
- LOG.debug("Retrieving token from: " + url);
+ LOG.debug("Retrieving token from: " + buf);
}
-
- URL remoteURL = new URL(url.toString());
- URLConnection connection = SecurityUtil.openSecureHttpConnection(remoteURL);
- InputStream in = connection.getInputStream();
+
+ conn = run(factory, new URL(buf.toString()));
+ InputStream in = conn.getInputStream();
Credentials ts = new Credentials();
dis = new DataInputStream(in);
ts.readFields(dis);
- for(Token<?> token: ts.getAllTokens()) {
+ for (Token<?> token : ts.getAllTokens()) {
token.setKind(HftpFileSystem.TOKEN_KIND);
SecurityUtil.setTokenService(token, serviceAddr);
}
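With the new signature, callers supply a URLConnectionFactory and a URI instead of a raw address string. A hedged usage sketch; the NameNode address and renewer below are invented, and only the factory constant and the getDTfromRemote signature are taken from this change:

    import java.net.URI;
    import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
    import org.apache.hadoop.hdfs.web.URLConnectionFactory;
    import org.apache.hadoop.security.Credentials;
    import org.apache.hadoop.security.token.Token;

    public class FetchTokenExample {
      public static void main(String[] args) throws Exception {
        URLConnectionFactory factory =
            URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
        URI nnUri = new URI("http://nn.example.com:50070");  // invented
        Credentials creds =
            DelegationTokenFetcher.getDTfromRemote(factory, nnUri, "renewerUser");
        for (Token<?> t : creds.getAllTokens()) {
          System.out.println("fetched " + t.getKind() + " for " + t.getService());
        }
      }
    }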
@@ -241,53 +244,70 @@ public class DelegationTokenFetcher {
} catch (Exception e) {
throw new IOException("Unable to obtain remote token", e);
} finally {
- if(dis != null) dis.close();
+ IOUtils.cleanup(LOG, dis);
+ if (conn != null) {
+ conn.disconnect();
+ }
}
}
/**
+ * Cancel a Delegation Token.
+ * @param factory the connection factory used to open the HTTP connection
+ * @param nnAddr the NameNode's address
+ * @param tok the token to cancel
+ * @throws IOException
+ * @throws AuthenticationException
+ */
+ static public void cancelDelegationToken(URLConnectionFactory factory,
+ URI nnAddr, Token<DelegationTokenIdentifier> tok) throws IOException,
+ AuthenticationException {
+ StringBuilder buf = new StringBuilder(nnAddr.toString())
+ .append(CancelDelegationTokenServlet.PATH_SPEC).append("?")
+ .append(CancelDelegationTokenServlet.TOKEN).append("=")
+ .append(tok.encodeToUrlString());
+ HttpURLConnection conn = run(factory, new URL(buf.toString()));
+ conn.disconnect();
+ }
+
+ /**
* Renew a Delegation Token.
* @param nnAddr the NameNode's address
* @param tok the token to renew
* @return the Date that the token will expire next.
* @throws IOException
+ * @throws AuthenticationException
*/
- static public long renewDelegationToken(String nnAddr,
- Token<DelegationTokenIdentifier> tok
- ) throws IOException {
- StringBuilder buf = new StringBuilder();
- buf.append(nnAddr);
- buf.append(RenewDelegationTokenServlet.PATH_SPEC);
- buf.append("?");
- buf.append(RenewDelegationTokenServlet.TOKEN);
- buf.append("=");
- buf.append(tok.encodeToUrlString());
- BufferedReader in = null;
+ static public long renewDelegationToken(URLConnectionFactory factory,
+ URI nnAddr, Token<DelegationTokenIdentifier> tok) throws IOException,
+ AuthenticationException {
+ StringBuilder buf = new StringBuilder(nnAddr.toString())
+ .append(RenewDelegationTokenServlet.PATH_SPEC).append("?")
+ .append(RenewDelegationTokenServlet.TOKEN).append("=")
+ .append(tok.encodeToUrlString());
+
HttpURLConnection connection = null;
-
+ BufferedReader in = null;
try {
- URL url = new URL(buf.toString());
- connection = (HttpURLConnection) SecurityUtil.openSecureHttpConnection(url);
- if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
- throw new IOException("Error renewing token: " +
- connection.getResponseMessage());
- }
- in = new BufferedReader(
- new InputStreamReader(connection.getInputStream(), Charsets.UTF_8));
+ connection = run(factory, new URL(buf.toString()));
+ in = new BufferedReader(new InputStreamReader(
+ connection.getInputStream(), Charsets.UTF_8));
long result = Long.parseLong(in.readLine());
- in.close();
return result;
} catch (IOException ie) {
LOG.info("error in renew over HTTP", ie);
IOException e = getExceptionFromResponse(connection);
- IOUtils.cleanup(LOG, in);
- if(e!=null) {
- LOG.info("rethrowing exception from HTTP request: " +
- e.getLocalizedMessage());
+ if (e != null) {
+ LOG.info("rethrowing exception from HTTP request: "
+ + e.getLocalizedMessage());
throw e;
}
throw ie;
+ } finally {
+ IOUtils.cleanup(LOG, in);
+ if (connection != null) {
+ connection.disconnect();
+ }
}
}
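Both rewritten methods also declare AuthenticationException, so authentication failures are now a distinct checked type rather than a generic IOException from the old secure-connection helper. A hedged sketch of a caller; construction of the factory, URI, and token is assumed to happen elsewhere:

    import java.io.IOException;
    import java.net.URI;
    import java.util.Date;
    import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
    import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
    import org.apache.hadoop.hdfs.web.URLConnectionFactory;
    import org.apache.hadoop.security.authentication.client.AuthenticationException;
    import org.apache.hadoop.security.token.Token;

    public class RenewCancelExample {
      static void renewThenCancel(URLConnectionFactory factory, URI nnUri,
          Token<DelegationTokenIdentifier> token) throws IOException {
        try {
          long expiry =
              DelegationTokenFetcher.renewDelegationToken(factory, nnUri, token);
          System.out.println("token now expires at " + new Date(expiry));
          DelegationTokenFetcher.cancelDelegationToken(factory, nnUri, token);
        } catch (AuthenticationException ae) {
          // Surface the new checked exception in whatever way the caller
          // prefers; wrapping keeps existing IOException handling intact.
          throw new IOException("authentication to " + nnUri + " failed", ae);
        }
      }
    }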
@@ -339,43 +359,28 @@ public class DelegationTokenFetcher {
return e;
}
-
- /**
- * Cancel a Delegation Token.
- * @param nnAddr the NameNode's address
- * @param tok the token to cancel
- * @throws IOException
- */
- static public void cancelDelegationToken(String nnAddr,
- Token<DelegationTokenIdentifier> tok
- ) throws IOException {
- StringBuilder buf = new StringBuilder();
- buf.append(nnAddr);
- buf.append(CancelDelegationTokenServlet.PATH_SPEC);
- buf.append("?");
- buf.append(CancelDelegationTokenServlet.TOKEN);
- buf.append("=");
- buf.append(tok.encodeToUrlString());
- BufferedReader in = null;
- HttpURLConnection connection=null;
+ private static HttpURLConnection run(URLConnectionFactory factory, URL url)
+ throws IOException, AuthenticationException {
+ HttpURLConnection conn = null;
+
try {
- URL url = new URL(buf.toString());
- connection = (HttpURLConnection) SecurityUtil.openSecureHttpConnection(url);
- if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
- throw new IOException("Error cancelling token: " +
- connection.getResponseMessage());
+ conn = (HttpURLConnection) factory.openConnection(url, true);
+ if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
+ String msg = conn.getResponseMessage();
+
+ throw new IOException("Error when dealing remote token: " + msg);
}
} catch (IOException ie) {
- LOG.info("error in cancel over HTTP", ie);
- IOException e = getExceptionFromResponse(connection);
+ LOG.info("Error when dealing remote token:", ie);
+ IOException e = getExceptionFromResponse(conn);
- IOUtils.cleanup(LOG, in);
- if(e!=null) {
- LOG.info("rethrowing exception from HTTP request: " +
- e.getLocalizedMessage());
+ if (e != null) {
+ LOG.info("rethrowing exception from HTTP request: "
+ + e.getLocalizedMessage());
throw e;
}
throw ie;
}
+ return conn;
}
}
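The net effect of the private run() helper is that fetch, renew, and cancel now share a single path for opening the connection, checking the HTTP status, and translating the servlet's error body via getExceptionFromResponse(); the per-operation methods retain only URL construction and response parsing, and each one disconnects its connection in a finally block instead of leaking it on error.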