Posted to hdfs-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:50:25 UTC
svn commit: r1619012 [13/35] - in
/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project: hadoop-hdfs-httpfs/
hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/
hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/ hadoop...
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java Tue Aug 19 23:49:39 2014
@@ -98,7 +98,6 @@ public class BlockMetadataHeader {
/**
* This reads all the fields till the beginning of checksum.
- * @param in
* @return Metadata Header
* @throws IOException
*/
@@ -109,9 +108,7 @@ public class BlockMetadataHeader {
/**
* Reads header at the top of metadata file and returns the header.
*
- * @param dataset
- * @param block
- * @return
+ * @return metadata header for the block
* @throws IOException
*/
public static BlockMetadataHeader readHeader(File file) throws IOException {
@@ -147,8 +144,6 @@ public class BlockMetadataHeader {
/**
* This writes all the fields till the beginning of checksum.
* @param out DataOutputStream
- * @param header
- * @return
* @throws IOException
*/
@VisibleForTesting
@@ -161,9 +156,7 @@ public class BlockMetadataHeader {
/**
* Writes all the fields till the beginning of checksum.
- * @param out
- * @param checksum
- * @throws IOException
+ * @throws IOException on error
*/
static void writeHeader(DataOutputStream out, DataChecksum checksum)
throws IOException {
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java Tue Aug 19 23:49:39 2014
@@ -88,7 +88,11 @@ class BlockPoolManager {
synchronized void remove(BPOfferService t) {
offerServices.remove(t);
- bpByBlockPoolId.remove(t.getBlockPoolId());
+ if (t.hasBlockPoolId()) {
+ // It's possible that the block pool never successfully registered
+ // with any NN, so it was never added to this map
+ bpByBlockPoolId.remove(t.getBlockPoolId());
+ }
boolean removed = false;
for (Iterator<BPOfferService> it = bpByNameserviceId.values().iterator();
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java Tue Aug 19 23:49:39 2014
@@ -84,6 +84,10 @@ class BlockPoolSliceScanner {
private final SortedSet<BlockScanInfo> blockInfoSet
= new TreeSet<BlockScanInfo>(BlockScanInfo.LAST_SCAN_TIME_COMPARATOR);
+
+ private final SortedSet<BlockScanInfo> newBlockInfoSet =
+ new TreeSet<BlockScanInfo>(BlockScanInfo.LAST_SCAN_TIME_COMPARATOR);
+
private final GSet<Block, BlockScanInfo> blockMap
= new LightWeightGSet<Block, BlockScanInfo>(
LightWeightGSet.computeCapacity(0.5, "BlockMap"));
@@ -97,7 +101,7 @@ class BlockPoolSliceScanner {
private long totalTransientErrors = 0;
private final AtomicInteger totalBlocksScannedInLastRun = new AtomicInteger(); // Used for test only
- private long currentPeriodStart = Time.now();
+ private long currentPeriodStart = Time.monotonicNow();
private long bytesLeft = 0; // Bytes to scan in this period
private long totalBytesToScan = 0;
private boolean isNewPeriod = true;
@@ -195,7 +199,7 @@ class BlockPoolSliceScanner {
BlockScanInfo info = new BlockScanInfo( block );
info.lastScanTime = scanTime--;
//still keep 'info.lastScanType' to NONE.
- addBlockInfo(info);
+ addBlockInfo(info, false);
}
RollingLogs rollingLogs = null;
@@ -221,25 +225,42 @@ class BlockPoolSliceScanner {
// Should we change throttler bandwidth every time bytesLeft changes?
// not really required.
}
-
- private synchronized void addBlockInfo(BlockScanInfo info) {
- boolean added = blockInfoSet.add(info);
+
+ /**
+ * Add the BlockScanInfo to the sorted set of blockScanInfo.
+ * @param info BlockScanInfo to be added
+ * @param isNewBlock true if the block is newly added, false if an
+ * existing BlockScanInfo is being updated with a new scanTime
+ */
+ private synchronized void addBlockInfo(BlockScanInfo info,
+ boolean isNewBlock) {
+ boolean added = false;
+ if (isNewBlock) {
+ // check whether the block is already present
+ boolean exists = blockInfoSet.contains(info);
+ added = !exists && newBlockInfoSet.add(info);
+ } else {
+ added = blockInfoSet.add(info);
+ }
blockMap.put(info);
if (added) {
updateBytesToScan(info.getNumBytes(), info.lastScanTime);
}
}
-
+
private synchronized void delBlockInfo(BlockScanInfo info) {
boolean exists = blockInfoSet.remove(info);
+ if (!exists){
+ exists = newBlockInfoSet.remove(info);
+ }
blockMap.remove(info);
if (exists) {
updateBytesToScan(-info.getNumBytes(), info.lastScanTime);
}
}
-
+
/** Update blockMap by the given LogEntry */
private synchronized void updateBlockInfo(LogEntry e) {
BlockScanInfo info = blockMap.get(new Block(e.blockId, 0, e.genStamp));
@@ -249,7 +270,7 @@ class BlockPoolSliceScanner {
delBlockInfo(info);
info.lastScanTime = e.verificationTime;
info.lastScanType = ScanType.VERIFICATION_SCAN;
- addBlockInfo(info);
+ addBlockInfo(info, false);
}
}
@@ -260,7 +281,7 @@ class BlockPoolSliceScanner {
long period = Math.min(scanPeriod,
Math.max(blockMap.size(),1) * 600 * 1000L);
int periodInt = Math.abs((int)period);
- return Time.now() - scanPeriod +
+ return Time.monotonicNow() - scanPeriod +
DFSUtil.getRandom().nextInt(periodInt);
}
@@ -275,14 +296,14 @@ class BlockPoolSliceScanner {
info = new BlockScanInfo(block.getLocalBlock());
info.lastScanTime = getNewBlockScanTime();
- addBlockInfo(info);
+ addBlockInfo(info, true);
adjustThrottler();
}
/** Deletes the block from internal structures */
synchronized void deleteBlock(Block block) {
BlockScanInfo info = blockMap.get(block);
- if ( info != null ) {
+ if (info != null) {
delBlockInfo(info);
}
}
@@ -310,23 +331,16 @@ class BlockPoolSliceScanner {
}
}
- private synchronized void updateScanStatus(Block block,
+ private synchronized void updateScanStatus(BlockScanInfo info,
ScanType type,
boolean scanOk) {
- BlockScanInfo info = blockMap.get(block);
-
- if ( info != null ) {
- delBlockInfo(info);
- } else {
- // It might already be removed. Thats ok, it will be caught next time.
- info = new BlockScanInfo(block);
- }
-
- long now = Time.now();
+ delBlockInfo(info);
+
+ long now = Time.monotonicNow();
info.lastScanType = type;
info.lastScanTime = now;
info.lastScanOk = scanOk;
- addBlockInfo(info);
+ addBlockInfo(info, false);
// Don't update meta data if the verification failed.
if (!scanOk) {
@@ -334,8 +348,8 @@ class BlockPoolSliceScanner {
}
if (verificationLog != null) {
- verificationLog.append(now, block.getGenerationStamp(),
- block.getBlockId());
+ verificationLog.append(now, info.getGenerationStamp(),
+ info.getBlockId());
}
}
@@ -399,8 +413,9 @@ class BlockPoolSliceScanner {
}
private synchronized void adjustThrottler() {
- long timeLeft = currentPeriodStart+scanPeriod - Time.now();
- long bw = Math.max(bytesLeft*1000/timeLeft, MIN_SCAN_RATE);
+ long timeLeft = Math.max(1L,
+ currentPeriodStart + scanPeriod - Time.monotonicNow());
+ long bw = Math.max((bytesLeft * 1000) / timeLeft, MIN_SCAN_RATE);
throttler.setBandwidth(Math.min(bw, MAX_SCAN_RATE));
}
@@ -433,11 +448,13 @@ class BlockPoolSliceScanner {
totalTransientErrors++;
}
- updateScanStatus(block.getLocalBlock(), ScanType.VERIFICATION_SCAN, true);
+ updateScanStatus((BlockScanInfo)block.getLocalBlock(),
+ ScanType.VERIFICATION_SCAN, true);
return;
} catch (IOException e) {
- updateScanStatus(block.getLocalBlock(), ScanType.VERIFICATION_SCAN, false);
+ updateScanStatus((BlockScanInfo)block.getLocalBlock(),
+ ScanType.VERIFICATION_SCAN, false);
// If the block does not exists anymore, then its not an error
if (!dataset.contains(block)) {
@@ -496,7 +513,7 @@ class BlockPoolSliceScanner {
// Picks one block and verifies it
private void verifyFirstBlock() {
- Block block = null;
+ BlockScanInfo block = null;
synchronized (this) {
if (!blockInfoSet.isEmpty()) {
block = blockInfoSet.first();
@@ -523,7 +540,7 @@ class BlockPoolSliceScanner {
private boolean assignInitialVerificationTimes() {
//First updates the last verification times from the log file.
if (verificationLog != null) {
- long now = Time.now();
+ long now = Time.monotonicNow();
RollingLogs.LineIterator logIterator = null;
try {
logIterator = verificationLog.logs.iterator(false);
@@ -574,7 +591,7 @@ class BlockPoolSliceScanner {
// Initially spread the block reads over half of scan period
// so that we don't keep scanning the blocks too quickly when restarted.
long verifyInterval = Math.min(scanPeriod/(2L * numBlocks), 10*60*1000L);
- long lastScanTime = Time.now() - scanPeriod;
+ long lastScanTime = Time.monotonicNow() - scanPeriod;
if (!blockInfoSet.isEmpty()) {
BlockScanInfo info;
@@ -582,7 +599,7 @@ class BlockPoolSliceScanner {
delBlockInfo(info);
info.lastScanTime = lastScanTime;
lastScanTime += verifyInterval;
- addBlockInfo(info);
+ addBlockInfo(info, false);
}
}
}
@@ -601,16 +618,16 @@ class BlockPoolSliceScanner {
// reset the byte counts :
bytesLeft = totalBytesToScan;
- currentPeriodStart = Time.now();
+ currentPeriodStart = Time.monotonicNow();
isNewPeriod = true;
}
private synchronized boolean workRemainingInCurrentPeriod() {
- if (bytesLeft <= 0 && Time.now() < currentPeriodStart + scanPeriod) {
+ if (bytesLeft <= 0 && Time.monotonicNow() < currentPeriodStart + scanPeriod) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping scan since bytesLeft=" + bytesLeft + ", Start=" +
currentPeriodStart + ", period=" + scanPeriod + ", now=" +
- Time.now() + " " + blockPoolId);
+ Time.monotonicNow() + " " + blockPoolId);
}
return false;
} else {
@@ -633,7 +650,7 @@ class BlockPoolSliceScanner {
scan();
} finally {
totalBlocksScannedInLastRun.set(processedBlocks.size());
- lastScanTime.set(Time.now());
+ lastScanTime.set(Time.monotonicNow());
}
}
@@ -656,7 +673,7 @@ class BlockPoolSliceScanner {
while (datanode.shouldRun
&& !datanode.blockScanner.blockScannerThread.isInterrupted()
&& datanode.isBPServiceAlive(blockPoolId)) {
- long now = Time.now();
+ long now = Time.monotonicNow();
synchronized (this) {
if ( now >= (currentPeriodStart + scanPeriod)) {
startNewPeriod();
@@ -678,12 +695,21 @@ class BlockPoolSliceScanner {
throw e;
} finally {
rollVerificationLogs();
+ rollNewBlocksInfo();
if (LOG.isDebugEnabled()) {
LOG.debug("Done scanning block pool: " + blockPoolId);
}
}
}
-
+
+ // add new blocks to scan in next iteration
+ private synchronized void rollNewBlocksInfo() {
+ for (BlockScanInfo newBlock : newBlockInfoSet) {
+ blockInfoSet.add(newBlock);
+ }
+ newBlockInfoSet.clear();
+ }
+
private synchronized void rollVerificationLogs() {
if (verificationLog != null) {
try {
@@ -714,7 +740,7 @@ class BlockPoolSliceScanner {
int total = blockInfoSet.size();
- long now = Time.now();
+ long now = Time.monotonicNow();
Date date = new Date();
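
The BlockPoolSliceScanner change above parks newly added blocks in a separate sorted set (newBlockInfoSet) and only folds them into the main scan set once the current iteration ends (rollNewBlocksInfo), so a block arriving mid-scan cannot perturb the set being walked; it also moves the period bookkeeping from Time.now() to Time.monotonicNow() so wall-clock adjustments cannot skew scan scheduling. A minimal standalone sketch of the deferred-merge pattern, with illustrative names rather than the real scanner fields:

import java.util.Comparator;
import java.util.SortedSet;
import java.util.TreeSet;

// Sketch only: items added during a scan are parked in a side set and merged
// into the main set after the current pass, mirroring addBlockInfo(info, true)
// versus addBlockInfo(info, false) and rollNewBlocksInfo() above.
class DeferredScanSet<T> {
  private final SortedSet<T> current;
  private final SortedSet<T> pendingNew;

  DeferredScanSet(Comparator<T> order) {
    current = new TreeSet<T>(order);
    pendingNew = new TreeSet<T>(order);
  }

  synchronized void addNew(T item) {
    if (!current.contains(item)) {   // skip items already being scanned
      pendingNew.add(item);
    }
  }

  synchronized void addRescanned(T item) {
    current.add(item);               // re-insert after updating its scan time
  }

  synchronized T first() {
    return current.isEmpty() ? null : current.first();
  }

  synchronized void rollNewItems() {
    current.addAll(pendingNew);      // fold deferred items in before next pass
    pendingNew.clear();
  }
}
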
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java Tue Aug 19 23:49:39 2014
@@ -36,8 +36,10 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.Properties;
+import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -106,13 +108,22 @@ public class BlockPoolSliceStorage exten
void recoverTransitionRead(DataNode datanode, NamespaceInfo nsInfo,
Collection<File> dataDirs, StartupOption startOpt) throws IOException {
LOG.info("Analyzing storage directories for bpid " + nsInfo.getBlockPoolID());
+ Set<String> existingStorageDirs = new HashSet<String>();
+ for (int i = 0; i < getNumStorageDirs(); i++) {
+ existingStorageDirs.add(getStorageDir(i).getRoot().getAbsolutePath());
+ }
+
// 1. For each BP data directory analyze the state and
// check whether all is consistent before transitioning.
- this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
ArrayList<StorageState> dataDirStates = new ArrayList<StorageState>(
dataDirs.size());
for (Iterator<File> it = dataDirs.iterator(); it.hasNext();) {
File dataDir = it.next();
+ if (existingStorageDirs.contains(dataDir.getAbsolutePath())) {
+ LOG.info("Storage directory " + dataDir + " has already been used.");
+ it.remove();
+ continue;
+ }
StorageDirectory sd = new StorageDirectory(dataDir, null, true);
StorageState curState;
try {
@@ -152,7 +163,7 @@ public class BlockPoolSliceStorage exten
// During startup some of them can upgrade or roll back
// while others could be up-to-date for the regular startup.
for (int idx = 0; idx < getNumStorageDirs(); idx++) {
- doTransition(getStorageDir(idx), nsInfo, startOpt);
+ doTransition(datanode, getStorageDir(idx), nsInfo, startOpt);
assert getCTime() == nsInfo.getCTime()
: "Data-node and name-node CTimes must be the same.";
}
@@ -242,7 +253,7 @@ public class BlockPoolSliceStorage exten
* @param startOpt startup option
* @throws IOException
*/
- private void doTransition(StorageDirectory sd,
+ private void doTransition(DataNode datanode, StorageDirectory sd,
NamespaceInfo nsInfo, StartupOption startOpt) throws IOException {
if (startOpt == StartupOption.ROLLBACK) {
doRollback(sd, nsInfo); // rollback if applicable
@@ -275,7 +286,7 @@ public class BlockPoolSliceStorage exten
}
if (this.layoutVersion > HdfsConstants.DATANODE_LAYOUT_VERSION
|| this.cTime < nsInfo.getCTime()) {
- doUpgrade(sd, nsInfo); // upgrade
+ doUpgrade(datanode, sd, nsInfo); // upgrade
return;
}
// layoutVersion == LAYOUT_VERSION && this.cTime > nsInfo.cTime
@@ -304,7 +315,8 @@ public class BlockPoolSliceStorage exten
* @param nsInfo Namespace Info from the namenode
* @throws IOException on error
*/
- void doUpgrade(StorageDirectory bpSd, NamespaceInfo nsInfo) throws IOException {
+ void doUpgrade(DataNode datanode, StorageDirectory bpSd, NamespaceInfo nsInfo)
+ throws IOException {
// Upgrading is applicable only to release with federation or after
if (!DataNodeLayoutVersion.supports(
LayoutVersion.Feature.FEDERATION, layoutVersion)) {
@@ -312,7 +324,7 @@ public class BlockPoolSliceStorage exten
}
LOG.info("Upgrading block pool storage directory " + bpSd.getRoot()
+ ".\n old LV = " + this.getLayoutVersion() + "; old CTime = "
- + this.getCTime() + ".\n new LV = " + nsInfo.getLayoutVersion()
+ + this.getCTime() + ".\n new LV = " + HdfsConstants.DATANODE_LAYOUT_VERSION
+ "; new CTime = " + nsInfo.getCTime());
// get <SD>/previous directory
String dnRoot = getDataNodeStorageRoot(bpSd.getRoot().getCanonicalPath());
@@ -340,7 +352,7 @@ public class BlockPoolSliceStorage exten
rename(bpCurDir, bpTmpDir);
// 3. Create new <SD>/current with block files hardlinks and VERSION
- linkAllBlocks(bpTmpDir, bpCurDir);
+ linkAllBlocks(datanode, bpTmpDir, bpCurDir);
this.layoutVersion = HdfsConstants.DATANODE_LAYOUT_VERSION;
assert this.namespaceID == nsInfo.getNamespaceID()
: "Data-node and name-node layout versions must be the same.";
@@ -517,14 +529,15 @@ public class BlockPoolSliceStorage exten
* @param toDir the current data directory
* @throws IOException if error occurs during hardlink
*/
- private void linkAllBlocks(File fromDir, File toDir) throws IOException {
+ private void linkAllBlocks(DataNode datanode, File fromDir, File toDir)
+ throws IOException {
// do the link
int diskLayoutVersion = this.getLayoutVersion();
// hardlink finalized blocks in tmpDir
HardLink hardLink = new HardLink();
- DataStorage.linkBlocks(new File(fromDir, DataStorage.STORAGE_DIR_FINALIZED),
+ DataStorage.linkBlocks(datanode, new File(fromDir, DataStorage.STORAGE_DIR_FINALIZED),
new File(toDir,DataStorage.STORAGE_DIR_FINALIZED), diskLayoutVersion, hardLink);
- DataStorage.linkBlocks(new File(fromDir, DataStorage.STORAGE_DIR_RBW),
+ DataStorage.linkBlocks(datanode, new File(fromDir, DataStorage.STORAGE_DIR_RBW),
new File(toDir, DataStorage.STORAGE_DIR_RBW), diskLayoutVersion, hardLink);
LOG.info( hardLink.linkStats.report() );
}
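
The recoverTransitionRead change collects the absolute paths of storage directories already managed by this BlockPoolSliceStorage and drops any incoming data directory that matches, so repeated calls do not re-analyze or re-add the same directory. A rough sketch of that dedup step as a free-standing helper with hypothetical names:

import java.io.File;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

// Sketch only: drop candidate directories whose absolute path is already in
// use, mirroring the existingStorageDirs check added above.
class StorageDirFilter {
  static void dropAlreadyLoaded(Collection<File> candidates,
      Collection<File> alreadyLoaded) {
    Set<String> existing = new HashSet<String>();
    for (File dir : alreadyLoaded) {
      existing.add(dir.getAbsolutePath());
    }
    for (Iterator<File> it = candidates.iterator(); it.hasNext();) {
      File dataDir = it.next();
      if (existing.contains(dataDir.getAbsolutePath())) {
        it.remove();  // this directory has already been analyzed; skip it
      }
    }
  }
}
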
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Aug 19 23:49:39 2014
@@ -37,6 +37,7 @@ import java.util.zip.Checksum;
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSOutputSummer;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -44,6 +45,7 @@ import org.apache.hadoop.hdfs.protocol.d
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
@@ -69,7 +71,7 @@ class BlockReceiver implements Closeable
@VisibleForTesting
static long CACHE_DROP_LAG_BYTES = 8 * 1024 * 1024;
-
+ private final long datanodeSlowLogThresholdMs;
private DataInputStream in = null; // from where data are read
private DataChecksum clientChecksum; // checksum used by client
private DataChecksum diskChecksum; // checksum we write to disk
@@ -86,8 +88,7 @@ class BlockReceiver implements Closeable
private int bytesPerChecksum;
private int checksumSize;
- private final PacketReceiver packetReceiver =
- new PacketReceiver(false);
+ private final PacketReceiver packetReceiver = new PacketReceiver(false);
protected final String inAddr;
protected final String myAddr;
@@ -105,6 +106,7 @@ class BlockReceiver implements Closeable
private boolean dropCacheBehindWrites;
private long lastCacheManagementOffset = 0;
private boolean syncBehindWrites;
+ private boolean syncBehindWritesInBackground;
/** The client name. It is empty if a datanode is the client */
private final String clientname;
@@ -122,7 +124,16 @@ class BlockReceiver implements Closeable
private boolean syncOnClose;
private long restartBudget;
- BlockReceiver(final ExtendedBlock block, final DataInputStream in,
+ /**
+ * for replaceBlock response
+ */
+ private final long responseInterval;
+ private long lastResponseTime = 0;
+ private boolean isReplaceBlock = false;
+ private DataOutputStream replyOut = null;
+
+ BlockReceiver(final ExtendedBlock block, final StorageType storageType,
+ final DataInputStream in,
final String inAddr, final String myAddr,
final BlockConstructionStage stage,
final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
@@ -141,7 +152,10 @@ class BlockReceiver implements Closeable
this.isDatanode = clientname.length() == 0;
this.isClient = !this.isDatanode;
this.restartBudget = datanode.getDnConf().restartReplicaExpiry;
-
+ this.datanodeSlowLogThresholdMs = datanode.getDnConf().datanodeSlowIoWarningThresholdMs;
+ // For replaceBlock() calls, a response should be sent periodically to avoid
+ // socketTimeout at clients, so send at an interval of 0.5 * socketTimeout.
+ this.responseInterval = (long) (datanode.getDnConf().socketTimeout * 0.5);
//for datanode, we have
//1: clientName.length() == 0, and
//2: stage == null or PIPELINE_SETUP_CREATE
@@ -162,11 +176,11 @@ class BlockReceiver implements Closeable
// Open local disk out
//
if (isDatanode) { //replication or move
- replicaInfo = datanode.data.createTemporary(block);
+ replicaInfo = datanode.data.createTemporary(storageType, block);
} else {
switch (stage) {
case PIPELINE_SETUP_CREATE:
- replicaInfo = datanode.data.createRbw(block);
+ replicaInfo = datanode.data.createRbw(storageType, block);
datanode.notifyNamenodeReceivingBlock(
block, replicaInfo.getStorageUuid());
break;
@@ -198,7 +212,7 @@ class BlockReceiver implements Closeable
case TRANSFER_RBW:
case TRANSFER_FINALIZED:
// this is a transfer destination
- replicaInfo = datanode.data.createTemporary(block);
+ replicaInfo = datanode.data.createTemporary(storageType, block);
break;
default: throw new IOException("Unsupported stage " + stage +
" while receiving block " + block + " from " + inAddr);
@@ -208,6 +222,8 @@ class BlockReceiver implements Closeable
datanode.getDnConf().dropCacheBehindWrites :
cachingStrategy.getDropBehind();
this.syncBehindWrites = datanode.getDnConf().syncBehindWrites;
+ this.syncBehindWritesInBackground = datanode.getDnConf().
+ syncBehindWritesInBackground;
final boolean isCreate = isDatanode || isTransfer
|| stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
@@ -249,7 +265,7 @@ class BlockReceiver implements Closeable
if (cause != null) { // possible disk error
ioe = cause;
- datanode.checkDiskError(ioe); // may throw an exception here
+ datanode.checkDiskErrorAsync();
}
throw ioe;
@@ -268,10 +284,8 @@ class BlockReceiver implements Closeable
*/
@Override
public void close() throws IOException {
- if (packetReceiver != null) {
- packetReceiver.close();
- }
-
+ packetReceiver.close();
+
IOException ioe = null;
if (syncOnClose && (out != null || checksumOut != null)) {
datanode.metrics.incrFsyncCount();
@@ -327,7 +341,7 @@ class BlockReceiver implements Closeable
}
// disk check
if(ioe != null) {
- datanode.checkDiskError(ioe);
+ datanode.checkDiskErrorAsync();
throw ioe;
}
}
@@ -338,6 +352,7 @@ class BlockReceiver implements Closeable
*/
void flushOrSync(boolean isSync) throws IOException {
long flushTotalNanos = 0;
+ long begin = Time.monotonicNow();
if (checksumOut != null) {
long flushStartNanos = System.nanoTime();
checksumOut.flush();
@@ -366,6 +381,12 @@ class BlockReceiver implements Closeable
datanode.metrics.incrFsyncCount();
}
}
+ long duration = Time.monotonicNow() - begin;
+ if (duration > datanodeSlowLogThresholdMs) {
+ LOG.warn("Slow flushOrSync took " + duration + "ms (threshold="
+ + datanodeSlowLogThresholdMs + "ms), isSync:" + isSync + ", flushTotalNanos="
+ + flushTotalNanos + "ns");
+ }
}
/**
@@ -491,8 +512,14 @@ class BlockReceiver implements Closeable
//First write the packet to the mirror:
if (mirrorOut != null && !mirrorError) {
try {
+ long begin = Time.monotonicNow();
packetReceiver.mirrorPacketTo(mirrorOut);
mirrorOut.flush();
+ long duration = Time.monotonicNow() - begin;
+ if (duration > datanodeSlowLogThresholdMs) {
+ LOG.warn("Slow BlockReceiver write packet to mirror took " + duration
+ + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
+ }
} catch (IOException e) {
handleMirrorOutError(e);
}
@@ -575,7 +602,13 @@ class BlockReceiver implements Closeable
int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
// Write data to disk.
+ long begin = Time.monotonicNow();
out.write(dataBuf.array(), startByteToDisk, numBytesToDisk);
+ long duration = Time.monotonicNow() - begin;
+ if (duration > datanodeSlowLogThresholdMs) {
+ LOG.warn("Slow BlockReceiver write data to disk cost:" + duration
+ + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
+ }
// If this is a partial chunk, then verify that this is the only
// chunk in the packet. Calculate new crc for this chunk.
@@ -618,7 +651,7 @@ class BlockReceiver implements Closeable
manageWriterOsCache(offsetInBlock);
}
} catch (IOException iex) {
- datanode.checkDiskError(iex);
+ datanode.checkDiskErrorAsync();
throw iex;
}
}
@@ -630,6 +663,20 @@ class BlockReceiver implements Closeable
lastPacketInBlock, offsetInBlock, Status.SUCCESS);
}
+ /*
+ * Send in-progress responses for the replaceBlock() calls back to caller to
+ * avoid timeouts due to balancer throttling. HDFS-6247
+ */
+ if (isReplaceBlock
+ && (Time.monotonicNow() - lastResponseTime > responseInterval)) {
+ BlockOpResponseProto.Builder response = BlockOpResponseProto.newBuilder()
+ .setStatus(Status.IN_PROGRESS);
+ response.build().writeDelimitedTo(replyOut);
+ replyOut.flush();
+
+ lastResponseTime = Time.monotonicNow();
+ }
+
if (throttler != null) { // throttle I/O
throttler.throttle(len);
}
@@ -641,6 +688,7 @@ class BlockReceiver implements Closeable
try {
if (outFd != null &&
offsetInBlock > lastCacheManagementOffset + CACHE_DROP_LAG_BYTES) {
+ long begin = Time.monotonicNow();
//
// For SYNC_FILE_RANGE_WRITE, we want to sync from
// lastCacheManagementOffset to a position "two windows ago"
@@ -651,10 +699,17 @@ class BlockReceiver implements Closeable
// of file
//
if (syncBehindWrites) {
- NativeIO.POSIX.syncFileRangeIfPossible(outFd,
- lastCacheManagementOffset,
- offsetInBlock - lastCacheManagementOffset,
- NativeIO.POSIX.SYNC_FILE_RANGE_WRITE);
+ if (syncBehindWritesInBackground) {
+ this.datanode.getFSDataset().submitBackgroundSyncFileRangeRequest(
+ block, outFd, lastCacheManagementOffset,
+ offsetInBlock - lastCacheManagementOffset,
+ NativeIO.POSIX.SYNC_FILE_RANGE_WRITE);
+ } else {
+ NativeIO.POSIX.syncFileRangeIfPossible(outFd,
+ lastCacheManagementOffset, offsetInBlock
+ - lastCacheManagementOffset,
+ NativeIO.POSIX.SYNC_FILE_RANGE_WRITE);
+ }
}
//
// For POSIX_FADV_DONTNEED, we want to drop from the beginning
@@ -673,18 +728,29 @@ class BlockReceiver implements Closeable
NativeIO.POSIX.POSIX_FADV_DONTNEED);
}
lastCacheManagementOffset = offsetInBlock;
+ long duration = Time.monotonicNow() - begin;
+ if (duration > datanodeSlowLogThresholdMs) {
+ LOG.warn("Slow manageWriterOsCache took " + duration
+ + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
+ }
}
} catch (Throwable t) {
LOG.warn("Error managing cache for writer of block " + block, t);
}
}
-
+
+ public void sendOOB() throws IOException, InterruptedException {
+ ((PacketResponder) responder.getRunnable()).sendOOBResponse(PipelineAck
+ .getRestartOOBStatus());
+ }
+
void receiveBlock(
DataOutputStream mirrOut, // output to next datanode
DataInputStream mirrIn, // input from next datanode
DataOutputStream replyOut, // output to previous datanode
String mirrAddr, DataTransferThrottler throttlerArg,
- DatanodeInfo[] downstreams) throws IOException {
+ DatanodeInfo[] downstreams,
+ boolean isReplaceBlock) throws IOException {
syncOnClose = datanode.getDnConf().syncOnClose;
boolean responderClosed = false;
@@ -692,6 +758,9 @@ class BlockReceiver implements Closeable
mirrorAddr = mirrAddr;
throttler = throttlerArg;
+ this.replyOut = replyOut;
+ this.isReplaceBlock = isReplaceBlock;
+
try {
if (isClient && !isTransfer) {
responder = new Daemon(datanode.threadGroup,
@@ -766,9 +835,7 @@ class BlockReceiver implements Closeable
// The worst case is not recovering this RBW replica.
// Client will fall back to regular pipeline recovery.
}
- try {
- ((PacketResponder) responder.getRunnable()).
- sendOOBResponse(PipelineAck.getRestartOOBStatus());
+ try {
// Even if the connection is closed after the ack packet is
// flushed, the client can react to the connection closure
// first. Insert a delay to lower the chance of client
@@ -776,8 +843,6 @@ class BlockReceiver implements Closeable
Thread.sleep(1000);
} catch (InterruptedException ie) {
// It is already going down. Ignore this.
- } catch (IOException ioe) {
- LOG.info("Error sending OOB Ack.", ioe);
}
}
responder.interrupt();
@@ -956,9 +1021,9 @@ class BlockReceiver implements Closeable
/**
* enqueue the seqno that is still be to acked by the downstream datanode.
- * @param seqno
- * @param lastPacketInBlock
- * @param offsetInBlock
+ * @param seqno sequence number of the packet
+ * @param lastPacketInBlock if true, this is the last packet in block
+ * @param offsetInBlock offset of this packet in block
*/
void enqueue(final long seqno, final boolean lastPacketInBlock,
final long offsetInBlock, final Status ackStatus) {
@@ -1174,11 +1239,7 @@ class BlockReceiver implements Closeable
} catch (IOException e) {
LOG.warn("IOException in BlockReceiver.run(): ", e);
if (running) {
- try {
- datanode.checkDiskError(e); // may throw an exception here
- } catch (IOException ioe) {
- LOG.warn("DataNode.checkDiskError failed in run() with: ", ioe);
- }
+ datanode.checkDiskErrorAsync();
LOG.info(myString, e);
running = false;
if (!Thread.interrupted()) { // failure not caused by interruption
@@ -1306,9 +1367,15 @@ class BlockReceiver implements Closeable
replicaInfo.setBytesAcked(offsetInBlock);
}
// send my ack back to upstream datanode
+ long begin = Time.monotonicNow();
replyAck.write(upstreamOut);
upstreamOut.flush();
- if (LOG.isDebugEnabled()) {
+ long duration = Time.monotonicNow() - begin;
+ if (duration > datanodeSlowLogThresholdMs) {
+ LOG.warn("Slow PacketResponder send ack to upstream took " + duration
+ + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms), " + myString
+ + ", replyAck=" + replyAck);
+ } else if (LOG.isDebugEnabled()) {
LOG.debug(myString + ", replyAck=" + replyAck);
}
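
Two patterns recur in the BlockReceiver hunks above: each potentially slow I/O step (flushOrSync, the mirror write, the disk write, cache management, the upstream ack) is bracketed with Time.monotonicNow() and logged as a warning when it exceeds the configured datanodeSlowIoWarningThresholdMs, and replaceBlock() transfers periodically write an IN_PROGRESS response back to the caller so throttled balancer copies do not trip the client socket timeout. A hedged sketch of the timing wrapper only; the class, logger, and threshold below are stand-ins, not actual DataNode fields:

import java.util.concurrent.Callable;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.Time;

// Sketch only: run an I/O step and warn if it took longer than the threshold,
// mirroring the duration checks added around flushOrSync() and the disk write.
class SlowIoWarner {
  private static final Log LOG = LogFactory.getLog(SlowIoWarner.class);
  private final long thresholdMs;  // e.g. DNConf.datanodeSlowIoWarningThresholdMs

  SlowIoWarner(long thresholdMs) {
    this.thresholdMs = thresholdMs;
  }

  <T> T time(String opName, Callable<T> op) throws Exception {
    long begin = Time.monotonicNow();
    try {
      return op.call();
    } finally {
      long duration = Time.monotonicNow() - begin;
      if (duration > thresholdMs) {
        LOG.warn("Slow " + opName + " took " + duration + "ms (threshold="
            + thresholdMs + "ms)");
      }
    }
  }
}
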
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Tue Aug 19 23:49:39 2014
@@ -168,7 +168,7 @@ class BlockSender implements java.io.Clo
* @param block Block that is being read
* @param startOffset starting offset to read from
* @param length length of data to read
- * @param corruptChecksumOk
+ * @param corruptChecksumOk if true, corrupt checksum is okay
* @param verifyChecksum verify checksum while reading the data
* @param sendChecksum send checksum to client.
* @param datanode datanode from which the block is being read
@@ -687,7 +687,7 @@ class BlockSender implements java.io.Clo
// Trigger readahead of beginning of file if configured.
manageOsCache();
- final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
+ final long startTime = ClientTraceLog.isDebugEnabled() ? System.nanoTime() : 0;
try {
int maxChunksPerPacket;
int pktBufSize = PacketHeader.PKT_MAX_HEADER_LEN;
@@ -733,9 +733,9 @@ class BlockSender implements java.io.Clo
sentEntireByteRange = true;
}
} finally {
- if (clientTraceFmt != null) {
+ if ((clientTraceFmt != null) && ClientTraceLog.isDebugEnabled()) {
final long endTime = System.nanoTime();
- ClientTraceLog.info(String.format(clientTraceFmt, totalRead,
+ ClientTraceLog.debug(String.format(clientTraceFmt, totalRead,
initialOffset, endTime - startTime));
}
close();
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java Tue Aug 19 23:49:39 2014
@@ -52,7 +52,9 @@ import static org.apache.hadoop.hdfs.DFS
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.security.SaslPropertiesResolver;
/**
* Simple class encapsulating all of the configuration that the DataNode
@@ -67,6 +69,7 @@ public class DNConf {
final boolean transferToAllowed;
final boolean dropCacheBehindWrites;
final boolean syncBehindWrites;
+ final boolean syncBehindWritesInBackground;
final boolean dropCacheBehindReads;
final boolean syncOnClose;
final boolean encryptDataTransfer;
@@ -79,10 +82,13 @@ public class DNConf {
final long deleteReportInterval;
final long initialBlockReportDelay;
final long cacheReportInterval;
+ final long dfsclientSlowIoWarningThresholdMs;
+ final long datanodeSlowIoWarningThresholdMs;
final int writePacketSize;
final String minimumNameNodeVersion;
final String encryptionAlgorithm;
+ final SaslPropertiesResolver saslPropsResolver;
final TrustedChannelResolver trustedChannelResolver;
final long xceiverStopTimeout;
@@ -117,6 +123,9 @@ public class DNConf {
syncBehindWrites = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_KEY,
DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT);
+ syncBehindWritesInBackground = conf.getBoolean(
+ DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_IN_BACKGROUND_KEY,
+ DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_IN_BACKGROUND_DEFAULT);
dropCacheBehindReads = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY,
DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT);
@@ -129,7 +138,14 @@ public class DNConf {
DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT);
this.cacheReportInterval = conf.getLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY,
DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT);
-
+
+ this.dfsclientSlowIoWarningThresholdMs = conf.getLong(
+ DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY,
+ DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
+ this.datanodeSlowIoWarningThresholdMs = conf.getLong(
+ DFSConfigKeys.DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY,
+ DFSConfigKeys.DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
+
long initBRDelay = conf.getLong(
DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT) * 1000L;
@@ -155,6 +171,8 @@ public class DNConf {
DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
this.encryptionAlgorithm = conf.get(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
this.trustedChannelResolver = TrustedChannelResolver.getInstance(conf);
+ this.saslPropsResolver = DataTransferSaslUtil.getSaslPropertiesResolver(
+ conf);
this.xceiverStopTimeout = conf.getLong(
DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
@@ -168,12 +186,31 @@ public class DNConf {
DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY,
DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT) * 1000L;
}
-
+
// We get minimumNameNodeVersion via a method so it can be mocked out in tests.
String getMinimumNameNodeVersion() {
return this.minimumNameNodeVersion;
}
-
+
+ /**
+ * Returns true if encryption enabled for DataTransferProtocol.
+ *
+ * @return boolean true if encryption enabled for DataTransferProtocol
+ */
+ public boolean getEncryptDataTransfer() {
+ return encryptDataTransfer;
+ }
+
+ /**
+ * Returns encryption algorithm configured for DataTransferProtocol, or null
+ * if not configured.
+ *
+ * @return encryption algorithm configured for DataTransferProtocol
+ */
+ public String getEncryptionAlgorithm() {
+ return encryptionAlgorithm;
+ }
+
public long getXceiverStopTimeout() {
return xceiverStopTimeout;
}
@@ -181,4 +218,24 @@ public class DNConf {
public long getMaxLockedMemory() {
return maxLockedMemory;
}
+
+ /**
+ * Returns the SaslPropertiesResolver configured for use with
+ * DataTransferProtocol, or null if not configured.
+ *
+ * @return SaslPropertiesResolver configured for use with DataTransferProtocol
+ */
+ public SaslPropertiesResolver getSaslPropsResolver() {
+ return saslPropsResolver;
+ }
+
+ /**
+ * Returns the TrustedChannelResolver configured for use with
+ * DataTransferProtocol, or null if not configured.
+ *
+ * @return TrustedChannelResolver configured for use with DataTransferProtocol
+ */
+ public TrustedChannelResolver getTrustedChannelResolver() {
+ return trustedChannelResolver;
+ }
}
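
DNConf now also reads the client and datanode slow-I/O warning thresholds. For reference, the same values can be pulled straight from a Configuration with the keys and defaults used in the hunk above; this is only an illustrative reader, not part of the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

// Sketch only: read the two slow-I/O warning thresholds with the same keys
// and defaults that DNConf uses above.
class SlowIoThresholds {
  final long clientThresholdMs;
  final long datanodeThresholdMs;

  SlowIoThresholds(Configuration conf) {
    clientThresholdMs = conf.getLong(
        DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY,
        DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
    datanodeThresholdMs = conf.getLong(
        DFSConfigKeys.DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY,
        DFSConfigKeys.DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_DEFAULT);
  }
}
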
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java Tue Aug 19 23:49:39 2014
@@ -131,8 +131,7 @@ public class DataBlockScanner implements
private BlockPoolSliceScanner getNextBPScanner(String currentBpId) {
String nextBpId = null;
- while ((nextBpId == null) && datanode.shouldRun
- && !blockScannerThread.isInterrupted()) {
+ while (datanode.shouldRun && !blockScannerThread.isInterrupted()) {
waitForInit();
synchronized (this) {
if (getBlockPoolSetSize() > 0) {
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Aug 19 23:49:39 2014
@@ -17,10 +17,68 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.protobuf.BlockingService;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY;
+import static org.apache.hadoop.util.ExitUtil.terminate;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.lang.management.ManagementFactory;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.channels.SocketChannel;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.management.ObjectName;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -36,17 +94,44 @@ import org.apache.hadoop.hdfs.DFSConfigK
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.net.DomainPeerServer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
-import org.apache.hadoop.hdfs.protocol.*;
-import org.apache.hadoop.hdfs.protocol.datatransfer.*;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
+import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InterDatanodeProtocolService;
-import org.apache.hadoop.hdfs.protocolPB.*;
-import org.apache.hadoop.hdfs.security.token.block.*;
+import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@@ -59,7 +144,11 @@ import org.apache.hadoop.hdfs.server.dat
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
-import org.apache.hadoop.hdfs.server.protocol.*;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.hdfs.web.resources.Param;
import org.apache.hadoop.http.HttpConfig;
@@ -82,25 +171,21 @@ import org.apache.hadoop.security.UserGr
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.util.*;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
-import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.JvmPauseMonitor;
+import org.apache.hadoop.util.ServicePlugin;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.VersionInfo;
import org.mortbay.util.ajax.JSON;
-import javax.management.ObjectName;
-
-import java.io.*;
-import java.lang.management.ManagementFactory;
-import java.net.*;
-import java.nio.channels.ClosedByInterruptException;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.SocketChannel;
-import java.security.PrivilegedExceptionAction;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
-import static org.apache.hadoop.util.ExitUtil.terminate;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.BlockingService;
/**********************************************************
* DataNode is a class (and program) that stores a set of
@@ -185,6 +270,7 @@ public class DataNode extends Configured
public final static String EMPTY_DEL_HINT = "";
final AtomicInteger xmitsInProgress = new AtomicInteger();
Daemon dataXceiverServer = null;
+ DataXceiverServer xserver = null;
Daemon localDataXceiverServer = null;
ShortCircuitRegistry shortCircuitRegistry = null;
ThreadGroup threadGroup = null;
@@ -227,8 +313,18 @@ public class DataNode extends Configured
private final List<String> usersWithLocalPathAccess;
private final boolean connectToDnViaHostname;
ReadaheadPool readaheadPool;
+ SaslDataTransferClient saslClient;
+ SaslDataTransferServer saslServer;
private final boolean getHdfsBlockLocationsEnabled;
private ObjectName dataNodeInfoBeanName;
+ private Thread checkDiskErrorThread = null;
+ protected final int checkDiskErrorInterval = 5*1000;
+ private boolean checkDiskErrorFlag = false;
+ private Object checkDiskErrorMutex = new Object();
+ private long lastDiskErrorCheck;
+ private String supergroup;
+ private boolean isPermissionEnabled;
+ private String dnUserName = null;
/**
* Create the DataNode given a configuration, an array of dataDirs,
@@ -238,6 +334,7 @@ public class DataNode extends Configured
final List<StorageLocation> dataDirs,
final SecureResources resources) throws IOException {
super(conf);
+ this.lastDiskErrorCheck = 0;
this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT);
@@ -249,6 +346,11 @@ public class DataNode extends Configured
this.getHdfsBlockLocationsEnabled = conf.getBoolean(
DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
+ this.supergroup = conf.get(DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
+ DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
+ this.isPermissionEnabled = conf.getBoolean(
+ DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY,
+ DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT);
confVersion = "core-" +
conf.get("hadoop.common.configuration.version", "UNSPECIFIED") +
@@ -429,6 +531,33 @@ public class DataNode extends Configured
ipcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
}
}
+
+ /** Check whether the current user is in the superuser group. */
+ private void checkSuperuserPrivilege() throws IOException, AccessControlException {
+ if (!isPermissionEnabled) {
+ return;
+ }
+ // Try to get the ugi in the RPC call.
+ UserGroupInformation callerUgi = ipcServer.getRemoteUser();
+ if (callerUgi == null) {
+ // This is not from RPC.
+ callerUgi = UserGroupInformation.getCurrentUser();
+ }
+
+ // Is this by the DN user itself?
+ assert dnUserName != null;
+ if (callerUgi.getShortUserName().equals(dnUserName)) {
+ return;
+ }
+
+ // Is the user a member of the super group?
+ List<String> groups = Arrays.asList(callerUgi.getGroupNames());
+ if (groups.contains(supergroup)) {
+ return;
+ }
+ // Not a superuser.
+ throw new AccessControlException();
+ }
/**
* Initialize the datanode's periodic scanners:
@@ -521,8 +650,8 @@ public class DataNode extends Configured
streamingAddr = tcpPeerServer.getStreamingAddr();
LOG.info("Opened streaming server at " + streamingAddr);
this.threadGroup = new ThreadGroup("dataXceiverServer");
- this.dataXceiverServer = new Daemon(threadGroup,
- new DataXceiverServer(tcpPeerServer, conf, this));
+ xserver = new DataXceiverServer(tcpPeerServer, conf, this);
+ this.dataXceiverServer = new Daemon(threadGroup, xserver);
this.threadGroup.setDaemon(true); // auto destroy when empty
if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
@@ -646,7 +775,6 @@ public class DataNode extends Configured
/**
* Return the BPOfferService instance corresponding to the given block.
- * @param block
* @return the BPOS
* @throws IOException if no such BPOS can be found
*/
@@ -685,15 +813,10 @@ public class DataNode extends Configured
*/
void startDataNode(Configuration conf,
List<StorageLocation> dataDirs,
- // DatanodeProtocol namenode,
SecureResources resources
) throws IOException {
- if(UserGroupInformation.isSecurityEnabled() && resources == null) {
- if (!conf.getBoolean("ignore.secure.ports.for.testing", false)) {
- throw new RuntimeException("Cannot start secure cluster without "
- + "privileged resources.");
- }
- }
+
+ checkSecureConfig(conf, resources);
// settings global for all BPs in the Data Node
this.secureResources = resources;
@@ -708,15 +831,19 @@ public class DataNode extends Configured
" size (%s) is greater than zero and native code is not available.",
DFS_DATANODE_MAX_LOCKED_MEMORY_KEY));
}
- long ulimit = NativeIO.POSIX.getCacheManipulator().getMemlockLimit();
- if (dnConf.maxLockedMemory > ulimit) {
- throw new RuntimeException(String.format(
- "Cannot start datanode because the configured max locked memory" +
- " size (%s) of %d bytes is more than the datanode's available" +
- " RLIMIT_MEMLOCK ulimit of %d bytes.",
- DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
- dnConf.maxLockedMemory,
- ulimit));
+ if (Path.WINDOWS) {
+ NativeIO.Windows.extendWorkingSetSize(dnConf.maxLockedMemory);
+ } else {
+ long ulimit = NativeIO.POSIX.getCacheManipulator().getMemlockLimit();
+ if (dnConf.maxLockedMemory > ulimit) {
+ throw new RuntimeException(String.format(
+ "Cannot start datanode because the configured max locked memory" +
+ " size (%s) of %d bytes is more than the datanode's available" +
+ " RLIMIT_MEMLOCK ulimit of %d bytes.",
+ DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+ dnConf.maxLockedMemory,
+ ulimit));
+ }
}
}
LOG.info("Starting DataNode with maxLockedMemory = " +
@@ -733,16 +860,71 @@ public class DataNode extends Configured
// BlockPoolTokenSecretManager is required to create ipc server.
this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager();
+
+ // Login is done by now. Set the DN user name.
+ dnUserName = UserGroupInformation.getCurrentUser().getShortUserName();
+ LOG.info("dnUserName = " + dnUserName);
+ LOG.info("supergroup = " + supergroup);
initIpcServer(conf);
metrics = DataNodeMetrics.create(conf, getDisplayName());
-
+ metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
+
blockPoolManager = new BlockPoolManager(this);
blockPoolManager.refreshNamenodes(conf);
// Create the ReadaheadPool from the DataNode context so we can
// exit without having to explicitly shutdown its thread pool.
readaheadPool = ReadaheadPool.getInstance();
+ saslClient = new SaslDataTransferClient(dnConf.saslPropsResolver,
+ dnConf.trustedChannelResolver,
+ conf.getBoolean(
+ IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
+ IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT));
+ saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
+ }
+
+ /**
+ * Checks if the DataNode has a secure configuration if security is enabled.
+ * There are 2 possible configurations that are considered secure:
+ * 1. The server has bound to privileged ports for RPC and HTTP via
+ * SecureDataNodeStarter.
+ * 2. The configuration enables SASL on DataTransferProtocol and HTTPS (no
+ * plain HTTP) for the HTTP server. The SASL handshake guarantees
+ * authentication of the RPC server before a client transmits a secret, such
+ * as a block access token. Similarly, SSL guarantees authentication of the
+ * HTTP server before a client transmits a secret, such as a delegation
+ * token.
+ * It is not possible to run with both privileged ports and SASL on
+ * DataTransferProtocol. For backwards-compatibility, the connection logic
+ * must check if the target port is a privileged port, and if so, skip the
+ * SASL handshake.
+ *
+ * @param conf Configuration to check
+ * @param resources SecuredResources obtained for DataNode
+ * @throws RuntimeException if security is enabled but the configuration is insecure
+ */
+ private static void checkSecureConfig(Configuration conf,
+ SecureResources resources) throws RuntimeException {
+ if (!UserGroupInformation.isSecurityEnabled()) {
+ return;
+ }
+ String dataTransferProtection = conf.get(DFS_DATA_TRANSFER_PROTECTION_KEY);
+ if (resources != null && dataTransferProtection == null) {
+ return;
+ }
+ if (conf.getBoolean("ignore.secure.ports.for.testing", false)) {
+ return;
+ }
+ if (dataTransferProtection != null &&
+ DFSUtil.getHttpPolicy(conf) == HttpConfig.Policy.HTTPS_ONLY &&
+ resources == null) {
+ return;
+ }
+ throw new RuntimeException("Cannot start secure DataNode without " +
+ "configuring either privileged resources or SASL RPC data transfer " +
+ "protection and SSL for HTTP. Using privileged resources in " +
+ "combination with SASL RPC data transfer protection is not supported.");
}
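As a sketch only (not part of this change), a configuration that the check above accepts when security is enabled but no privileged SecureDataNodeStarter resources are supplied could look like the following; the protection value shown is one example of the accepted settings:

    // Sketch, assuming UserGroupInformation.isSecurityEnabled() is true and
    // resources == null: SASL on DataTransferProtocol plus an HTTPS-only web server.
    Configuration conf = new HdfsConfiguration();
    // one of "authentication", "integrity" or "privacy"
    conf.set(DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication");
    // no plain HTTP listener, so tokens only ever travel over SSL
    conf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
    // with this conf, checkSecureConfig(conf, null) returns instead of throwing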
public static String generateUuid() {
@@ -811,9 +993,7 @@ public class DataNode extends Configured
/**
* After the block pool has contacted the NN, registers that block pool
* with the secret manager, updating it with the secrets provided by the NN.
- * @param bpRegistration
- * @param blockPoolId
- * @throws IOException
+ * @throws IOException on error
*/
private synchronized void registerBlockPoolWithSecretManager(
DatanodeRegistration bpRegistration, String blockPoolId) throws IOException {
@@ -850,19 +1030,24 @@ public class DataNode extends Configured
*/
void shutdownBlockPool(BPOfferService bpos) {
blockPoolManager.remove(bpos);
+ if (bpos.hasBlockPoolId()) {
+ // Possible that this is shutting down before successfully
+ // registering anywhere. If that's the case, we wouldn't have
+ // a block pool id
+ String bpId = bpos.getBlockPoolId();
+ if (blockScanner != null) {
+ blockScanner.removeBlockPool(bpId);
+ }
- String bpId = bpos.getBlockPoolId();
- if (blockScanner != null) {
- blockScanner.removeBlockPool(bpId);
- }
-
- if (data != null) {
- data.shutdownBlockPool(bpId);
- }
+ if (data != null) {
+ data.shutdownBlockPool(bpId);
+ }
- if (storage != null) {
- storage.removeBlockPoolStorage(bpId);
+ if (storage != null) {
+ storage.removeBlockPoolStorage(bpId);
+ }
}
+
}
/**
@@ -883,14 +1068,19 @@ public class DataNode extends Configured
+ " should have retrieved namespace info before initBlockPool.");
}
+ setClusterId(nsInfo.clusterID, nsInfo.getBlockPoolID());
+
// Register the new block pool with the BP manager.
blockPoolManager.addBlockPool(bpos);
-
- setClusterId(nsInfo.clusterID, nsInfo.getBlockPoolID());
// In the case that this is the first block pool to connect, initialize
// the dataset, block scanners, etc.
initStorage(nsInfo);
+
+ // Exclude failed disks before initializing the block pools to avoid startup
+ // failures.
+ checkDiskError();
+
initPeriodicScanners(conf);
data.addBlockPool(nsInfo.getBlockPoolID(), conf);
@@ -949,6 +1139,11 @@ public class DataNode extends Configured
}
@VisibleForTesting
+ public DataXceiverServer getXferServer() {
+ return xserver;
+ }
+
+ @VisibleForTesting
public int getXferPort() {
return streamingAddr.getPort();
}
@@ -981,9 +1176,8 @@ public class DataNode extends Configured
/**
* get BP registration by blockPool id
- * @param bpid
* @return BP registration object
- * @throws IOException
+ * @throws IOException on error
*/
@VisibleForTesting
public DatanodeRegistration getDNRegistrationForBP(String bpid)
@@ -1071,6 +1265,7 @@ public class DataNode extends Configured
Token<BlockTokenIdentifier> token) throws IOException {
checkBlockLocalPathAccess();
checkBlockToken(block, token, BlockTokenSecretManager.AccessMode.READ);
+ Preconditions.checkNotNull(data, "Storage not yet initialized");
BlockLocalPathInfo info = data.getBlockLocalPathInfo(block);
if (LOG.isDebugEnabled()) {
if (info != null) {
@@ -1206,12 +1401,18 @@ public class DataNode extends Configured
// in order to avoid any further acceptance of requests, but the peers
// for block writes are not closed until the clients are notified.
if (dataXceiverServer != null) {
+ xserver.sendOOBToPeers();
((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill();
this.dataXceiverServer.interrupt();
}
+ // Interrupt the checkDiskErrorThread and terminate it.
+ if(this.checkDiskErrorThread != null) {
+ this.checkDiskErrorThread.interrupt();
+ }
+
// Record the time of initial notification
- long timeNotified = Time.now();
+ long timeNotified = Time.monotonicNow();
if (localDataXceiverServer != null) {
((DataXceiverServer) this.localDataXceiverServer.getRunnable()).kill();
@@ -1243,8 +1444,9 @@ public class DataNode extends Configured
while (true) {
// When shutting down for restart, wait 2.5 seconds before forcing
// termination of receiver threads.
- if (!this.shutdownForUpgrade ||
- (this.shutdownForUpgrade && (Time.now() - timeNotified > 2500))) {
+ if (!this.shutdownForUpgrade ||
+ (this.shutdownForUpgrade && (Time.monotonicNow() - timeNotified
+ > 2500))) {
this.threadGroup.interrupt();
}
LOG.info("Waiting for threadgroup to exit, active threads is " +
@@ -1319,55 +1521,17 @@ public class DataNode extends Configured
}
- /** Check if there is no space in disk
- * @param e that caused this checkDiskError call
- **/
- protected void checkDiskError(Exception e ) throws IOException {
-
- LOG.warn("checkDiskError: exception: ", e);
- if (isNetworkRelatedException(e)) {
- LOG.info("Not checking disk as checkDiskError was called on a network" +
- " related exception");
- return;
- }
- if (e.getMessage() != null &&
- e.getMessage().startsWith("No space left on device")) {
- throw new DiskOutOfSpaceException("No space left on device");
- } else {
- checkDiskError();
- }
- }
-
/**
- * Check if the provided exception looks like it's from a network error
- * @param e the exception from a checkDiskError call
- * @return true if this exception is network related, false otherwise
+ * Asynchronously check for a disk failure and, if one is found, handle the error
*/
- protected boolean isNetworkRelatedException(Exception e) {
- if (e instanceof SocketException
- || e instanceof SocketTimeoutException
- || e instanceof ClosedChannelException
- || e instanceof ClosedByInterruptException) {
- return true;
- }
-
- String msg = e.getMessage();
-
- return null != msg
- && (msg.startsWith("An established connection was aborted")
- || msg.startsWith("Broken pipe")
- || msg.startsWith("Connection reset")
- || msg.contains("java.nio.channels.SocketChannel"));
- }
-
- /**
- * Check if there is a disk failure and if so, handle the error
- */
- public void checkDiskError() {
- try {
- data.checkDataDir();
- } catch (DiskErrorException de) {
- handleDiskError(de.getMessage());
+ public void checkDiskErrorAsync() {
+ synchronized(checkDiskErrorMutex) {
+ checkDiskErrorFlag = true;
+ if(checkDiskErrorThread == null) {
+ startCheckDiskErrorThread();
+ checkDiskErrorThread.start();
+ LOG.info("Starting CheckDiskError Thread");
+ }
}
}
@@ -1405,8 +1569,8 @@ public class DataNode extends Configured
return xmitsInProgress.get();
}
- private void transferBlock(ExtendedBlock block, DatanodeInfo xferTargets[])
- throws IOException {
+ private void transferBlock(ExtendedBlock block, DatanodeInfo[] xferTargets,
+ StorageType[] xferTargetStorageTypes) throws IOException {
BPOfferService bpos = getBPOSForBlock(block);
DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId());
@@ -1442,16 +1606,17 @@ public class DataNode extends Configured
LOG.info(bpReg + " Starting thread to transfer " +
block + " to " + xfersBuilder);
- new Daemon(new DataTransfer(xferTargets, block,
+ new Daemon(new DataTransfer(xferTargets, xferTargetStorageTypes, block,
BlockConstructionStage.PIPELINE_SETUP_CREATE, "")).start();
}
}
void transferBlocks(String poolId, Block blocks[],
- DatanodeInfo xferTargets[][]) {
+ DatanodeInfo xferTargets[][], StorageType[][] xferTargetStorageTypes) {
for (int i = 0; i < blocks.length; i++) {
try {
- transferBlock(new ExtendedBlock(poolId, blocks[i]), xferTargets[i]);
+ transferBlock(new ExtendedBlock(poolId, blocks[i]), xferTargets[i],
+ xferTargetStorageTypes[i]);
} catch (IOException ie) {
LOG.warn("Failed to transfer block " + blocks[i], ie);
}
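The storage types are carried as arrays parallel to the target arrays, indexed in lock step as the loop above shows. Purely for illustration (the block, pool id and datanode variables below are placeholders):

    // Illustrative only: xferTargetStorageTypes[i][j] is the storage type to
    // use on datanode xferTargets[i][j] when transferring blocks[i].
    Block[] blocks = { blk };
    DatanodeInfo[][] xferTargets = { { dn1, dn2 } };
    StorageType[][] xferTargetStorageTypes = { { StorageType.DISK, StorageType.SSD } };
    transferBlocks(poolId, blocks, xferTargets, xferTargetStorageTypes);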
@@ -1554,6 +1719,7 @@ public class DataNode extends Configured
*/
private class DataTransfer implements Runnable {
final DatanodeInfo[] targets;
+ final StorageType[] targetStorageTypes;
final ExtendedBlock b;
final BlockConstructionStage stage;
final private DatanodeRegistration bpReg;
@@ -1564,7 +1730,8 @@ public class DataNode extends Configured
* Connect to the first item in the target list. Pass along the
* entire target list, the block, and the data.
*/
- DataTransfer(DatanodeInfo targets[], ExtendedBlock b, BlockConstructionStage stage,
+ DataTransfer(DatanodeInfo targets[], StorageType[] targetStorageTypes,
+ ExtendedBlock b, BlockConstructionStage stage,
final String clientname) {
if (DataTransferProtocol.LOG.isDebugEnabled()) {
DataTransferProtocol.LOG.debug(getClass().getSimpleName() + ": "
@@ -1574,6 +1741,7 @@ public class DataNode extends Configured
+ ", targests=" + Arrays.asList(targets));
}
this.targets = targets;
+ this.targetStorageTypes = targetStorageTypes;
this.b = b;
this.stage = stage;
BPOfferService bpos = blockPoolManager.get(b.getBlockPoolId());
@@ -1605,20 +1773,25 @@ public class DataNode extends Configured
NetUtils.connect(sock, curTarget, dnConf.socketTimeout);
sock.setSoTimeout(targets.length * dnConf.socketTimeout);
+ //
+ // Header info
+ //
+ Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
+ if (isBlockTokenEnabled) {
+ accessToken = blockPoolTokenSecretManager.generateToken(b,
+ EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE));
+ }
+
long writeTimeout = dnConf.socketWriteTimeout +
HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
InputStream unbufIn = NetUtils.getInputStream(sock);
- if (dnConf.encryptDataTransfer &&
- !dnConf.trustedChannelResolver.isTrusted(sock.getInetAddress())) {
- IOStreamPair encryptedStreams =
- DataTransferEncryptor.getEncryptedStreams(
- unbufOut, unbufIn,
- blockPoolTokenSecretManager.generateDataEncryptionKey(
- b.getBlockPoolId()));
- unbufOut = encryptedStreams.out;
- unbufIn = encryptedStreams.in;
- }
+ DataEncryptionKeyFactory keyFactory =
+ getDataEncryptionKeyFactoryForBlock(b);
+ IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
+ unbufIn, keyFactory, accessToken, bpReg);
+ unbufOut = saslStreams.out;
+ unbufIn = saslStreams.in;
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
HdfsConstants.SMALL_BUFFER_SIZE));
@@ -1627,16 +1800,8 @@ public class DataNode extends Configured
false, false, true, DataNode.this, null, cachingStrategy);
DatanodeInfo srcNode = new DatanodeInfo(bpReg);
- //
- // Header info
- //
- Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN;
- if (isBlockTokenEnabled) {
- accessToken = blockPoolTokenSecretManager.generateToken(b,
- EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE));
- }
-
- new Sender(out).writeBlock(b, accessToken, clientname, targets, srcNode,
+ new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken,
+ clientname, targets, targetStorageTypes, srcNode,
stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy);
// send data & checksum
@@ -1667,13 +1832,8 @@ public class DataNode extends Configured
} catch (IOException ie) {
LOG.warn(bpReg + ":Failed to transfer " + b + " to " +
targets[0] + " got ", ie);
- // check if there are any disk problem
- try{
- checkDiskError(ie);
- } catch(IOException e) {
- LOG.warn("DataNode.checkDiskError failed in run() with: ", e);
- }
-
+ // check if there are any disk problem
+ checkDiskErrorAsync();
} finally {
xmitsInProgress.getAndDecrement();
IOUtils.closeStream(blockSender);
@@ -1683,12 +1843,32 @@ public class DataNode extends Configured
}
}
}
-
+
+ /**
+ * Returns a new DataEncryptionKeyFactory that generates a key from the
+ * BlockPoolTokenSecretManager, using the block pool ID of the given block.
+ *
+ * @param block for which the factory needs to create a key
+ * @return DataEncryptionKeyFactory for block's block pool ID
+ */
+ DataEncryptionKeyFactory getDataEncryptionKeyFactoryForBlock(
+ final ExtendedBlock block) {
+ return new DataEncryptionKeyFactory() {
+ @Override
+ public DataEncryptionKey newDataEncryptionKey() {
+ return dnConf.encryptDataTransfer ?
+ blockPoolTokenSecretManager.generateDataEncryptionKey(
+ block.getBlockPoolId()) : null;
+ }
+ };
+ }
+
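The factory defers key generation to the moment the SASL layer actually asks for a key, and by design it hands back null when dfs.encrypt.data.transfer is off. A condensed sketch of the pattern, mirroring the call site added in DataTransfer.run() above (the socket, stream and registration variables are stand-ins):

    // Sketch of the intended use: create the factory per block and pass it to
    // the SASL client, which invokes newDataEncryptionKey() only if the
    // negotiated transfer actually needs an encryption key.
    DataEncryptionKeyFactory keyFactory = getDataEncryptionKeyFactoryForBlock(b);
    IOStreamPair saslStreams =
        saslClient.socketSend(sock, unbufOut, unbufIn, keyFactory, accessToken, bpReg);
    unbufOut = saslStreams.out;
    unbufIn = saslStreams.in;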
/**
* After a block becomes finalized, a datanode increases metric counter,
* notifies namenode, and adds it to the block scanner
- * @param block
- * @param delHint
+ * @param block block to close
+ * @param delHint hint on which excess block to delete
+ * @param storageUuid UUID of the storage where block is stored
*/
void closeBlock(ExtendedBlock block, String delHint, String storageUuid) {
metrics.incrBlocksWritten();
@@ -1776,8 +1956,15 @@ public class DataNode extends Configured
try {
location = StorageLocation.parse(locationString);
} catch (IOException ioe) {
- throw new IllegalArgumentException("Failed to parse conf property "
- + DFS_DATANODE_DATA_DIR_KEY + ": " + locationString, ioe);
+ LOG.error("Failed to initialize storage directory " + locationString
+ + ". Exception details: " + ioe);
+ // Ignore the exception.
+ continue;
+ } catch (SecurityException se) {
+ LOG.error("Failed to initialize storage directory " + locationString
+ + ". Exception details: " + se);
+ // Ignore the exception.
+ continue;
}
locations.add(location);
@@ -2282,11 +2469,11 @@ public class DataNode extends Configured
@Override // ClientDataNodeProtocol
public long getReplicaVisibleLength(final ExtendedBlock block) throws IOException {
- checkWriteAccess(block);
+ checkReadAccess(block);
return data.getReplicaVisibleLength(block);
}
- private void checkWriteAccess(final ExtendedBlock block) throws IOException {
+ private void checkReadAccess(final ExtendedBlock block) throws IOException {
if (isBlockTokenEnabled) {
Set<TokenIdentifier> tokenIds = UserGroupInformation.getCurrentUser()
.getTokenIdentifiers();
@@ -2311,11 +2498,12 @@ public class DataNode extends Configured
* The corresponding replica must be an RBW or a Finalized.
* Its GS and numBytes will be set to
* the stored GS and the visible length.
- * @param targets
- * @param client
+ * @param targets targets to transfer the block to
+ * @param client client name
*/
void transferReplicaForPipelineRecovery(final ExtendedBlock b,
- final DatanodeInfo[] targets, final String client) throws IOException {
+ final DatanodeInfo[] targets, final StorageType[] targetStorageTypes,
+ final String client) throws IOException {
final long storedGS;
final long visible;
final BlockConstructionStage stage;
@@ -2348,7 +2536,7 @@ public class DataNode extends Configured
b.setNumBytes(visible);
if (targets.length > 0) {
- new DataTransfer(targets, b, stage, client).run();
+ new DataTransfer(targets, targetStorageTypes, b, stage, client).run();
}
}
@@ -2423,6 +2611,7 @@ public class DataNode extends Configured
*/
@Override // DataNodeMXBean
public String getVolumeInfo() {
+ Preconditions.checkNotNull(data, "Storage not yet initialized");
return JSON.toString(data.getVolumeInfoMap());
}
@@ -2437,6 +2626,7 @@ public class DataNode extends Configured
@Override // ClientDatanodeProtocol
public void refreshNamenodes() throws IOException {
+ checkSuperuserPrivilege();
conf = new Configuration();
refreshNamenodes(conf);
}
@@ -2444,6 +2634,7 @@ public class DataNode extends Configured
@Override // ClientDatanodeProtocol
public void deleteBlockPool(String blockPoolId, boolean force)
throws IOException {
+ checkSuperuserPrivilege();
LOG.info("deleteBlockPool command received for block pool " + blockPoolId
+ ", force=" + force);
if (blockPoolManager.get(blockPoolId) != null) {
@@ -2459,6 +2650,7 @@ public class DataNode extends Configured
@Override // ClientDatanodeProtocol
public synchronized void shutdownDatanode(boolean forUpgrade) throws IOException {
+ checkSuperuserPrivilege();
LOG.info("shutdownDatanode command received (upgrade=" + forUpgrade +
"). Shutting down Datanode...");
@@ -2579,4 +2771,59 @@ public class DataNode extends Configured
public ShortCircuitRegistry getShortCircuitRegistry() {
return shortCircuitRegistry;
}
+
+ /**
+ * Check for disk errors and handle any error that is found
+ */
+ private void checkDiskError() {
+ try {
+ data.checkDataDir();
+ } catch (DiskErrorException de) {
+ handleDiskError(de.getMessage());
+ }
+ }
+
+ /**
+ * Starts a new thread that checks, every 5 seconds, whether a disk error
+ * check has been requested, and runs the check if so
+ */
+ private void startCheckDiskErrorThread() {
+ checkDiskErrorThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ while(shouldRun) {
+ boolean tempFlag;
+ synchronized(checkDiskErrorMutex) {
+ tempFlag = checkDiskErrorFlag;
+ checkDiskErrorFlag = false;
+ }
+ if(tempFlag) {
+ try {
+ checkDiskError();
+ } catch (Exception e) {
+ LOG.warn("Unexpected exception occurred while checking disk error " + e);
+ checkDiskErrorThread = null;
+ return;
+ }
+ synchronized(checkDiskErrorMutex) {
+ lastDiskErrorCheck = Time.monotonicNow();
+ }
+ }
+ try {
+ Thread.sleep(checkDiskErrorInterval);
+ } catch (InterruptedException e) {
+ LOG.debug("InterruptedException in check disk error thread", e);
+ checkDiskErrorThread = null;
+ return;
+ }
+ }
+ }
+ });
+ }
+
+ public long getLastDiskErrorCheck() {
+ synchronized(checkDiskErrorMutex) {
+ return lastDiskErrorCheck;
+ }
+ }
}
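A test-style sketch of how the new asynchronous disk check is driven: checkDiskErrorAsync() only raises a flag, and the lazily started background thread performs the actual data.checkDataDir() pass and updates getLastDiskErrorCheck(). The MiniDFSCluster variable and the timeouts below are illustrative, not part of this patch:

    // Illustrative only: request an async check and wait for the polling
    // thread to finish one pass, observed through getLastDiskErrorCheck().
    final DataNode dn = cluster.getDataNodes().get(0);  // hypothetical MiniDFSCluster
    final long before = dn.getLastDiskErrorCheck();
    dn.checkDiskErrorAsync();
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        return dn.getLastDiskErrorCheck() > before;
      }
    }, 100, 10000);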
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeLayoutVersion.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeLayoutVersion.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeLayoutVersion.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeLayoutVersion.java Tue Aug 19 23:49:39 2014
@@ -62,7 +62,10 @@ public class DataNodeLayoutVersion {
* </ul>
*/
public static enum Feature implements LayoutFeature {
- FIRST_LAYOUT(-55, -53, "First datenode layout", false);
+ FIRST_LAYOUT(-55, -53, "First datanode layout", false),
+ BLOCKID_BASED_LAYOUT(-56,
+ "The block ID of a finalized block uniquely determines its position " +
+ "in the directory structure");
private final FeatureInfo info;