You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ss...@apache.org on 2012/10/19 20:49:45 UTC
svn commit: r1400219 [1/2] - in
/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/protocol/
src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ src/mai...
Author: sseth
Date: Fri Oct 19 18:49:38 2012
New Revision: 1400219
URL: http://svn.apache.org/viewvc?rev=1400219&view=rev
Log:
merge from trunk to branch MR-3902
Added:
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/
- copied from r1400218, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/TestPacketReceiver.java
- copied unchanged from r1400218, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/TestPacketReceiver.java
Removed:
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/packages/
Modified:
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/GenerationStamp.java
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LogsPurgeable.java
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Oct 19 18:49:38 2012
@@ -147,6 +147,9 @@ Trunk (Unreleased)
Block Pool Used, Block Pool Used(%) and Failed Volumes.
(Brahma Reddy Battula via suresh)
+ HDFS-4052. BlockManager#invalidateWork should print log outside the lock.
+ (Jing Zhao via suresh)
+
OPTIMIZATIONS
BUG FIXES
@@ -224,8 +227,6 @@ Trunk (Unreleased)
HDFS-3834. Remove unused static fields NAME, DESCRIPTION and Usage from
Command. (Jing Zhao via suresh)
- HDFS-3678. Edit log files are never being purged from 2NN. (atm)
-
HADOOP-8158. Interrupting hadoop fs -put from the command line
causes a LeaseExpiredException. (daryn via harsh)
@@ -343,6 +344,8 @@ Release 2.0.3-alpha - Unreleased
HDFS-3912. Detect and avoid stale datanodes for writes.
(Jing Zhao via suresh)
+ HDFS-4059. Add number of stale DataNodes to metrics. (Jing Zhao via suresh)
+
IMPROVEMENTS
HDFS-3925. Prettify PipelineAck#toString() for printing to a log
@@ -388,6 +391,20 @@ Release 2.0.3-alpha - Unreleased
HDFS-4036. Remove "throw UnresolvedLinkException" from
FSDirectory.unprotectedAddFile(..). (Jing Zhao via szetszwo)
+ HDFS-2946. HA: Put a cap on the number of completed edits files retained
+ by the NN. (atm)
+
+ HDFS-4029. GenerationStamp should use an AtomicLong. (eli)
+
+ HDFS-4068. DatanodeID and DatanodeInfo member should be private. (eli)
+
+ HDFS-4073. Two minor improvements to FSDirectory. (Jing Zhao via szetszwo)
+
+ HDFS-4074. Remove the unused default constructor from INode. (Brandon Li
+ via szetszwo)
+
+ HDFS-4053. Increase the default block size. (eli)
+
OPTIMIZATIONS
BUG FIXES
@@ -440,6 +457,26 @@ Release 2.0.3-alpha - Unreleased
HDFS-4044. Duplicate ChecksumType definition in HDFS .proto files.
(Binglin Chang via suresh)
+ HDFS-4049. Fix hflush performance regression due to nagling delays
+ (todd)
+
+ HDFS-3678. Edit log files are never being purged from 2NN. (atm)
+
+ HDFS-4058. DirectoryScanner may fail with IOOB if the directory
+ scanning threads return out of volume order. (eli)
+
+ HDFS-3985. Add timeouts to TestMultipleNNDataBlockScanner. (todd via eli)
+
+ HDFS-4061. TestBalancer and TestUnderReplicatedBlocks need timeouts. (eli)
+
+ HDFS-3997. OfflineImageViewer incorrectly passes value of imageVersion when
+ visiting IS_COMPRESSED element. (Mithun Radhakrishnan via atm)
+
+ HDFS-4055. TestAuditLogs is flaky. (Binglin Chang via eli)
+
+ HDFS-4072. On file deletion remove corresponding blocks pending
+ replications. (Jing Zhao via suresh)
+
Release 2.0.2-alpha - 2012-09-07
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Oct 19 18:49:38 2012
@@ -31,7 +31,7 @@ import org.apache.hadoop.fs.CommonConfig
public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_BLOCK_SIZE_KEY = "dfs.blocksize";
- public static final long DFS_BLOCK_SIZE_DEFAULT = 64*1024*1024;
+ public static final long DFS_BLOCK_SIZE_DEFAULT = 128*1024*1024;
public static final String DFS_REPLICATION_KEY = "dfs.replication";
public static final short DFS_REPLICATION_DEFAULT = 3;
public static final String DFS_STREAM_BUFFER_SIZE_KEY = "dfs.stream-buffer-size";
@@ -162,6 +162,8 @@ public class DFSConfigKeys extends Commo
public static final int DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_DEFAULT = 2;
public static final String DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY = "dfs.namenode.num.extra.edits.retained";
public static final int DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_DEFAULT = 1000000; //1M
+ public static final String DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_KEY = "dfs.namenode.max.extra.edits.segments.retained";
+ public static final int DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_DEFAULT = 10000; // 10k
public static final String DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY = "dfs.namenode.min.supported.datanode.version";
public static final String DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_DEFAULT = "3.0.0-SNAPSHOT";
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java Fri Oct 19 18:49:38 2012
@@ -37,12 +37,12 @@ import org.apache.hadoop.classification.
public class DatanodeID implements Comparable<DatanodeID> {
public static final DatanodeID[] EMPTY_ARRAY = {};
- protected String ipAddr; // IP address
- protected String hostName; // hostname
- protected String storageID; // unique per cluster storageID
- protected int xferPort; // data streaming port
- protected int infoPort; // info server port
- protected int ipcPort; // IPC server port
+ private String ipAddr; // IP address
+ private String hostName; // hostname
+ private String storageID; // unique per cluster storageID
+ private int xferPort; // data streaming port
+ private int infoPort; // info server port
+ private int ipcPort; // IPC server port
public DatanodeID(DatanodeID from) {
this(from.getIpAddr(),
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java Fri Oct 19 18:49:38 2012
@@ -37,13 +37,13 @@ import org.apache.hadoop.util.Time;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class DatanodeInfo extends DatanodeID implements Node {
- protected long capacity;
- protected long dfsUsed;
- protected long remaining;
- protected long blockPoolUsed;
- protected long lastUpdate;
- protected int xceiverCount;
- protected String location = NetworkTopology.DEFAULT_RACK;
+ private long capacity;
+ private long dfsUsed;
+ private long remaining;
+ private long blockPoolUsed;
+ private long lastUpdate;
+ private int xceiverCount;
+ private String location = NetworkTopology.DEFAULT_RACK;
// Datanode administrative states
public enum AdminStates {
@@ -81,8 +81,7 @@ public class DatanodeInfo extends Datano
this.lastUpdate = from.getLastUpdate();
this.xceiverCount = from.getXceiverCount();
this.location = from.getNetworkLocation();
- this.adminState = from.adminState;
- this.hostName = from.hostName;
+ this.adminState = from.getAdminState();
}
public DatanodeInfo(DatanodeID nodeID) {
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java Fri Oct 19 18:49:38 2012
@@ -53,14 +53,8 @@ public class PacketReceiver implements C
private final boolean useDirectBuffers;
/**
- * Internal buffer for reading the length prefixes at the start of
- * the packet.
- */
- private final ByteBuffer lengthPrefixBuf = ByteBuffer.allocate(
- PacketHeader.PKT_LENGTHS_LEN);
-
- /**
- * The entirety of the most recently read packet, excepting the
+ * The entirety of the most recently read packet.
+ * The first PKT_LENGTHS_LEN bytes of this buffer are the
* length prefixes.
*/
private ByteBuffer curPacketBuf = null;
@@ -82,6 +76,7 @@ public class PacketReceiver implements C
public PacketReceiver(boolean useDirectBuffers) {
this.useDirectBuffers = useDirectBuffers;
+ reallocPacketBuf(PacketHeader.PKT_LENGTHS_LEN);
}
public PacketHeader getHeader() {
@@ -133,11 +128,12 @@ public class PacketReceiver implements C
// checksums were not requested
// DATA the actual block data
Preconditions.checkState(curHeader == null || !curHeader.isLastPacketInBlock());
-
- lengthPrefixBuf.clear();
- doReadFully(ch, in, lengthPrefixBuf);
- lengthPrefixBuf.flip();
- int payloadLen = lengthPrefixBuf.getInt();
+
+ curPacketBuf.clear();
+ curPacketBuf.limit(PacketHeader.PKT_LENGTHS_LEN);
+ doReadFully(ch, in, curPacketBuf);
+ curPacketBuf.flip();
+ int payloadLen = curPacketBuf.getInt();
if (payloadLen < Ints.BYTES) {
// The "payload length" includes its own length. Therefore it
@@ -146,7 +142,7 @@ public class PacketReceiver implements C
payloadLen);
}
int dataPlusChecksumLen = payloadLen - Ints.BYTES;
- int headerLen = lengthPrefixBuf.getShort();
+ int headerLen = curPacketBuf.getShort();
if (headerLen < 0) {
throw new IOException("Invalid header length " + headerLen);
}
@@ -166,13 +162,17 @@ public class PacketReceiver implements C
// Make sure we have space for the whole packet, and
// read it.
- reallocPacketBuf(dataPlusChecksumLen + headerLen);
+ reallocPacketBuf(PacketHeader.PKT_LENGTHS_LEN +
+ dataPlusChecksumLen + headerLen);
curPacketBuf.clear();
- curPacketBuf.limit(dataPlusChecksumLen + headerLen);
+ curPacketBuf.position(PacketHeader.PKT_LENGTHS_LEN);
+ curPacketBuf.limit(PacketHeader.PKT_LENGTHS_LEN +
+ dataPlusChecksumLen + headerLen);
doReadFully(ch, in, curPacketBuf);
curPacketBuf.flip();
+ curPacketBuf.position(PacketHeader.PKT_LENGTHS_LEN);
- // Extract the header from the front of the buffer.
+ // Extract the header from the front of the buffer (after the length prefixes)
byte[] headerBuf = new byte[headerLen];
curPacketBuf.get(headerBuf);
if (curHeader == null) {
@@ -197,10 +197,6 @@ public class PacketReceiver implements C
public void mirrorPacketTo(DataOutputStream mirrorOut) throws IOException {
Preconditions.checkState(!useDirectBuffers,
"Currently only supported for non-direct buffers");
- assert lengthPrefixBuf.capacity() == PacketHeader.PKT_LENGTHS_LEN;
- mirrorOut.write(lengthPrefixBuf.array(),
- lengthPrefixBuf.arrayOffset(),
- lengthPrefixBuf.capacity());
mirrorOut.write(curPacketBuf.array(),
curPacketBuf.arrayOffset(),
curPacketBuf.remaining());
@@ -223,23 +219,36 @@ public class PacketReceiver implements C
private void reslicePacket(
int headerLen, int checksumsLen, int dataLen) {
+ // Packet structure (refer to doRead() for details):
+ // PLEN HLEN HEADER CHECKSUMS DATA
+ // 32-bit 16-bit <protobuf> <variable length>
+ // |--- lenThroughHeader ----|
+ // |----------- lenThroughChecksums ----|
+ // |------------------- lenThroughData ------|
+ int lenThroughHeader = PacketHeader.PKT_LENGTHS_LEN + headerLen;
+ int lenThroughChecksums = lenThroughHeader + checksumsLen;
+ int lenThroughData = lenThroughChecksums + dataLen;
+
assert dataLen >= 0 : "invalid datalen: " + dataLen;
-
- assert curPacketBuf.position() == headerLen;
- assert checksumsLen + dataLen == curPacketBuf.remaining() :
+ assert curPacketBuf.position() == lenThroughHeader;
+ assert curPacketBuf.limit() == lenThroughData :
"headerLen= " + headerLen + " clen=" + checksumsLen + " dlen=" + dataLen +
" rem=" + curPacketBuf.remaining();
-
- curPacketBuf.position(headerLen);
- curPacketBuf.limit(headerLen + checksumsLen);
+
+ // Slice the checksums.
+ curPacketBuf.position(lenThroughHeader);
+ curPacketBuf.limit(lenThroughChecksums);
curChecksumSlice = curPacketBuf.slice();
- curPacketBuf.position(headerLen + checksumsLen);
- curPacketBuf.limit(headerLen + checksumsLen + dataLen);
+ // Slice the data.
+ curPacketBuf.position(lenThroughChecksums);
+ curPacketBuf.limit(lenThroughData);
curDataSlice = curPacketBuf.slice();
+ // Reset buffer to point to the entirety of the packet (including
+ // length prefixes)
curPacketBuf.position(0);
- curPacketBuf.limit(headerLen + checksumsLen + dataLen);
+ curPacketBuf.limit(lenThroughData);
}
@@ -258,12 +267,21 @@ public class PacketReceiver implements C
// one.
if (curPacketBuf == null ||
curPacketBuf.capacity() < atLeastCapacity) {
- returnPacketBufToPool();
+ ByteBuffer newBuf;
if (useDirectBuffers) {
- curPacketBuf = bufferPool.getBuffer(atLeastCapacity);
+ newBuf = bufferPool.getBuffer(atLeastCapacity);
} else {
- curPacketBuf = ByteBuffer.allocate(atLeastCapacity);
+ newBuf = ByteBuffer.allocate(atLeastCapacity);
}
+ // If reallocing an existing buffer, copy the old packet length
+ // prefixes over
+ if (curPacketBuf != null) {
+ curPacketBuf.flip();
+ newBuf.put(curPacketBuf);
+ }
+
+ returnPacketBufToPool();
+ curPacketBuf = newBuf;
}
}
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Fri Oct 19 18:49:38 2012
@@ -288,7 +288,7 @@ public class BlockManager {
}
private static BlockTokenSecretManager createBlockTokenSecretManager(
- final Configuration conf) throws IOException {
+ final Configuration conf) {
final boolean isEnabled = conf.getBoolean(
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY,
DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT);
@@ -1260,7 +1260,7 @@ public class BlockManager {
// Move the block-replication into a "pending" state.
// The reason we use 'pending' is so we can retry
// replications that fail after an appropriate amount of time.
- pendingReplications.add(block, targets.length);
+ pendingReplications.increment(block, targets.length);
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug(
"BLOCK* block " + block
@@ -1306,8 +1306,11 @@ public class BlockManager {
/**
* Choose target datanodes according to the replication policy.
- * @throws IOException if the number of targets < minimum replication.
- * @see BlockPlacementPolicy#chooseTarget(String, int, DatanodeDescriptor, HashMap, long)
+ *
+ * @throws IOException
+ * if the number of targets < minimum replication.
+ * @see BlockPlacementPolicy#chooseTarget(String, int, DatanodeDescriptor,
+ * List, boolean, HashMap, long)
*/
public DatanodeDescriptor[] chooseTarget(final String src,
final int numOfReplicas, final DatanodeDescriptor client,
@@ -1811,7 +1814,7 @@ assert storedBlock.findDatanode(dn) < 0
/**
* Queue the given reported block for later processing in the
- * standby node. {@see PendingDataNodeMessages}.
+ * standby node. @see PendingDataNodeMessages.
* @param reason a textual reason to report in the debug logs
*/
private void queueReportedBlock(DatanodeDescriptor dn, Block block,
@@ -1976,14 +1979,15 @@ assert storedBlock.findDatanode(dn) < 0
}
/**
- * Faster version of {@link addStoredBlock()}, intended for use with
- * initial block report at startup. If not in startup safe mode, will
- * call standard addStoredBlock().
- * Assumes this method is called "immediately" so there is no need to
- * refresh the storedBlock from blocksMap.
- * Doesn't handle underReplication/overReplication, or worry about
+ * Faster version of
+ * {@link #addStoredBlock(BlockInfo, DatanodeDescriptor, DatanodeDescriptor, boolean)}
+ * , intended for use with initial block report at startup. If not in startup
+ * safe mode, will call standard addStoredBlock(). Assumes this method is
+ * called "immediately" so there is no need to refresh the storedBlock from
+ * blocksMap. Doesn't handle underReplication/overReplication, or worry about
* pendingReplications or corruptReplicas, because it's in startup safe mode.
* Doesn't log every block, because there are typically millions of them.
+ *
* @throws IOException
*/
private void addStoredBlockImmediate(BlockInfo storedBlock,
@@ -2505,7 +2509,7 @@ assert storedBlock.findDatanode(dn) < 0
//
// Modify the blocks->datanode map and node's map.
//
- pendingReplications.remove(block);
+ pendingReplications.decrement(block);
processAndHandleReportedBlock(node, block, ReplicaState.FINALIZED,
delHintNode);
}
@@ -2641,7 +2645,7 @@ assert storedBlock.findDatanode(dn) < 0
}
/**
- * Simpler, faster form of {@link countNodes()} that only returns the number
+ * Simpler, faster form of {@link #countNodes(Block)} that only returns the number
* of live nodes. If in startup safemode (or its 30-sec extension period),
* then it gains speed by ignoring issues of excess replicas or nodes
* that are decommissioned or in process of becoming decommissioned.
@@ -2790,6 +2794,8 @@ assert storedBlock.findDatanode(dn) < 0
addToInvalidates(block);
corruptReplicas.removeFromCorruptReplicasMap(block);
blocksMap.removeBlock(block);
+ // Remove the block from pendingReplications
+ pendingReplications.remove(block);
if (postponedMisreplicatedBlocks.remove(block)) {
postponedMisreplicatedBlocksCount--;
}
@@ -2856,6 +2862,9 @@ assert storedBlock.findDatanode(dn) < 0
* @return number of blocks scheduled for removal during this iteration.
*/
private int invalidateWorkForOneNode(String nodeId) {
+ final List<Block> toInvalidate;
+ final DatanodeDescriptor dn;
+
namesystem.writeLock();
try {
// blocks should not be replicated or removed if safe mode is on
@@ -2865,10 +2874,23 @@ assert storedBlock.findDatanode(dn) < 0
}
// get blocks to invalidate for the nodeId
assert nodeId != null;
- return invalidateBlocks.invalidateWork(nodeId);
+ dn = datanodeManager.getDatanode(nodeId);
+ if (dn == null) {
+ invalidateBlocks.remove(nodeId);
+ return 0;
+ }
+ toInvalidate = invalidateBlocks.invalidateWork(nodeId, dn);
+ if (toInvalidate == null) {
+ return 0;
+ }
} finally {
namesystem.writeUnlock();
}
+ if (NameNode.stateChangeLog.isInfoEnabled()) {
+ NameNode.stateChangeLog.info("BLOCK* " + getClass().getSimpleName()
+ + ": ask " + dn + " to delete " + toInvalidate);
+ }
+ return toInvalidate.size();
}
boolean blockHasEnoughRacks(Block b) {
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java Fri Oct 19 18:49:38 2012
@@ -276,11 +276,11 @@ public class DatanodeDescriptor extends
}
public void resetBlocks() {
- this.capacity = 0;
- this.remaining = 0;
- this.blockPoolUsed = 0;
- this.dfsUsed = 0;
- this.xceiverCount = 0;
+ setCapacity(0);
+ setRemaining(0);
+ setBlockPoolUsed(0);
+ setDfsUsed(0);
+ setXceiverCount(0);
this.blockList = null;
this.invalidateBlocks.clear();
this.volumeFailures = 0;
@@ -303,15 +303,15 @@ public class DatanodeDescriptor extends
*/
public void updateHeartbeat(long capacity, long dfsUsed, long remaining,
long blockPoolUsed, int xceiverCount, int volFailures) {
- this.capacity = capacity;
- this.dfsUsed = dfsUsed;
- this.remaining = remaining;
- this.blockPoolUsed = blockPoolUsed;
- this.lastUpdate = Time.now();
- this.xceiverCount = xceiverCount;
+ setCapacity(capacity);
+ setRemaining(remaining);
+ setBlockPoolUsed(blockPoolUsed);
+ setDfsUsed(dfsUsed);
+ setXceiverCount(xceiverCount);
+ setLastUpdate(Time.now());
this.volumeFailures = volFailures;
this.heartbeatedSinceFailover = true;
- rollBlocksScheduled(lastUpdate);
+ rollBlocksScheduled(getLastUpdate());
}
/**
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Fri Oct 19 18:49:38 2012
@@ -567,7 +567,7 @@ public class DatanodeManager {
/**
* Decommission the node if it is in exclude list.
*/
- private void checkDecommissioning(DatanodeDescriptor nodeReg, String ipAddr) {
+ private void checkDecommissioning(DatanodeDescriptor nodeReg) {
// If the registered node is in exclude list, then decommission it
if (inExcludedHostsList(nodeReg)) {
startDecommission(nodeReg);
@@ -713,7 +713,7 @@ public class DatanodeManager {
// also treat the registration message as a heartbeat
heartbeatManager.register(nodeS);
- checkDecommissioning(nodeS, dnAddress);
+ checkDecommissioning(nodeS);
return;
}
@@ -733,7 +733,7 @@ public class DatanodeManager {
= new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK);
resolveNetworkLocation(nodeDescr);
addDatanode(nodeDescr);
- checkDecommissioning(nodeDescr, dnAddress);
+ checkDecommissioning(nodeDescr);
// also treat the registration message as a heartbeat
// no need to update its timestamp
@@ -885,7 +885,7 @@ public class DatanodeManager {
* @return Return the current number of stale DataNodes (detected by
* HeartbeatManager).
*/
- int getNumStaleNodes() {
+ public int getNumStaleNodes() {
return this.numStaleNodes;
}
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java Fri Oct 19 18:49:38 2012
@@ -134,26 +134,7 @@ class InvalidateBlocks {
return new ArrayList<String>(node2blocks.keySet());
}
- /** Invalidate work for the storage. */
- int invalidateWork(final String storageId) {
- final DatanodeDescriptor dn = datanodeManager.getDatanode(storageId);
- if (dn == null) {
- remove(storageId);
- return 0;
- }
- final List<Block> toInvalidate = invalidateWork(storageId, dn);
- if (toInvalidate == null) {
- return 0;
- }
-
- if (NameNode.stateChangeLog.isInfoEnabled()) {
- NameNode.stateChangeLog.info("BLOCK* " + getClass().getSimpleName()
- + ": ask " + dn + " to delete " + toInvalidate);
- }
- return toInvalidate.size();
- }
-
- private synchronized List<Block> invalidateWork(
+ synchronized List<Block> invalidateWork(
final String storageId, final DatanodeDescriptor dn) {
final LightWeightHashSet<Block> set = node2blocks.get(storageId);
if (set == null) {
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java Fri Oct 19 18:49:38 2012
@@ -72,7 +72,7 @@ class PendingReplicationBlocks {
/**
* Add a block to the list of pending Replications
*/
- void add(Block block, int numReplicas) {
+ void increment(Block block, int numReplicas) {
synchronized (pendingReplications) {
PendingBlockInfo found = pendingReplications.get(block);
if (found == null) {
@@ -89,7 +89,7 @@ class PendingReplicationBlocks {
* Decrement the number of pending replication requests
* for this block.
*/
- void remove(Block block) {
+ void decrement(Block block) {
synchronized (pendingReplications) {
PendingBlockInfo found = pendingReplications.get(block);
if (found != null) {
@@ -104,6 +104,16 @@ class PendingReplicationBlocks {
}
}
+ /**
+ * Remove the record about the given block from pendingReplications.
+ * @param block The given block whose pending replication requests need to be
+ * removed
+ */
+ void remove(Block block) {
+ synchronized (pendingReplications) {
+ pendingReplications.remove(block);
+ }
+ }
public void clear() {
synchronized (pendingReplications) {
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/GenerationStamp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/GenerationStamp.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/GenerationStamp.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/GenerationStamp.java Fri Oct 19 18:49:38 2012
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.common;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.hadoop.classification.InterfaceAudience;
/****************************************************************
@@ -35,7 +37,7 @@ public class GenerationStamp implements
*/
public static final long GRANDFATHER_GENERATION_STAMP = 0;
- private volatile long genstamp;
+ private AtomicLong genstamp = new AtomicLong();
/**
* Create a new instance, initialized to FIRST_VALID_STAMP.
@@ -48,35 +50,36 @@ public class GenerationStamp implements
* Create a new instance, initialized to the specified value.
*/
GenerationStamp(long stamp) {
- this.genstamp = stamp;
+ genstamp.set(stamp);
}
/**
* Returns the current generation stamp
*/
public long getStamp() {
- return this.genstamp;
+ return genstamp.get();
}
/**
* Sets the current generation stamp
*/
public void setStamp(long stamp) {
- this.genstamp = stamp;
+ genstamp.set(stamp);
}
/**
* First increments the counter and then returns the stamp
*/
- public synchronized long nextStamp() {
- this.genstamp++;
- return this.genstamp;
+ public long nextStamp() {
+ return genstamp.incrementAndGet();
}
@Override // Comparable
public int compareTo(GenerationStamp that) {
- return this.genstamp < that.genstamp ? -1 :
- this.genstamp > that.genstamp ? 1 : 0;
+ long stamp1 = this.genstamp.get();
+ long stamp2 = that.genstamp.get();
+ return stamp1 < stamp2 ? -1 :
+ stamp1 > stamp2 ? 1 : 0;
}
@Override // Object
@@ -89,6 +92,7 @@ public class GenerationStamp implements
@Override // Object
public int hashCode() {
- return (int) (genstamp^(genstamp>>>32));
+ long stamp = genstamp.get();
+ return (int) (stamp^(stamp>>>32));
}
}
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java Fri Oct 19 18:49:38 2012
@@ -431,16 +431,16 @@ public class DirectoryScanner implements
private Map<String, ScanInfo[]> getDiskReport() {
// First get list of data directories
final List<? extends FsVolumeSpi> volumes = dataset.getVolumes();
- ArrayList<ScanInfoPerBlockPool> dirReports =
- new ArrayList<ScanInfoPerBlockPool>(volumes.size());
-
+
+ // Use an array since the threads may return out of order and
+ // compilersInProgress#keySet may return out of order as well.
+ ScanInfoPerBlockPool[] dirReports = new ScanInfoPerBlockPool[volumes.size()];
+
Map<Integer, Future<ScanInfoPerBlockPool>> compilersInProgress =
new HashMap<Integer, Future<ScanInfoPerBlockPool>>();
+
for (int i = 0; i < volumes.size(); i++) {
- if (!isValid(dataset, volumes.get(i))) {
- // volume is invalid
- dirReports.add(i, null);
- } else {
+ if (isValid(dataset, volumes.get(i))) {
ReportCompiler reportCompiler =
new ReportCompiler(volumes.get(i));
Future<ScanInfoPerBlockPool> result =
@@ -452,7 +452,7 @@ public class DirectoryScanner implements
for (Entry<Integer, Future<ScanInfoPerBlockPool>> report :
compilersInProgress.entrySet()) {
try {
- dirReports.add(report.getKey(), report.getValue().get());
+ dirReports[report.getKey()] = report.getValue().get();
} catch (Exception ex) {
LOG.error("Error compiling report", ex);
// Propagate ex to DataBlockScanner to deal with
@@ -465,7 +465,7 @@ public class DirectoryScanner implements
for (int i = 0; i < volumes.size(); i++) {
if (isValid(dataset, volumes.get(i))) {
// volume is still valid
- list.addAll(dirReports.get(i));
+ list.addAll(dirReports[i]);
}
}
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Fri Oct 19 18:49:38 2012
@@ -290,13 +290,18 @@ public class FSDirectory implements Clos
try {
newNode = addNode(path, newNode, UNKNOWN_DISK_SPACE);
} catch (IOException e) {
+ if(NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug(
+ "DIR* FSDirectory.unprotectedAddFile: exception when add " + path
+ + " to the file system", e);
+ }
return null;
}
return newNode;
}
INodeDirectory addToParent(byte[] src, INodeDirectory parentINode,
- INode newNode, boolean propagateModTime) throws UnresolvedLinkException {
+ INode newNode, boolean propagateModTime) {
// NOTE: This does not update space counts for parents
INodeDirectory newParent = null;
writeLock();
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Fri Oct 19 18:49:38 2012
@@ -1174,6 +1174,11 @@ public class FSEditLog implements LogsPu
// TODO: are we sure this is OK?
}
}
+
+ public void selectInputStreams(Collection<EditLogInputStream> streams,
+ long fromTxId, boolean inProgressOk) {
+ journalSet.selectInputStreams(streams, fromTxId, inProgressOk);
+ }
public Collection<EditLogInputStream> selectInputStreams(
long fromTxId, long toAtLeastTxId) throws IOException {
@@ -1191,7 +1196,7 @@ public class FSEditLog implements LogsPu
long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery,
boolean inProgressOk) throws IOException {
List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
- journalSet.selectInputStreams(streams, fromTxId, inProgressOk);
+ selectInputStreams(streams, fromTxId, inProgressOk);
try {
checkForGaps(streams, fromTxId, toAtLeastTxId, inProgressOk);
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Oct 19 18:49:38 2012
@@ -4677,6 +4677,13 @@ public class FSNamesystem implements Nam
public int getNumDeadDataNodes() {
return getBlockManager().getDatanodeManager().getNumDeadDataNodes();
}
+
+ @Override // FSNamesystemMBean
+ @Metric({"StaleDataNodes",
+ "Number of datanodes marked stale due to delayed heartbeat"})
+ public int getNumStaleDataNodes() {
+ return getBlockManager().getDatanodeManager().getNumStaleNodes();
+ }
/**
* Sets the generation stamp for this filesystem
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java Fri Oct 19 18:49:38 2012
@@ -247,6 +247,11 @@ public class FileJournalManager implemen
LOG.debug(this + ": selecting input streams starting at " + fromTxId +
(inProgressOk ? " (inProgress ok) " : " (excluding inProgress) ") +
"from among " + elfs.size() + " candidate file(s)");
+ addStreamsToCollectionFromFiles(elfs, streams, fromTxId, inProgressOk);
+ }
+
+ static void addStreamsToCollectionFromFiles(Collection<EditLogFile> elfs,
+ Collection<EditLogInputStream> streams, long fromTxId, boolean inProgressOk) {
for (EditLogFile elf : elfs) {
if (elf.isInProgress()) {
if (!inProgressOk) {
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java Fri Oct 19 18:49:38 2012
@@ -96,13 +96,6 @@ abstract class INode implements Comparab
}
}
- protected INode() {
- name = null;
- parent = null;
- modificationTime = 0;
- accessTime = 0;
- }
-
INode(PermissionStatus permissions, long mTime, long atime) {
this.name = null;
this.parent = null;
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java Fri Oct 19 18:49:38 2012
@@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.na
import java.io.Closeable;
import java.io.IOException;
-import java.util.Collection;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -56,21 +55,6 @@ public interface JournalManager extends
*/
void finalizeLogSegment(long firstTxId, long lastTxId) throws IOException;
- /**
- * Get a list of edit log input streams. The list will start with the
- * stream that contains fromTxnId, and continue until the end of the journal
- * being managed.
- *
- * @param fromTxnId the first transaction id we want to read
- * @param inProgressOk whether or not in-progress streams should be returned
- *
- * @return a list of streams
- * @throws IOException if the underlying storage has an error or is otherwise
- * inaccessible
- */
- void selectInputStreams(Collection<EditLogInputStream> streams,
- long fromTxnId, boolean inProgressOk) throws IOException;
-
/**
* Set the amount of memory that this stream should use to buffer edits
*/
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LogsPurgeable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LogsPurgeable.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LogsPurgeable.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LogsPurgeable.java Fri Oct 19 18:49:38 2012
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
+import java.util.Collection;
/**
* Interface used to abstract over classes which manage edit logs that may need
@@ -33,5 +34,20 @@ interface LogsPurgeable {
* @throws IOException in the event of error
*/
public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException;
-
+
+ /**
+ * Get a list of edit log input streams. The list will start with the
+ * stream that contains fromTxnId, and continue until the end of the journal
+ * being managed.
+ *
+ * @param streams the collection to which the selected streams are added
+ * @param fromTxId the first transaction id we want to read
+ * @param inProgressOk whether or not in-progress streams should be returned
+ *
+ * @throws IOException if the underlying storage has an error or is otherwise
+ * inaccessible
+ */
+ void selectInputStreams(Collection<EditLogInputStream> streams,
+ long fromTxId, boolean inProgressOk) throws IOException;
+
}
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java Fri Oct 19 18:49:38 2012
@@ -19,7 +19,9 @@ package org.apache.hadoop.hdfs.server.na
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;
@@ -32,6 +34,7 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.util.MD5FileUtils;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -48,6 +51,7 @@ public class NNStorageRetentionManager {
private final int numCheckpointsToRetain;
private final long numExtraEditsToRetain;
+ private final int maxExtraEditsSegmentsToRetain;
private static final Log LOG = LogFactory.getLog(
NNStorageRetentionManager.class);
private final NNStorage storage;
@@ -65,6 +69,9 @@ public class NNStorageRetentionManager {
this.numExtraEditsToRetain = conf.getLong(
DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY,
DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_DEFAULT);
+ this.maxExtraEditsSegmentsToRetain = conf.getInt(
+ DFSConfigKeys.DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_KEY,
+ DFSConfigKeys.DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_DEFAULT);
Preconditions.checkArgument(numCheckpointsToRetain > 0,
"Must retain at least one checkpoint");
Preconditions.checkArgument(numExtraEditsToRetain >= 0,
@@ -94,7 +101,39 @@ public class NNStorageRetentionManager {
// provide a "cushion" of older txns that we keep, which is
// handy for HA, where a remote node may not have as many
// new images.
- long purgeLogsFrom = Math.max(0, minImageTxId + 1 - numExtraEditsToRetain);
+ //
+ // First, determine the target number of extra transactions to retain based
+ // on the configured amount.
+ long minimumRequiredTxId = minImageTxId + 1;
+ long purgeLogsFrom = Math.max(0, minimumRequiredTxId - numExtraEditsToRetain);
+
+ ArrayList<EditLogInputStream> editLogs = new ArrayList<EditLogInputStream>();
+ purgeableLogs.selectInputStreams(editLogs, purgeLogsFrom, false);
+ Collections.sort(editLogs, new Comparator<EditLogInputStream>() {
+ @Override
+ public int compare(EditLogInputStream a, EditLogInputStream b) {
+ return ComparisonChain.start()
+ .compare(a.getFirstTxId(), b.getFirstTxId())
+ .compare(a.getLastTxId(), b.getLastTxId())
+ .result();
+ }
+ });
+
+ // Next, adjust the number of transactions to retain if doing so would mean
+ // keeping too many segments around.
+ while (editLogs.size() > maxExtraEditsSegmentsToRetain) {
+ purgeLogsFrom = editLogs.get(0).getFirstTxId();
+ editLogs.remove(0);
+ }
+
+ // Finally, ensure that we're not trying to purge any transactions that we
+ // actually need.
+ if (purgeLogsFrom > minimumRequiredTxId) {
+ throw new AssertionError("Should not purge more edits than required to "
+ + "restore: " + purgeLogsFrom + " should be <= "
+ + minimumRequiredTxId);
+ }
+
purgeableLogs.purgeLogsOlderThan(purgeLogsFrom);
}
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java Fri Oct 19 18:49:38 2012
@@ -751,6 +751,24 @@ public class SecondaryNameNode implement
}
}
}
+
+ @Override
+ public void selectInputStreams(Collection<EditLogInputStream> streams,
+ long fromTxId, boolean inProgressOk) {
+ Iterator<StorageDirectory> iter = storage.dirIterator();
+ while (iter.hasNext()) {
+ StorageDirectory dir = iter.next();
+ List<EditLogFile> editFiles;
+ try {
+ editFiles = FileJournalManager.matchEditLogs(
+ dir.getCurrentDir());
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+ FileJournalManager.addStreamsToCollectionFromFiles(editFiles, streams,
+ fromTxId, inProgressOk);
+ }
+ }
}
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java Fri Oct 19 18:49:38 2012
@@ -112,4 +112,10 @@ public interface FSNamesystemMBean {
* @return number of dead data nodes
*/
public int getNumDeadDataNodes();
+
+ /**
+ * Number of stale data nodes
+ * @return number of stale data nodes
+ */
+ public int getNumStaleDataNodes();
}
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java Fri Oct 19 18:49:38 2012
@@ -82,9 +82,9 @@ public class DatanodeRegistration extend
public String toString() {
return getClass().getSimpleName()
+ "(" + getIpAddr()
- + ", storageID=" + storageID
- + ", infoPort=" + infoPort
- + ", ipcPort=" + ipcPort
+ + ", storageID=" + getStorageID()
+ + ", infoPort=" + getInfoPort()
+ + ", ipcPort=" + getIpcPort()
+ ", storageInfo=" + storageInfo
+ ")";
}
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java Fri Oct 19 18:49:38 2012
@@ -165,7 +165,7 @@ class ImageLoaderCurrent implements Imag
if (LayoutVersion.supports(Feature.FSIMAGE_COMPRESSION, imageVersion)) {
boolean isCompressed = in.readBoolean();
- v.visit(ImageElement.IS_COMPRESSED, imageVersion);
+ v.visit(ImageElement.IS_COMPRESSED, String.valueOf(isCompressed));
if (isCompressed) {
String codecClassName = Text.readString(in);
v.visit(ImageElement.COMPRESS_CODEC, codecClassName);
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Fri Oct 19 18:49:38 2012
@@ -358,7 +358,7 @@
<property>
<name>dfs.blocksize</name>
- <value>67108864</value>
+ <value>134217728</value>
<description>
The default block size for new files, in bytes.
You can use the following suffix (case insensitive):
@@ -660,6 +660,20 @@
edits in order to start again.
Typically each edit is on the order of a few hundred bytes, so the default
of 1 million edits should be on the order of hundreds of MBs or low GBs.
+
+ NOTE: Fewer extra edits may be retained than the value specified for this setting
+ if doing so would mean that more segments would be retained than the number
+ configured by dfs.namenode.max.extra.edits.segments.retained.
+ </description>
+</property>
+
+<property>
+ <name>dfs.namenode.max.extra.edits.segments.retained</name>
+ <value>10000</value>
+ <description>The maximum number of extra edit log segments which should be retained
+ beyond what is minimally necessary for a NN restart. When used in conjunction with
+ dfs.namenode.num.extra.edits.retained, this configuration property serves to cap
+ the number of extra edits files to a reasonable value.
</description>
</property>
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java Fri Oct 19 18:49:38 2012
@@ -20,45 +20,40 @@ package org.apache.hadoop.hdfs;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
-import org.apache.log4j.Level;
+import org.apache.hadoop.metrics2.util.Quantile;
+import org.apache.hadoop.metrics2.util.SampleQuantiles;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
+import com.google.common.base.Stopwatch;
+
/**
* This class tests hflushing concurrently from many threads.
*/
public class TestMultiThreadedHflush {
static final int blockSize = 1024*1024;
- static final int numBlocks = 10;
- static final int fileSize = numBlocks * blockSize + 1;
private static final int NUM_THREADS = 10;
private static final int WRITE_SIZE = 517;
private static final int NUM_WRITES_PER_THREAD = 1000;
private byte[] toWrite = null;
-
- {
- ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
- ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
- ((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL);
- ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
- ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
- ((Log4JLogger)InterDatanodeProtocol.LOG).getLogger().setLevel(Level.ALL);
- }
+
+ private final SampleQuantiles quantiles = new SampleQuantiles(
+ new Quantile[] {
+ new Quantile(0.50, 0.050),
+ new Quantile(0.75, 0.025), new Quantile(0.90, 0.010),
+ new Quantile(0.95, 0.005), new Quantile(0.99, 0.001) });
/*
* creates a file but does not close it
@@ -104,8 +99,11 @@ public class TestMultiThreadedHflush {
}
private void doAWrite() throws IOException {
+ Stopwatch sw = new Stopwatch().start();
stm.write(toWrite);
stm.hflush();
+ long micros = sw.elapsedTime(TimeUnit.MICROSECONDS);
+ quantiles.insert(micros);
}
}
@@ -115,14 +113,28 @@ public class TestMultiThreadedHflush {
* They all finish before the file is closed.
*/
@Test
- public void testMultipleHflushers() throws Exception {
+ public void testMultipleHflushersRepl1() throws Exception {
+ doTestMultipleHflushers(1);
+ }
+
+ @Test
+ public void testMultipleHflushersRepl3() throws Exception {
+ doTestMultipleHflushers(3);
+ }
+
+ private void doTestMultipleHflushers(int repl) throws Exception {
Configuration conf = new Configuration();
- MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(repl)
+ .build();
FileSystem fs = cluster.getFileSystem();
Path p = new Path("/multiple-hflushers.dat");
try {
- doMultithreadedWrites(conf, p, NUM_THREADS, WRITE_SIZE, NUM_WRITES_PER_THREAD);
+ doMultithreadedWrites(conf, p, NUM_THREADS, WRITE_SIZE,
+ NUM_WRITES_PER_THREAD, repl);
+ System.out.println("Latency quantiles (in microseconds):\n" +
+ quantiles);
} finally {
fs.close();
cluster.shutdown();
@@ -200,13 +212,13 @@ public class TestMultiThreadedHflush {
}
public void doMultithreadedWrites(
- Configuration conf, Path p, int numThreads, int bufferSize, int numWrites)
- throws Exception {
+ Configuration conf, Path p, int numThreads, int bufferSize, int numWrites,
+ int replication) throws Exception {
initBuffer(bufferSize);
// create a new file.
FileSystem fs = p.getFileSystem(conf);
- FSDataOutputStream stm = createFile(fs, p, 1);
+ FSDataOutputStream stm = createFile(fs, p, replication);
System.out.println("Created file simpleFlush.dat");
// There have been a couple issues with flushing empty buffers, so do
@@ -240,20 +252,41 @@ public class TestMultiThreadedHflush {
}
public static void main(String args[]) throws Exception {
- if (args.length != 1) {
- System.err.println(
- "usage: " + TestMultiThreadedHflush.class.getSimpleName() +
- " <path to test file> ");
- System.exit(1);
+ System.exit(ToolRunner.run(new CLIBenchmark(), args));
+ }
+
+ private static class CLIBenchmark extends Configured implements Tool {
+ public int run(String args[]) throws Exception {
+ if (args.length != 1) {
+ System.err.println(
+ "usage: " + TestMultiThreadedHflush.class.getSimpleName() +
+ " <path to test file> ");
+ System.err.println(
+ "Configurations settable by -D options:\n" +
+ " num.threads [default 10] - how many threads to run\n" +
+ " write.size [default 511] - bytes per write\n" +
+ " num.writes [default 50000] - how many writes to perform");
+ System.exit(1);
+ }
+ TestMultiThreadedHflush test = new TestMultiThreadedHflush();
+ Configuration conf = getConf();
+ Path p = new Path(args[0]);
+
+ int numThreads = conf.getInt("num.threads", 10);
+ int writeSize = conf.getInt("write.size", 511);
+ int numWrites = conf.getInt("num.writes", 50000);
+ int replication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
+ DFSConfigKeys.DFS_REPLICATION_DEFAULT);
+
+ Stopwatch sw = new Stopwatch().start();
+ test.doMultithreadedWrites(conf, p, numThreads, writeSize, numWrites,
+ replication);
+ sw.stop();
+
+ System.out.println("Finished in " + sw.elapsedMillis() + "ms");
+ System.out.println("Latency quantiles (in microseconds):\n" +
+ test.quantiles);
+ return 0;
}
- TestMultiThreadedHflush test = new TestMultiThreadedHflush();
- Configuration conf = new Configuration();
- Path p = new Path(args[0]);
- long st = System.nanoTime();
- test.doMultithreadedWrites(conf, p, 10, 511, 50000);
- long et = System.nanoTime();
-
- System.out.println("Finished in " + ((et - st) / 1000000) + "ms");
}
-
}
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java Fri Oct 19 18:49:38 2012
@@ -372,7 +372,7 @@ public class TestBalancer {
* Test parse method in Balancer#Cli class with threshold value out of
* boundaries.
*/
- @Test
+ @Test(timeout=100000)
public void testBalancerCliParseWithThresholdOutOfBoundaries() {
String parameters[] = new String[] { "-threshold", "0" };
String reason = "IllegalArgumentException is expected when threshold value"
@@ -394,7 +394,7 @@ public class TestBalancer {
/** Test a cluster with even distribution,
* then a new empty node is added to the cluster*/
- @Test
+ @Test(timeout=100000)
public void testBalancer0() throws Exception {
testBalancer0Internal(new HdfsConfiguration());
}
@@ -406,7 +406,7 @@ public class TestBalancer {
}
/** Test unevenly distributed cluster */
- @Test
+ @Test(timeout=100000)
public void testBalancer1() throws Exception {
testBalancer1Internal(new HdfsConfiguration());
}
@@ -419,7 +419,7 @@ public class TestBalancer {
new String[] {RACK0, RACK1});
}
- @Test
+ @Test(timeout=100000)
public void testBalancer2() throws Exception {
testBalancer2Internal(new HdfsConfiguration());
}
@@ -467,8 +467,7 @@ public class TestBalancer {
/**
* Test parse method in Balancer#Cli class with wrong number of params
*/
-
- @Test
+ @Test(timeout=100000)
public void testBalancerCliParseWithWrongParams() {
String parameters[] = new String[] { "-threshold" };
String reason =
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java Fri Oct 19 18:49:38 2012
@@ -191,4 +191,12 @@ public class BlockManagerTestUtil {
"Must use default policy, got %s", bpp.getClass());
((BlockPlacementPolicyDefault)bpp).setPreferLocalNode(prefer);
}
+
+ /**
+ * Call heartbeat check function of HeartbeatManager
+ * @param bm the BlockManager to manipulate
+ */
+ public static void checkHeartbeat(BlockManager bm) {
+ bm.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
+ }
}
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java Fri Oct 19 18:49:38 2012
@@ -20,14 +20,30 @@ package org.apache.hadoop.hdfs.server.bl
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.junit.Test;
/**
- * This class tests the internals of PendingReplicationBlocks.java
+ * This class tests the internals of PendingReplicationBlocks.java,
+ * as well as how PendingReplicationBlocks acts in BlockManager
*/
public class TestPendingReplication {
final static int TIMEOUT = 3; // 3 seconds
+ private static final int DFS_REPLICATION_INTERVAL = 1;
+ // Number of datanodes in the cluster
+ private static final int DATANODE_COUNT = 5;
@Test
public void testPendingReplication() {
@@ -40,7 +56,7 @@ public class TestPendingReplication {
//
for (int i = 0; i < 10; i++) {
Block block = new Block(i, i, 0);
- pendingReplications.add(block, i);
+ pendingReplications.increment(block, i);
}
assertEquals("Size of pendingReplications ",
10, pendingReplications.size());
@@ -50,15 +66,15 @@ public class TestPendingReplication {
// remove one item and reinsert it
//
Block blk = new Block(8, 8, 0);
- pendingReplications.remove(blk); // removes one replica
+ pendingReplications.decrement(blk); // removes one replica
assertEquals("pendingReplications.getNumReplicas ",
7, pendingReplications.getNumReplicas(blk));
for (int i = 0; i < 7; i++) {
- pendingReplications.remove(blk); // removes all replicas
+ pendingReplications.decrement(blk); // removes all replicas
}
assertTrue(pendingReplications.size() == 9);
- pendingReplications.add(blk, 8);
+ pendingReplications.increment(blk, 8);
assertTrue(pendingReplications.size() == 10);
//
@@ -86,7 +102,7 @@ public class TestPendingReplication {
for (int i = 10; i < 15; i++) {
Block block = new Block(i, i, 0);
- pendingReplications.add(block, i);
+ pendingReplications.increment(block, i);
}
assertTrue(pendingReplications.size() == 15);
@@ -116,4 +132,70 @@ public class TestPendingReplication {
}
pendingReplications.stop();
}
+
+ /**
+ * Test if BlockManager can correctly remove corresponding pending records
+ * when a file is deleted
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testPendingAndInvalidate() throws Exception {
+ final Configuration CONF = new HdfsConfiguration();
+ CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
+ CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
+ DFS_REPLICATION_INTERVAL);
+ CONF.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
+ DFS_REPLICATION_INTERVAL);
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(
+ DATANODE_COUNT).build();
+ cluster.waitActive();
+
+ FSNamesystem namesystem = cluster.getNamesystem();
+ BlockManager bm = namesystem.getBlockManager();
+ DistributedFileSystem fs = cluster.getFileSystem();
+ try {
+ // 1. create a file
+ Path filePath = new Path("/tmp.txt");
+ DFSTestUtil.createFile(fs, filePath, 1024, (short) 3, 0L);
+
+ // 2. disable the heartbeats
+ for (DataNode dn : cluster.getDataNodes()) {
+ DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
+ }
+
+ // 3. mark two replicas of the block as corrupt
+ LocatedBlock block = NameNodeAdapter.getBlockLocations(
+ cluster.getNameNode(), filePath.toString(), 0, 1).get(0);
+ cluster.getNamesystem().writeLock();
+ try {
+ bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0],
+ "TEST");
+ bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[1],
+ "TEST");
+ } finally {
+ cluster.getNamesystem().writeUnlock();
+ }
+ BlockManagerTestUtil.computeAllPendingWork(bm);
+ BlockManagerTestUtil.updateState(bm);
+ assertEquals(bm.getPendingReplicationBlocksCount(), 1L);
+ assertEquals(bm.pendingReplications.getNumReplicas(block.getBlock()
+ .getLocalBlock()), 2);
+
+ // 4. delete the file
+ fs.delete(filePath, true);
+ // retry at most 10 times, each time sleep for 1s. Note that 10s is much
+ // less than the default pending record timeout (5~10min)
+ int retries = 10;
+ long pendingNum = bm.getPendingReplicationBlocksCount();
+ while (pendingNum != 0 && retries-- > 0) {
+ Thread.sleep(1000); // let NN do the deletion
+ BlockManagerTestUtil.updateState(bm);
+ pendingNum = bm.getPendingReplicationBlocksCount();
+ }
+ assertEquals(pendingNum, 0L);
+ } finally {
+ cluster.shutdown();
+ }
+ }
}
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java Fri Oct 19 18:49:38 2012
@@ -30,7 +30,7 @@ import org.apache.hadoop.hdfs.protocol.E
import org.junit.Test;
public class TestUnderReplicatedBlocks {
- @Test
+ @Test(timeout=300000) // 5 min timeout
public void testSetrepIncWithUnderReplicatedBlocks() throws Exception {
Configuration conf = new HdfsConfiguration();
final short REPLICATION_FACTOR = 2;
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java Fri Oct 19 18:49:38 2012
@@ -74,7 +74,7 @@ public class TestMultipleNNDataBlockScan
}
}
- @Test
+ @Test(timeout=120000)
public void testDataBlockScanner() throws IOException, InterruptedException {
setUp();
try {
@@ -97,7 +97,7 @@ public class TestMultipleNNDataBlockScan
}
}
- @Test
+ @Test(timeout=120000)
public void testBlockScannerAfterRefresh() throws IOException,
InterruptedException {
setUp();
@@ -149,7 +149,7 @@ public class TestMultipleNNDataBlockScan
}
}
- @Test
+ @Test(timeout=120000)
public void testBlockScannerAfterRestart() throws IOException,
InterruptedException {
setUp();
@@ -176,7 +176,7 @@ public class TestMultipleNNDataBlockScan
}
}
- @Test
+ @Test(timeout=120000)
public void test2NNBlockRescanInterval() throws IOException {
((Log4JLogger)BlockPoolSliceScanner.LOG).getLogger().setLevel(Level.ALL);
Configuration conf = new HdfsConfiguration();
@@ -206,7 +206,7 @@ public class TestMultipleNNDataBlockScan
*
* @throws Exception
*/
- @Test
+ @Test(timeout=120000)
public void testBlockRescanInterval() throws IOException {
((Log4JLogger)BlockPoolSliceScanner.LOG).getLogger().setLevel(Level.ALL);
Configuration conf = new HdfsConfiguration();
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java Fri Oct 19 18:49:38 2012
@@ -117,7 +117,7 @@ public class TestAuditLogs {
int val = istream.read();
istream.close();
verifyAuditLogs(true);
- assertTrue("failed to read from file", val > 0);
+ assertTrue("failed to read from file", val >= 0);
}
/** test that allowed stat puts proper entry in audit log */
@@ -168,7 +168,7 @@ public class TestAuditLogs {
istream.close();
verifyAuditLogsRepeat(true, 3);
- assertTrue("failed to read from file", val > 0);
+ assertTrue("failed to read from file", val >= 0);
}
/** test that stat via webhdfs puts proper entry in audit log */
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java?rev=1400219&r1=1400218&r2=1400219&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java Fri Oct 19 18:49:38 2012
@@ -22,6 +22,7 @@ import static org.apache.hadoop.hdfs.ser
import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getInProgressEditsFileName;
import java.io.IOException;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -196,6 +197,35 @@ public class TestNNStorageRetentionManag
runTest(tc);
}
+ @Test
+ public void testRetainExtraLogsLimitedSegments() throws IOException {
+ conf.setLong(DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY,
+ 150);
+ conf.setLong(DFSConfigKeys.DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_KEY, 2);
+ TestCaseDescription tc = new TestCaseDescription();
+ tc.addRoot("/foo1", NameNodeDirType.IMAGE);
+ tc.addRoot("/foo2", NameNodeDirType.EDITS);
+ tc.addImage("/foo1/current/" + getImageFileName(100), true);
+ tc.addImage("/foo1/current/" + getImageFileName(200), true);
+ tc.addImage("/foo1/current/" + getImageFileName(300), false);
+ tc.addImage("/foo1/current/" + getImageFileName(400), false);
+
+ tc.addLog("/foo2/current/" + getFinalizedEditsFileName(1, 100), true);
+ // Without lowering the max segments to retain, we'd retain all segments
+ // going back to txid 150 (300 - 150).
+ tc.addLog("/foo2/current/" + getFinalizedEditsFileName(101, 175), true);
+ tc.addLog("/foo2/current/" + getFinalizedEditsFileName(176, 200), true);
+ tc.addLog("/foo2/current/" + getFinalizedEditsFileName(201, 225), true);
+ tc.addLog("/foo2/current/" + getFinalizedEditsFileName(226, 240), true);
+ // Only retain 2 extra segments. The 301-400 segment is considered required,
+ // not extra.
+ tc.addLog("/foo2/current/" + getFinalizedEditsFileName(241, 275), false);
+ tc.addLog("/foo2/current/" + getFinalizedEditsFileName(276, 300), false);
+ tc.addLog("/foo2/current/" + getFinalizedEditsFileName(301, 400), false);
+ tc.addLog("/foo2/current/" + getInProgressEditsFileName(401), false);
+ runTest(tc);
+ }
+
private void runTest(TestCaseDescription tc) throws IOException {
StoragePurger mockPurger =
Mockito.mock(NNStorageRetentionManager.StoragePurger.class);
@@ -287,8 +317,10 @@ public class TestNNStorageRetentionManag
return mockStorageForDirs(sds.toArray(new StorageDirectory[0]));
}
+ @SuppressWarnings("unchecked")
public FSEditLog mockEditLog(StoragePurger purger) {
final List<JournalManager> jms = Lists.newArrayList();
+ final JournalSet journalSet = new JournalSet(0);
for (FakeRoot root : dirRoots.values()) {
if (!root.type.isOfType(NameNodeDirType.EDITS)) continue;
@@ -297,6 +329,7 @@ public class TestNNStorageRetentionManag
root.mockStorageDir(), null);
fjm.purger = purger;
jms.add(fjm);
+ journalSet.add(fjm, false);
}
FSEditLog mockLog = Mockito.mock(FSEditLog.class);
@@ -314,6 +347,18 @@ public class TestNNStorageRetentionManag
return null;
}
}).when(mockLog).purgeLogsOlderThan(Mockito.anyLong());
+
+ Mockito.doAnswer(new Answer<Void>() {
+
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ Object[] args = invocation.getArguments();
+ journalSet.selectInputStreams((Collection<EditLogInputStream>)args[0],
+ (long)((Long)args[1]), (boolean)((Boolean)args[2]));
+ return null;
+ }
+ }).when(mockLog).selectInputStreams(Mockito.anyCollection(),
+ Mockito.anyLong(), Mockito.anyBoolean());
return mockLog;
}
}