Posted to hdfs-commits@hadoop.apache.org by cu...@apache.org on 2014/08/20 03:34:47 UTC
svn commit: r1619019 [7/11] - in
/hadoop/common/branches/YARN-1051/hadoop-hdfs-project:
hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/
hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/
hadoop-hdfs-httpfs/src/main/ja...
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java Wed Aug 20 01:34:29 2014
@@ -18,13 +18,19 @@
package org.apache.hadoop.hdfs.server.datanode;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.HardLink;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
@@ -35,13 +41,31 @@ import org.apache.hadoop.hdfs.server.com
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DiskChecker;
-import java.io.*;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
import java.nio.channels.FileLock;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
/**
* Data storage information file.
@@ -149,43 +173,99 @@ public class DataStorage extends Storage
}
/**
- * Analyze storage directories.
- * Recover from previous transitions if required.
- * Perform fs state transition if necessary depending on the namespace info.
- * Read storage info.
- * <br>
- * This method should be synchronized between multiple DN threads. Only the
- * first DN thread does DN level storage dir recoverTransitionRead.
- *
+ * See {@link org.apache.hadoop.hdfs.server.common.Storage#writeAll()}.
+ */
+ private void writeAll(Collection<StorageDirectory> dirs) throws IOException {
+ this.layoutVersion = getServiceLayoutVersion();
+ for (StorageDirectory dir : dirs) {
+ writeProperties(dir);
+ }
+ }
+
+ /**
+ * Add a list of volumes to be managed by DataStorage. If a volume is empty,
+ * format it; otherwise, recover it from previous transitions if required.
+ *
+ * @param datanode the reference to DataNode.
* @param nsInfo namespace information
* @param dataDirs array of data storage directories
* @param startOpt startup option
* @throws IOException
*/
- synchronized void recoverTransitionRead(DataNode datanode,
+ synchronized void addStorageLocations(DataNode datanode,
NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
StartupOption startOpt)
throws IOException {
- if (initialized) {
- // DN storage has been initialized, no need to do anything
- return;
+ // Similar to recoverTransitionRead, this first ensures that the
+ // datanode-level format is complete.
+ List<StorageLocation> tmpDataDirs =
+ new ArrayList<StorageLocation>(dataDirs);
+ addStorageLocations(datanode, nsInfo, tmpDataDirs, startOpt, false, true);
+
+ Collection<File> bpDataDirs = new ArrayList<File>();
+ String bpid = nsInfo.getBlockPoolID();
+ for (StorageLocation dir : dataDirs) {
+ File dnRoot = dir.getFile();
+ File bpRoot = BlockPoolSliceStorage.getBpRoot(bpid, new File(dnRoot,
+ STORAGE_DIR_CURRENT));
+ bpDataDirs.add(bpRoot);
}
- LOG.info("Data-node version: " + HdfsConstants.DATANODE_LAYOUT_VERSION
- + " and name-node layout version: " + nsInfo.getLayoutVersion());
-
- // 1. For each data directory calculate its state and
- // check whether all is consistent before transitioning.
- // Format and recover.
- this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
- ArrayList<StorageState> dataDirStates = new ArrayList<StorageState>(dataDirs.size());
+ // mkdir for the list of BlockPoolStorage
+ makeBlockPoolDataDir(bpDataDirs, null);
+ BlockPoolSliceStorage bpStorage = this.bpStorageMap.get(bpid);
+ if (bpStorage == null) {
+ bpStorage = new BlockPoolSliceStorage(
+ nsInfo.getNamespaceID(), bpid, nsInfo.getCTime(),
+ nsInfo.getClusterID());
+ }
+
+ bpStorage.recoverTransitionRead(datanode, nsInfo, bpDataDirs, startOpt);
+ addBlockPoolStorage(bpid, bpStorage);
+ }
+
+ /**
+ * Add a list of volumes to be managed by this DataStorage. If the volume is
+ * empty, it formats the volume, otherwise it recovers it from previous
+ * transitions if required.
+ *
+ * If isInitialize is false, only the directories that have finished the
+ * doTransition() process will be added to DataStorage.
+ *
+ * @param datanode the reference to DataNode.
+ * @param nsInfo namespace information
+ * @param dataDirs array of data storage directories
+ * @param startOpt startup option
+ * @param isInitialize whether this is called during DataNode startup.
+ * @param ignoreExistingDirs if true, do not fail when none of the given
+ *        directories can be added (e.g., because all are already in use).
+ * @throws IOException
+ */
+ private synchronized void addStorageLocations(DataNode datanode,
+ NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
+ StartupOption startOpt, boolean isInitialize, boolean ignoreExistingDirs)
+ throws IOException {
+ Set<String> existingStorageDirs = new HashSet<String>();
+ for (int i = 0; i < getNumStorageDirs(); i++) {
+ existingStorageDirs.add(getStorageDir(i).getRoot().getAbsolutePath());
+ }
+
+ // 1. For each data directory calculate its state and check whether all is
+ // consistent before transitioning. Format and recover.
+ ArrayList<StorageState> dataDirStates =
+ new ArrayList<StorageState>(dataDirs.size());
+ List<StorageDirectory> addedStorageDirectories =
+ new ArrayList<StorageDirectory>();
for(Iterator<StorageLocation> it = dataDirs.iterator(); it.hasNext();) {
File dataDir = it.next().getFile();
+ if (existingStorageDirs.contains(dataDir.getAbsolutePath())) {
+ LOG.info("Storage directory " + dataDir + " has already been used.");
+ it.remove();
+ continue;
+ }
StorageDirectory sd = new StorageDirectory(dataDir);
StorageState curState;
try {
curState = sd.analyzeStorage(startOpt, this);
// sd is locked but not opened
- switch(curState) {
+ switch (curState) {
case NORMAL:
break;
case NON_EXISTENT:
@@ -194,7 +274,8 @@ public class DataStorage extends Storage
it.remove();
continue;
case NOT_FORMATTED: // format
- LOG.info("Storage directory " + dataDir + " is not formatted");
+ LOG.info("Storage directory " + dataDir + " is not formatted for "
+ + nsInfo.getBlockPoolID());
LOG.info("Formatting ...");
format(sd, nsInfo, datanode.getDatanodeUuid());
break;
@@ -208,33 +289,82 @@ public class DataStorage extends Storage
//continue with other good dirs
continue;
}
- // add to the storage list
- addStorageDir(sd);
+ if (isInitialize) {
+ addStorageDir(sd);
+ }
+ addedStorageDirectories.add(sd);
dataDirStates.add(curState);
}
- if (dataDirs.size() == 0 || dataDirStates.size() == 0) // none of the data dirs exist
+ if (dataDirs.size() == 0 || dataDirStates.size() == 0) {
+ // none of the data dirs exist
+ if (ignoreExistingDirs) {
+ return;
+ }
throw new IOException(
"All specified directories are not accessible or do not exist.");
+ }
// 2. Do transitions
// Each storage directory is treated individually.
- // During startup some of them can upgrade or rollback
- // while others could be uptodate for the regular startup.
- try {
- for (int idx = 0; idx < getNumStorageDirs(); idx++) {
- doTransition(datanode, getStorageDir(idx), nsInfo, startOpt);
- createStorageID(getStorageDir(idx));
+ // During startup some of them can upgrade or rollback
+ // while others could be up-to-date for the regular startup.
+ for (Iterator<StorageDirectory> it = addedStorageDirectories.iterator();
+ it.hasNext(); ) {
+ StorageDirectory sd = it.next();
+ try {
+ doTransition(datanode, sd, nsInfo, startOpt);
+ createStorageID(sd);
+ } catch (IOException e) {
+ if (!isInitialize) {
+ sd.unlock();
+ it.remove();
+ continue;
+ }
+ unlockAll();
+ throw e;
}
- } catch (IOException e) {
- unlockAll();
- throw e;
}
- // 3. Update all storages. Some of them might have just been formatted.
- this.writeAll();
+ // 3. Update all successfully loaded storages. Some of them might have just
+ // been formatted.
+ this.writeAll(addedStorageDirectories);
+
+ // 4. Make newly loaded storage directories visible for service.
+ if (!isInitialize) {
+ this.storageDirs.addAll(addedStorageDirectories);
+ }
+ }
+
+ /**
+ * Analyze storage directories.
+ * Recover from previous transitions if required.
+ * Perform fs state transition if necessary depending on the namespace info.
+ * Read storage info.
+ * <br>
+ * This method should be synchronized between multiple DN threads. Only the
+ * first DN thread does DN level storage dir recoverTransitionRead.
+ *
+ * @param nsInfo namespace information
+ * @param dataDirs array of data storage directories
+ * @param startOpt startup option
+ * @throws IOException
+ */
+ synchronized void recoverTransitionRead(DataNode datanode,
+ NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
+ StartupOption startOpt)
+ throws IOException {
+ if (initialized) {
+ // DN storage has been initialized, no need to do anything
+ return;
+ }
+ LOG.info("DataNode version: " + HdfsConstants.DATANODE_LAYOUT_VERSION
+ + " and NameNode layout version: " + nsInfo.getLayoutVersion());
+
+ this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
+ addStorageLocations(datanode, nsInfo, dataDirs, startOpt, true, false);
- // 4. mark DN storage is initialized
+ // mark DN storage as initialized
this.initialized = true;
}
@@ -261,6 +391,7 @@ public class DataStorage extends Storage
STORAGE_DIR_CURRENT));
bpDataDirs.add(bpRoot);
}
+
// mkdir for the list of BlockPoolStorage
makeBlockPoolDataDir(bpDataDirs, null);
BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage(
@@ -488,7 +619,7 @@ public class DataStorage extends Storage
// do upgrade
if (this.layoutVersion > HdfsConstants.DATANODE_LAYOUT_VERSION) {
- doUpgrade(sd, nsInfo); // upgrade
+ doUpgrade(datanode, sd, nsInfo); // upgrade
return;
}
@@ -523,7 +654,8 @@ public class DataStorage extends Storage
* @param sd storage directory
* @throws IOException on error
*/
- void doUpgrade(StorageDirectory sd, NamespaceInfo nsInfo) throws IOException {
+ void doUpgrade(DataNode datanode, StorageDirectory sd, NamespaceInfo nsInfo)
+ throws IOException {
// If the existing on-disk layout version supports federation, simply
// update its layout version.
if (DataNodeLayoutVersion.supports(
@@ -568,7 +700,8 @@ public class DataStorage extends Storage
BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage(nsInfo.getNamespaceID(),
nsInfo.getBlockPoolID(), nsInfo.getCTime(), nsInfo.getClusterID());
bpStorage.format(curDir, nsInfo);
- linkAllBlocks(tmpDir, bbwDir, new File(curBpDir, STORAGE_DIR_CURRENT));
+ linkAllBlocks(datanode, tmpDir, bbwDir, new File(curBpDir,
+ STORAGE_DIR_CURRENT));
// 4. Write version file under <SD>/current
layoutVersion = HdfsConstants.DATANODE_LAYOUT_VERSION;
@@ -746,22 +879,22 @@ public class DataStorage extends Storage
*
* @throws IOException If error occurs during hardlink
*/
- private void linkAllBlocks(File fromDir, File fromBbwDir, File toDir)
- throws IOException {
+ private void linkAllBlocks(DataNode datanode, File fromDir, File fromBbwDir,
+ File toDir) throws IOException {
HardLink hardLink = new HardLink();
// do the link
int diskLayoutVersion = this.getLayoutVersion();
if (DataNodeLayoutVersion.supports(
LayoutVersion.Feature.APPEND_RBW_DIR, diskLayoutVersion)) {
// hardlink finalized blocks in tmpDir/finalized
- linkBlocks(new File(fromDir, STORAGE_DIR_FINALIZED),
+ linkBlocks(datanode, new File(fromDir, STORAGE_DIR_FINALIZED),
new File(toDir, STORAGE_DIR_FINALIZED), diskLayoutVersion, hardLink);
// hardlink rbw blocks in tmpDir/rbw
- linkBlocks(new File(fromDir, STORAGE_DIR_RBW),
+ linkBlocks(datanode, new File(fromDir, STORAGE_DIR_RBW),
new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion, hardLink);
} else { // pre-RBW version
// hardlink finalized blocks in tmpDir
- linkBlocks(fromDir, new File(toDir, STORAGE_DIR_FINALIZED),
+ linkBlocks(datanode, fromDir, new File(toDir, STORAGE_DIR_FINALIZED),
diskLayoutVersion, hardLink);
if (fromBbwDir.exists()) {
/*
@@ -770,15 +903,67 @@ public class DataStorage extends Storage
* NOT underneath the 'current' directory in those releases. See
* HDFS-3731 for details.
*/
- linkBlocks(fromBbwDir,
+ linkBlocks(datanode, fromBbwDir,
new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion, hardLink);
}
}
LOG.info( hardLink.linkStats.report() );
}
+
+ private static class LinkArgs {
+ public File src;
+ public File dst;
+
+ public LinkArgs(File src, File dst) {
+ this.src = src;
+ this.dst = dst;
+ }
+ }
+
+ static void linkBlocks(DataNode datanode, File from, File to, int oldLV,
+ HardLink hl) throws IOException {
+ boolean upgradeToIdBasedLayout = false;
+ // If we are upgrading from a version older than the one where we introduced
+ // block ID-based layout AND we're working with the finalized directory,
+ // we'll need to upgrade from the old flat layout to the block ID-based one
+ if (oldLV > DataNodeLayoutVersion.Feature.BLOCKID_BASED_LAYOUT.getInfo().
+ getLayoutVersion() && to.getName().equals(STORAGE_DIR_FINALIZED)) {
+ upgradeToIdBasedLayout = true;
+ }
+
+ final List<LinkArgs> idBasedLayoutSingleLinks = Lists.newArrayList();
+ linkBlocksHelper(from, to, oldLV, hl, upgradeToIdBasedLayout, to,
+ idBasedLayoutSingleLinks);
+ int numLinkWorkers = datanode.getConf().getInt(
+ DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS_KEY,
+ DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS);
+ ExecutorService linkWorkers = Executors.newFixedThreadPool(numLinkWorkers);
+ final int step = idBasedLayoutSingleLinks.size() / numLinkWorkers + 1;
+ List<Future<Void>> futures = Lists.newArrayList();
+ for (int i = 0; i < idBasedLayoutSingleLinks.size(); i += step) {
+ final int iCopy = i;
+ futures.add(linkWorkers.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws IOException {
+ int upperBound = Math.min(iCopy + step,
+ idBasedLayoutSingleLinks.size());
+ for (int j = iCopy; j < upperBound; j++) {
+ LinkArgs cur = idBasedLayoutSingleLinks.get(j);
+ NativeIO.link(cur.src, cur.dst);
+ }
+ return null;
+ }
+ }));
+ }
+ linkWorkers.shutdown();
+ for (Future<Void> f : futures) {
+ Futures.get(f, IOException.class);
+ }
+ }
- static void linkBlocks(File from, File to, int oldLV, HardLink hl)
- throws IOException {
+ static void linkBlocksHelper(File from, File to, int oldLV, HardLink hl,
+ boolean upgradeToIdBasedLayout, File blockRoot,
+ List<LinkArgs> idBasedLayoutSingleLinks) throws IOException {
if (!from.exists()) {
return;
}
@@ -805,9 +990,6 @@ public class DataStorage extends Storage
// from is a directory
hl.linkStats.countDirs++;
- if (!to.mkdirs())
- throw new IOException("Cannot create directory " + to);
-
String[] blockNames = from.list(new java.io.FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
@@ -815,12 +997,36 @@ public class DataStorage extends Storage
}
});
+ // If we are upgrading to block ID-based layout, we don't want to recreate
+ // any subdirs from the source that contain blocks, since we have a new
+ // directory structure
+ if (!upgradeToIdBasedLayout || !to.getName().startsWith(
+ BLOCK_SUBDIR_PREFIX)) {
+ if (!to.mkdirs())
+ throw new IOException("Cannot create directory " + to);
+ }
+
// Block files just need hard links with the same file names
// but a different directory
if (blockNames.length > 0) {
- HardLink.createHardLinkMult(from, blockNames, to);
- hl.linkStats.countMultLinks++;
- hl.linkStats.countFilesMultLinks += blockNames.length;
+ if (upgradeToIdBasedLayout) {
+ for (String blockName : blockNames) {
+ long blockId = Block.getBlockId(blockName);
+ File blockLocation = DatanodeUtil.idToBlockDir(blockRoot, blockId);
+ if (!blockLocation.exists()) {
+ if (!blockLocation.mkdirs()) {
+ throw new IOException("Failed to mkdirs " + blockLocation);
+ }
+ }
+ idBasedLayoutSingleLinks.add(new LinkArgs(new File(from, blockName),
+ new File(blockLocation, blockName)));
+ hl.linkStats.countSingleLinks++;
+ }
+ } else {
+ HardLink.createHardLinkMult(from, blockNames, to);
+ hl.linkStats.countMultLinks++;
+ hl.linkStats.countFilesMultLinks += blockNames.length;
+ }
} else {
hl.linkStats.countEmptyDirs++;
}
@@ -834,8 +1040,9 @@ public class DataStorage extends Storage
}
});
for(int i = 0; i < otherNames.length; i++)
- linkBlocks(new File(from, otherNames[i]),
- new File(to, otherNames[i]), oldLV, hl);
+ linkBlocksHelper(new File(from, otherNames[i]),
+ new File(to, otherNames[i]), oldLV, hl, upgradeToIdBasedLayout,
+ blockRoot, idBasedLayoutSingleLinks);
}
/**
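
The chunked fan-out in linkBlocks above is worth seeing in isolation: the flat list of hard-link operations is split into contiguous runs of step = N / numWorkers + 1 entries, one Callable per run, so at most numWorkers tasks are submitted, and the futures are drained to surface the first failure. A minimal standalone sketch of the same partitioning (class and task names are illustrative; a println stands in for NativeIO.link):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class ChunkedLinkDemo {
      public static void main(String[] args) throws Exception {
        final List<String> tasks = new ArrayList<String>();
        for (int i = 0; i < 10; i++) {
          tasks.add("link-" + i);
        }
        int numWorkers = 3;
        ExecutorService pool = Executors.newFixedThreadPool(numWorkers);
        // Same chunking as linkBlocks: step = N / W + 1 yields at most W chunks.
        final int step = tasks.size() / numWorkers + 1;
        List<Future<Void>> futures = new ArrayList<Future<Void>>();
        for (int i = 0; i < tasks.size(); i += step) {
          final int start = i;
          futures.add(pool.submit(new Callable<Void>() {
            @Override
            public Void call() {
              int end = Math.min(start + step, tasks.size());
              for (int j = start; j < end; j++) {
                // Stand-in for NativeIO.link(cur.src, cur.dst).
                System.out.println(Thread.currentThread().getName()
                    + " handles " + tasks.get(j));
              }
              return null;
            }
          }));
        }
        pool.shutdown();
        for (Future<Void> f : futures) {
          f.get(); // propagates the first worker failure, as Futures.get does above
        }
      }
    }
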
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Wed Aug 20 01:34:29 2014
@@ -103,7 +103,8 @@ class DataXceiver extends Receiver imple
private long opStartTime; //the start time of receiving an Op
private final InputStream socketIn;
private OutputStream socketOut;
-
+ private BlockReceiver blockReceiver = null;
+
/**
* Client Name used in previous operation. Not available on first request
* on the socket.
@@ -159,6 +160,12 @@ class DataXceiver extends Receiver imple
return socketOut;
}
+ public void sendOOB() throws IOException, InterruptedException {
+ LOG.info("Sending OOB to peer: " + peer);
+ if (blockReceiver != null) {
+ blockReceiver.sendOOB();
+ }
+ }
+
/**
* Read/write data from/to the DataXceiverServer.
*/
@@ -168,7 +175,7 @@ class DataXceiver extends Receiver imple
Op op = null;
try {
- dataXceiverServer.addPeer(peer, Thread.currentThread());
+ dataXceiverServer.addPeer(peer, Thread.currentThread(), this);
peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
InputStream input = socketIn;
IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,
@@ -584,7 +591,6 @@ class DataXceiver extends Receiver imple
DataOutputStream mirrorOut = null; // stream to next target
DataInputStream mirrorIn = null; // reply from next target
Socket mirrorSock = null; // socket to next target
- BlockReceiver blockReceiver = null; // responsible for data handling
String mirrorNode = null; // the name:port of next target
String firstBadLink = ""; // first datanode that failed in connection setup
Status mirrorInStatus = SUCCESS;
@@ -708,7 +714,7 @@ class DataXceiver extends Receiver imple
if (blockReceiver != null) {
String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
- mirrorAddr, null, targets);
+ mirrorAddr, null, targets, false);
// send close-ack for transfer-RBW/Finalized
if (isTransfer) {
@@ -747,6 +753,7 @@ class DataXceiver extends Receiver imple
IOUtils.closeStream(replyOut);
IOUtils.closeSocket(mirrorSock);
IOUtils.closeStream(blockReceiver);
+ blockReceiver = null;
}
//update metrics
@@ -983,7 +990,7 @@ class DataXceiver extends Receiver imple
String errMsg = null;
BlockReceiver blockReceiver = null;
DataInputStream proxyReply = null;
-
+ DataOutputStream replyOut = new DataOutputStream(getOutputStream());
try {
// get the output stream to the proxy
final String dnAddr = proxySource.getXferAddr(connectToDnViaHostname);
@@ -1040,8 +1047,8 @@ class DataXceiver extends Receiver imple
CachingStrategy.newDropBehind());
// receive a block
- blockReceiver.receiveBlock(null, null, null, null,
- dataXceiverServer.balanceThrottler, null);
+ blockReceiver.receiveBlock(null, null, replyOut, null,
+ dataXceiverServer.balanceThrottler, null, true);
// notify name node
datanode.notifyNamenodeReceivedBlock(
@@ -1076,6 +1083,7 @@ class DataXceiver extends Receiver imple
IOUtils.closeStream(proxyOut);
IOUtils.closeStream(blockReceiver);
IOUtils.closeStream(proxyReply);
+ IOUtils.closeStream(replyOut);
}
//update metrics
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Wed Aug 20 01:34:29 2014
@@ -27,11 +27,11 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.PeerServer;
-import org.apache.hadoop.hdfs.server.balancer.Balancer;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Daemon;
+import com.google.common.annotations.VisibleForTesting;
/**
* Server used for receiving/sending a block of data.
@@ -45,6 +45,7 @@ class DataXceiverServer implements Runna
private final PeerServer peerServer;
private final DataNode datanode;
private final HashMap<Peer, Thread> peers = new HashMap<Peer, Thread>();
+ private final HashMap<Peer, DataXceiver> peersXceiver = new HashMap<Peer, DataXceiver>();
private boolean closed = false;
/**
@@ -217,18 +218,38 @@ class DataXceiverServer implements Runna
}
}
- synchronized void addPeer(Peer peer, Thread t) throws IOException {
+ synchronized void addPeer(Peer peer, Thread t, DataXceiver xceiver)
+ throws IOException {
if (closed) {
throw new IOException("Server closed.");
}
peers.put(peer, t);
+ peersXceiver.put(peer, xceiver);
}
synchronized void closePeer(Peer peer) {
peers.remove(peer);
+ peersXceiver.remove(peer);
IOUtils.cleanup(null, peer);
}
+ // Send an OOB message to all peers.
+ public synchronized void sendOOBToPeers() {
+ if (!datanode.shutdownForUpgrade) {
+ return;
+ }
+
+ for (Peer p : peers.keySet()) {
+ try {
+ peersXceiver.get(p).sendOOB();
+ } catch (IOException e) {
+ LOG.warn("Got error when sending OOB message.", e);
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted when sending OOB message.");
+ }
+ }
+ }
+
// Notify all peers of the shutdown and restart.
// datanode.shouldRun should still be true and datanode.restarting should
// be set true before calling this method.
@@ -247,6 +268,7 @@ class DataXceiverServer implements Runna
IOUtils.cleanup(LOG, p);
}
peers.clear();
+ peersXceiver.clear();
}
// Return the number of peers.
@@ -254,7 +276,14 @@ class DataXceiverServer implements Runna
return peers.size();
}
+ // Return the number of tracked DataXceivers (should equal the number of peers).
+ @VisibleForTesting
+ synchronized int getNumPeersXceiver() {
+ return peersXceiver.size();
+ }
+
synchronized void releasePeer(Peer peer) {
peers.remove(peer);
+ peersXceiver.remove(peer);
}
}
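
The server-side bookkeeping above hinges on keeping two maps in lockstep: peers -> Thread (as before) and peers -> DataXceiver, so a control message such as the shutdown OOB can be routed to every active transfer. A minimal sketch of that pattern, with illustrative names and generic types standing in for Peer and DataXceiver; the point is that both maps are only ever mutated under the same monitor:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class PeerRegistry<P, H> {
      private final Map<P, Thread> peers = new HashMap<P, Thread>();
      private final Map<P, H> handlers = new HashMap<P, H>();

      public synchronized void add(P peer, Thread t, H handler) {
        peers.put(peer, t);
        handlers.put(peer, handler);
      }

      public synchronized void remove(P peer) {
        peers.remove(peer);
        handlers.remove(peer); // omitting this is the classic way the maps drift
      }

      // Snapshot for broadcasting outside per-handler I/O; callers may still
      // need to tolerate handlers that close concurrently.
      public synchronized List<H> handlerSnapshot() {
        return new ArrayList<H>(handlers.values());
      }
    }
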
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java Wed Aug 20 01:34:29 2014
@@ -30,6 +30,8 @@ public class DatanodeUtil {
public static final String DISK_ERROR = "Possible disk error: ";
+ private static final String SEP = System.getProperty("file.separator");
+
/** Get the cause of an I/O exception if caused by a possible disk error
* @param ioe an I/O exception
* @return cause if the I/O exception is caused by a possible disk error;
@@ -78,4 +80,38 @@ public class DatanodeUtil {
public static File getUnlinkTmpFile(File f) {
return new File(f.getParentFile(), f.getName()+UNLINK_BLOCK_SUFFIX);
}
+
+ /**
+ * Checks whether there are any files anywhere in the directory tree rooted
+ * at dir (directories don't count as files). dir must exist.
+ * @return true if there are no files
+ * @throws IOException if unable to list subdirectories
+ */
+ public static boolean dirNoFilesRecursive(File dir) throws IOException {
+ File[] contents = dir.listFiles();
+ if (contents == null) {
+ throw new IOException("Cannot list contents of " + dir);
+ }
+ for (File f : contents) {
+ if (!f.isDirectory() || !dirNoFilesRecursive(f)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Get the directory where a finalized block with this ID should be stored.
+ * Do not attempt to create the directory.
+ * @param root the root directory where finalized blocks are stored
+ * @param blockId the ID of the finalized block
+ * @return the directory, two levels below root, derived from the block ID
+ */
+ public static File idToBlockDir(File root, long blockId) {
+ int d1 = (int)((blockId >> 16) & 0xff);
+ int d2 = (int)((blockId >> 8) & 0xff);
+ String path = DataStorage.BLOCK_SUBDIR_PREFIX + d1 + SEP +
+ DataStorage.BLOCK_SUBDIR_PREFIX + d2;
+ return new File(root, path);
+ }
}
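
idToBlockDir derives both directory levels from bits 8-23 of the block ID, so every volume gets the same fixed 256 x 256 fan-out no matter how many blocks it holds. A self-contained sketch of the arithmetic (the "subdir" constant mirrors DataStorage.BLOCK_SUBDIR_PREFIX; the path is illustrative):

    import java.io.File;

    public class BlockDirDemo {
      static final String PREFIX = "subdir"; // mirrors DataStorage.BLOCK_SUBDIR_PREFIX

      static File idToBlockDir(File root, long blockId) {
        int d1 = (int) ((blockId >> 16) & 0xff); // bits 16-23
        int d2 = (int) ((blockId >> 8) & 0xff);  // bits 8-15
        return new File(root, PREFIX + d1 + File.separator + PREFIX + d2);
      }

      public static void main(String[] args) {
        // 0x12AB34 -> d1 = 0x12 = 18, d2 = 0xAB = 171
        File dir = idToBlockDir(new File("/data/current/finalized"), 0x12AB34L);
        System.out.println(dir); // /data/current/finalized/subdir18/subdir171
      }
    }
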
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java Wed Aug 20 01:34:29 2014
@@ -54,10 +54,10 @@ abstract public class ReplicaInfo extend
private File baseDir;
/**
- * Ints representing the sub directory path from base dir to the directory
- * containing this replica.
+ * Whether or not this replica's parent directory includes subdirs, in which
+ * case we can generate them based on the replica's block ID
*/
- private int[] subDirs;
+ private boolean hasSubdirs;
private static final Map<String, File> internedBaseDirs = new HashMap<String, File>();
@@ -151,18 +151,8 @@ abstract public class ReplicaInfo extend
* @return the parent directory path where this replica is located
*/
File getDir() {
- if (subDirs == null) {
- return null;
- }
-
- StringBuilder sb = new StringBuilder();
- for (int i : subDirs) {
- sb.append(DataStorage.BLOCK_SUBDIR_PREFIX);
- sb.append(i);
- sb.append("/");
- }
- File ret = new File(baseDir, sb.toString());
- return ret;
+ return hasSubdirs ? DatanodeUtil.idToBlockDir(baseDir,
+ getBlockId()) : baseDir;
}
/**
@@ -175,54 +165,46 @@ abstract public class ReplicaInfo extend
private void setDirInternal(File dir) {
if (dir == null) {
- subDirs = null;
baseDir = null;
return;
}
- ReplicaDirInfo replicaDirInfo = parseSubDirs(dir);
- this.subDirs = replicaDirInfo.subDirs;
+ ReplicaDirInfo dirInfo = parseBaseDir(dir);
+ this.hasSubdirs = dirInfo.hasSubdirs;
synchronized (internedBaseDirs) {
- if (!internedBaseDirs.containsKey(replicaDirInfo.baseDirPath)) {
+ if (!internedBaseDirs.containsKey(dirInfo.baseDirPath)) {
// Create a new String path of this file and make a brand new File object
// to guarantee we drop the reference to the underlying char[] storage.
- File baseDir = new File(replicaDirInfo.baseDirPath);
- internedBaseDirs.put(replicaDirInfo.baseDirPath, baseDir);
+ File baseDir = new File(dirInfo.baseDirPath);
+ internedBaseDirs.put(dirInfo.baseDirPath, baseDir);
}
- this.baseDir = internedBaseDirs.get(replicaDirInfo.baseDirPath);
+ this.baseDir = internedBaseDirs.get(dirInfo.baseDirPath);
}
}
-
+
@VisibleForTesting
public static class ReplicaDirInfo {
- @VisibleForTesting
public String baseDirPath;
-
- @VisibleForTesting
- public int[] subDirs;
+ public boolean hasSubdirs;
+
+ public ReplicaDirInfo(String baseDirPath, boolean hasSubdirs) {
+ this.baseDirPath = baseDirPath;
+ this.hasSubdirs = hasSubdirs;
+ }
}
@VisibleForTesting
- public static ReplicaDirInfo parseSubDirs(File dir) {
- ReplicaDirInfo ret = new ReplicaDirInfo();
+ public static ReplicaDirInfo parseBaseDir(File dir) {
File currentDir = dir;
- List<Integer> subDirList = new ArrayList<Integer>();
+ boolean hasSubdirs = false;
while (currentDir.getName().startsWith(DataStorage.BLOCK_SUBDIR_PREFIX)) {
- // Prepend the integer into the list.
- subDirList.add(0, Integer.parseInt(currentDir.getName().replaceFirst(
- DataStorage.BLOCK_SUBDIR_PREFIX, "")));
+ hasSubdirs = true;
currentDir = currentDir.getParentFile();
}
- ret.subDirs = new int[subDirList.size()];
- for (int i = 0; i < subDirList.size(); i++) {
- ret.subDirs[i] = subDirList.get(i);
- }
-
- ret.baseDirPath = currentDir.getAbsolutePath();
- return ret;
+ return new ReplicaDirInfo(currentDir.getAbsolutePath(), hasSubdirs);
}
/**
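
After this change the replica no longer stores each subdir index; it only remembers whether it sits under any "subdir" levels, because the exact directory can be recomputed from the block ID via DatanodeUtil.idToBlockDir. A self-contained sketch of the parent-walk in parseBaseDir (paths and the "subdir" constant are illustrative):

    import java.io.File;

    public class ParseBaseDirDemo {
      static final String PREFIX = "subdir"; // mirrors DataStorage.BLOCK_SUBDIR_PREFIX

      public static void main(String[] args) {
        File dir = new File("/data/current/finalized/subdir18/subdir171");
        boolean hasSubdirs = false;
        File cur = dir;
        // Walk up until the first ancestor that is not a "subdir..." level.
        while (cur.getName().startsWith(PREFIX)) {
          hasSubdirs = true;
          cur = cur.getParentFile();
        }
        System.out.println("base=" + cur + " hasSubdirs=" + hasSubdirs);
        // prints: base=/data/current/finalized hasSubdirs=true
      }
    }
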
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java Wed Aug 20 01:34:29 2014
@@ -74,7 +74,7 @@ import com.google.common.collect.HashMul
* DN also marks the block's slots as "unanchorable" to prevent additional
* clients from initiating these operations in the future.
*
- * The counterpart fo this class on the client is {@link DfsClientShmManager}.
+ * The counterpart of this class on the client is {@link DfsClientShmManager}.
*/
public class ShortCircuitRegistry {
public static final Log LOG = LogFactory.getLog(ShortCircuitRegistry.class);
@@ -217,7 +217,32 @@ public class ShortCircuitRegistry {
}
return allowMunlock;
}
-
+
+ /**
+ * Invalidate any slot associated with a blockId that we are invalidating
+ * (deleting) from this DataNode. When a slot is invalid, the DFSClient will
+ * not use the corresponding replica for new read or mmap operations
+ * (although existing, ongoing read or mmap operations will complete).
+ *
+ * @param blockId The block ID.
+ */
+ public synchronized void processBlockInvalidation(ExtendedBlockId blockId) {
+ if (!enabled) return;
+ final Set<Slot> affectedSlots = slots.get(blockId);
+ if (!affectedSlots.isEmpty()) {
+ final StringBuilder bld = new StringBuilder();
+ String prefix = "";
+ bld.append("Block ").append(blockId).append(" has been invalidated. ").
+ append("Marking short-circuit slots as invalid: ");
+ for (Slot slot : affectedSlots) {
+ slot.makeInvalid();
+ bld.append(prefix).append(slot.toString());
+ prefix = ", ";
+ }
+ LOG.info(bld.toString());
+ }
+ }
+
public static class NewShmInfo implements Closeable {
public final ShmId shmId;
public final FileInputStream stream;
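
A minimal sketch of the invalidation pattern in processBlockInvalidation above, assuming a plain Map<Long, Set<Slot>> in place of the registry's multimap and a boolean flag in place of the shared-memory slot state; marking a slot invalid stops new reads on it, but does not interrupt reads already in flight:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;

    public class SlotInvalidationDemo {
      static class Slot {
        private boolean valid = true;
        synchronized void makeInvalid() { valid = false; }
        synchronized boolean isValid() { return valid; }
      }

      private final Map<Long, Set<Slot>> slots = new HashMap<Long, Set<Slot>>();

      public synchronized void processBlockInvalidation(long blockId) {
        Set<Slot> affected = slots.get(blockId);
        if (affected == null || affected.isEmpty()) {
          return;
        }
        StringBuilder bld = new StringBuilder("Block " + blockId
            + " has been invalidated. Marking slots as invalid:");
        String prefix = " ";
        for (Slot slot : affected) {
          slot.makeInvalid();
          bld.append(prefix).append(slot);
          prefix = ", ";
        }
        System.out.println(bld);
      }
    }
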
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java Wed Aug 20 01:34:29 2014
@@ -78,7 +78,7 @@ public class StorageLocation {
* @return A StorageLocation object if successfully parsed, null otherwise.
* Does not throw any exceptions.
*/
- static StorageLocation parse(String rawLocation)
+ public static StorageLocation parse(String rawLocation)
throws IOException, SecurityException {
Matcher matcher = regex.matcher(rawLocation);
StorageType storageType = StorageType.DEFAULT;
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java Wed Aug 20 01:34:29 2014
@@ -22,6 +22,7 @@ import java.io.File;
import java.io.FileDescriptor;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -39,6 +40,7 @@ import org.apache.hadoop.hdfs.server.dat
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.Replica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
@@ -91,6 +93,10 @@ public interface FsDatasetSpi<V extends
/** @return a list of volumes. */
public List<V> getVolumes();
+ /** Add a collection of StorageLocations to FsDataset. */
+ public void addVolumes(Collection<StorageLocation> volumes)
+ throws IOException;
+
/** @return a storage with the given storage ID */
public DatanodeStorage getStorage(final String storageUuid);
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java Wed Aug 20 01:34:29 2014
@@ -59,7 +59,8 @@ class BlockPoolSlice {
private final String bpid;
private final FsVolumeImpl volume; // volume to which this BlockPool belongs
private final File currentDir; // StorageDirectory/current/bpid/current
- private final LDir finalizedDir; // directory store Finalized replica
+ // directory where finalized replicas are stored
+ private final File finalizedDir;
private final File rbwDir; // directory store RBW replica
private final File tmpDir; // directory store Temporary replica
private static final String DU_CACHE_FILE = "dfsUsed";
@@ -82,8 +83,13 @@ class BlockPoolSlice {
this.bpid = bpid;
this.volume = volume;
this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
- final File finalizedDir = new File(
+ this.finalizedDir = new File(
currentDir, DataStorage.STORAGE_DIR_FINALIZED);
+ if (!this.finalizedDir.exists()) {
+ if (!this.finalizedDir.mkdirs()) {
+ throw new IOException("Failed to mkdirs " + this.finalizedDir);
+ }
+ }
// Files that were being written when the datanode was last shutdown
// are now moved back to the data directory. It is possible that
@@ -95,10 +101,6 @@ class BlockPoolSlice {
FileUtil.fullyDelete(tmpDir);
}
this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW);
- final int maxBlocksPerDir = conf.getInt(
- DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_KEY,
- DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_DEFAULT);
- this.finalizedDir = new LDir(finalizedDir, maxBlocksPerDir);
if (!rbwDir.mkdirs()) { // create the rbw directory if it does not exist
if (!rbwDir.isDirectory()) {
throw new IOException("Mkdirs failed to create " + rbwDir.toString());
@@ -131,7 +133,7 @@ class BlockPoolSlice {
}
File getFinalizedDir() {
- return finalizedDir.dir;
+ return finalizedDir;
}
File getRbwDir() {
@@ -239,26 +241,57 @@ class BlockPoolSlice {
}
File addBlock(Block b, File f) throws IOException {
- File blockFile = finalizedDir.addBlock(b, f);
+ File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId());
+ if (!blockDir.exists()) {
+ if (!blockDir.mkdirs()) {
+ throw new IOException("Failed to mkdirs " + blockDir);
+ }
+ }
+ File blockFile = FsDatasetImpl.moveBlockFiles(b, f, blockDir);
File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp());
dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
return blockFile;
}
void checkDirs() throws DiskErrorException {
- finalizedDir.checkDirTree();
+ DiskChecker.checkDirs(finalizedDir);
DiskChecker.checkDir(tmpDir);
DiskChecker.checkDir(rbwDir);
}
void getVolumeMap(ReplicaMap volumeMap) throws IOException {
// add finalized replicas
- finalizedDir.getVolumeMap(bpid, volumeMap, volume);
+ addToReplicasMap(volumeMap, finalizedDir, true);
// add rbw replicas
addToReplicasMap(volumeMap, rbwDir, false);
}
/**
+ * Recover an unlinked tmp file on datanode restart. If the original block
+ * does not exist, then the tmp file is renamed to be the
+ * original file name and the original name is returned; otherwise the tmp
+ * file is deleted and null is returned.
+ */
+ File recoverTempUnlinkedBlock(File unlinkedTmp) throws IOException {
+ File blockFile = FsDatasetUtil.getOrigFile(unlinkedTmp);
+ if (blockFile.exists()) {
+ // If the original block file still exists, then no recovery is needed.
+ if (!unlinkedTmp.delete()) {
+ throw new IOException("Unable to cleanup unlinked tmp file " +
+ unlinkedTmp);
+ }
+ return null;
+ } else {
+ if (!unlinkedTmp.renameTo(blockFile)) {
+ throw new IOException("Unable to rename unlinked tmp file " +
+ unlinkedTmp);
+ }
+ return blockFile;
+ }
+ }
+
+
+ /**
* Add replicas under the given directory to the volume map
* @param volumeMap the replicas map
* @param dir an input directory
@@ -267,23 +300,34 @@ class BlockPoolSlice {
*/
void addToReplicasMap(ReplicaMap volumeMap, File dir, boolean isFinalized
) throws IOException {
- File blockFiles[] = FileUtil.listFiles(dir);
- for (File blockFile : blockFiles) {
- if (!Block.isBlockFilename(blockFile))
+ File files[] = FileUtil.listFiles(dir);
+ for (File file : files) {
+ if (file.isDirectory()) {
+ addToReplicasMap(volumeMap, file, isFinalized);
+ }
+
+ if (isFinalized && FsDatasetUtil.isUnlinkTmpFile(file)) {
+ file = recoverTempUnlinkedBlock(file);
+ if (file == null) { // the original block still exists, so we cover it
+ // in another iteration and can continue here
+ continue;
+ }
+ }
+ if (!Block.isBlockFilename(file))
continue;
long genStamp = FsDatasetUtil.getGenerationStampFromFile(
- blockFiles, blockFile);
- long blockId = Block.filename2id(blockFile.getName());
+ files, file);
+ long blockId = Block.filename2id(file.getName());
ReplicaInfo newReplica = null;
if (isFinalized) {
newReplica = new FinalizedReplica(blockId,
- blockFile.length(), genStamp, volume, blockFile.getParentFile());
+ file.length(), genStamp, volume, file.getParentFile());
} else {
boolean loadRwr = true;
- File restartMeta = new File(blockFile.getParent() +
- File.pathSeparator + "." + blockFile.getName() + ".restart");
+ File restartMeta = new File(file.getParent() +
+ File.pathSeparator + "." + file.getName() + ".restart");
Scanner sc = null;
try {
sc = new Scanner(restartMeta);
@@ -291,8 +335,8 @@ class BlockPoolSlice {
if (sc.hasNextLong() && (sc.nextLong() > Time.now())) {
// It didn't expire. Load the replica as a RBW.
newReplica = new ReplicaBeingWritten(blockId,
- validateIntegrityAndSetLength(blockFile, genStamp),
- genStamp, volume, blockFile.getParentFile(), null);
+ validateIntegrityAndSetLength(file, genStamp),
+ genStamp, volume, file.getParentFile(), null);
loadRwr = false;
}
sc.close();
@@ -301,7 +345,7 @@ class BlockPoolSlice {
restartMeta.getPath());
}
} catch (FileNotFoundException fnfe) {
- // nothing to do here
+ // nothing to do here
} finally {
if (sc != null) {
sc.close();
@@ -310,15 +354,15 @@ class BlockPoolSlice {
// Restart meta doesn't exist or expired.
if (loadRwr) {
newReplica = new ReplicaWaitingToBeRecovered(blockId,
- validateIntegrityAndSetLength(blockFile, genStamp),
- genStamp, volume, blockFile.getParentFile());
+ validateIntegrityAndSetLength(file, genStamp),
+ genStamp, volume, file.getParentFile());
}
}
ReplicaInfo oldReplica = volumeMap.add(bpid, newReplica);
if (oldReplica != null) {
FsDatasetImpl.LOG.warn("Two block files with the same block id exist " +
- "on disk: " + oldReplica.getBlockFile() + " and " + blockFile );
+ "on disk: " + oldReplica.getBlockFile() + " and " + file );
}
}
}
@@ -405,10 +449,6 @@ class BlockPoolSlice {
}
}
- void clearPath(File f) {
- finalizedDir.clearPath(f);
- }
-
@Override
public String toString() {
return currentDir.getAbsolutePath();
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java Wed Aug 20 01:34:29 2014
@@ -61,6 +61,7 @@ class FsDatasetAsyncDiskService {
private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
private final DataNode datanode;
+ private final ThreadGroup threadGroup;
private Map<File, ThreadPoolExecutor> executors
= new HashMap<File, ThreadPoolExecutor>();
@@ -70,42 +71,52 @@ class FsDatasetAsyncDiskService {
*
* The AsyncDiskServices uses one ThreadPool per volume to do the async
* disk operations.
- *
- * @param volumes The roots of the data volumes.
*/
- FsDatasetAsyncDiskService(DataNode datanode, File[] volumes) {
+ FsDatasetAsyncDiskService(DataNode datanode) {
this.datanode = datanode;
+ this.threadGroup = new ThreadGroup(getClass().getSimpleName());
+ }
+
+ private void addExecutorForVolume(final File volume) {
+ ThreadFactory threadFactory = new ThreadFactory() {
+ int counter = 0;
+
+ @Override
+ public Thread newThread(Runnable r) {
+ int thisIndex;
+ synchronized (this) {
+ thisIndex = counter++;
+ }
+ Thread t = new Thread(threadGroup, r);
+ t.setName("Async disk worker #" + thisIndex +
+ " for volume " + volume);
+ return t;
+ }
+ };
- final ThreadGroup threadGroup = new ThreadGroup(getClass().getSimpleName());
- // Create one ThreadPool per volume
- for (int v = 0 ; v < volumes.length; v++) {
- final File vol = volumes[v];
- ThreadFactory threadFactory = new ThreadFactory() {
- int counter = 0;
-
- @Override
- public Thread newThread(Runnable r) {
- int thisIndex;
- synchronized (this) {
- thisIndex = counter++;
- }
- Thread t = new Thread(threadGroup, r);
- t.setName("Async disk worker #" + thisIndex +
- " for volume " + vol);
- return t;
- }
- };
-
- ThreadPoolExecutor executor = new ThreadPoolExecutor(
- CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME,
- THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(), threadFactory);
-
- // This can reduce the number of running threads
- executor.allowCoreThreadTimeOut(true);
- executors.put(vol, executor);
+ ThreadPoolExecutor executor = new ThreadPoolExecutor(
+ CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME,
+ THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(), threadFactory);
+
+ // This can reduce the number of running threads
+ executor.allowCoreThreadTimeOut(true);
+ executors.put(volume, executor);
+ }
+
+ /**
+ * Starts AsyncDiskService for a new volume
+ * @param volume the root of the new data volume.
+ */
+ synchronized void addVolume(File volume) {
+ if (executors == null) {
+ throw new RuntimeException("AsyncDiskService is already shutdown");
}
-
+ ThreadPoolExecutor executor = executors.get(volume);
+ if (executor != null) {
+ throw new RuntimeException("Volume " + volume + " is already existed.");
+ }
+ addExecutorForVolume(volume);
}
synchronized long countPendingDeletions() {
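
The refactoring above turns the constructor's create-all-pools loop into an incremental addVolume, which is what lets FsDatasetImpl attach volumes after the service is running. A minimal sketch of the per-volume executor pattern (pool sizes and thread names are illustrative):

    import java.io.File;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class PerVolumeExecutors {
      private final Map<File, ThreadPoolExecutor> executors =
          new HashMap<File, ThreadPoolExecutor>();

      public synchronized void addVolume(final File volume) {
        if (executors.containsKey(volume)) {
          throw new RuntimeException("Volume " + volume + " already exists.");
        }
        ThreadFactory factory = new ThreadFactory() {
          private int counter = 0;
          @Override
          public synchronized Thread newThread(Runnable r) {
            return new Thread(r, "Async disk worker #" + counter++
                + " for volume " + volume);
          }
        };
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 4, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(), factory);
        // Idle core threads may time out, so a quiet volume holds no threads.
        executor.allowCoreThreadTimeOut(true);
        executors.put(volume, executor);
      }
    }
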
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Wed Aug 20 01:34:29 2014
@@ -44,6 +44,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
@@ -201,6 +202,7 @@ class FsDatasetImpl implements FsDataset
final Map<String, DatanodeStorage> storageMap;
final FsDatasetAsyncDiskService asyncDiskService;
final FsDatasetCache cacheManager;
+ private final Configuration conf;
private final int validVolsRequired;
final ReplicaMap volumeMap;
@@ -215,6 +217,7 @@ class FsDatasetImpl implements FsDataset
) throws IOException {
this.datanode = datanode;
this.dataStorage = storage;
+ this.conf = conf;
// The number of volumes required for operation is the total number
// of volumes minus the number of failed volumes we can tolerate.
final int volFailuresTolerated =
@@ -241,38 +244,76 @@ class FsDatasetImpl implements FsDataset
}
storageMap = new HashMap<String, DatanodeStorage>();
- final List<FsVolumeImpl> volArray = new ArrayList<FsVolumeImpl>(
- storage.getNumStorageDirs());
- for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
- Storage.StorageDirectory sd = storage.getStorageDir(idx);
- final File dir = sd.getCurrentDir();
- final StorageType storageType = getStorageTypeFromLocations(dataLocations, sd.getRoot());
- volArray.add(new FsVolumeImpl(this, sd.getStorageUuid(), dir, conf,
- storageType));
- LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
- storageMap.put(sd.getStorageUuid(),
- new DatanodeStorage(sd.getStorageUuid(), DatanodeStorage.State.NORMAL, storageType));
- }
volumeMap = new ReplicaMap(this);
-
@SuppressWarnings("unchecked")
final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
ReflectionUtils.newInstance(conf.getClass(
DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY,
RoundRobinVolumeChoosingPolicy.class,
VolumeChoosingPolicy.class), conf);
- volumes = new FsVolumeList(volArray, volsFailed, blockChooserImpl);
- volumes.initializeReplicaMaps(volumeMap);
+ volumes = new FsVolumeList(volsFailed, blockChooserImpl);
+ asyncDiskService = new FsDatasetAsyncDiskService(datanode);
- File[] roots = new File[storage.getNumStorageDirs()];
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
- roots[idx] = storage.getStorageDir(idx).getCurrentDir();
+ addVolume(dataLocations, storage.getStorageDir(idx));
}
- asyncDiskService = new FsDatasetAsyncDiskService(datanode, roots);
+
cacheManager = new FsDatasetCache(this);
registerMBean(datanode.getDatanodeUuid());
}
+ private void addVolume(Collection<StorageLocation> dataLocations,
+ Storage.StorageDirectory sd) throws IOException {
+ final File dir = sd.getCurrentDir();
+ final StorageType storageType =
+ getStorageTypeFromLocations(dataLocations, sd.getRoot());
+
+ // If IOException raises from FsVolumeImpl() or getVolumeMap(), there is
+ // nothing needed to be rolled back to make various data structures, e.g.,
+ // storageMap and asyncDiskService, consistent.
+ FsVolumeImpl fsVolume = new FsVolumeImpl(
+ this, sd.getStorageUuid(), dir, this.conf, storageType);
+ fsVolume.getVolumeMap(volumeMap);
+
+ volumes.addVolume(fsVolume);
+ storageMap.put(sd.getStorageUuid(),
+ new DatanodeStorage(sd.getStorageUuid(),
+ DatanodeStorage.State.NORMAL,
+ storageType));
+ asyncDiskService.addVolume(sd.getCurrentDir());
+
+ LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
+ }
+
+ /**
+ * Add a collection of StorageLocations to FsDataset.
+ *
+ * @pre dataStorage must already manage these volumes.
+ * @param volumes the storage locations to add
+ * @throws IOException
+ */
+ @Override
+ public synchronized void addVolumes(Collection<StorageLocation> volumes)
+ throws IOException {
+ final Collection<StorageLocation> dataLocations =
+ DataNode.getStorageLocations(this.conf);
+ Map<String, Storage.StorageDirectory> allStorageDirs =
+ new HashMap<String, Storage.StorageDirectory>();
+ for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
+ Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
+ allStorageDirs.put(sd.getRoot().getAbsolutePath(), sd);
+ }
+
+ for (StorageLocation vol : volumes) {
+ String key = vol.getFile().getAbsolutePath();
+ if (!allStorageDirs.containsKey(key)) {
+ LOG.warn("Attempt to add an invalid volume: " + vol.getFile());
+ } else {
+ addVolume(dataLocations, allStorageDirs.get(key));
+ }
+ }
+ }
+
private StorageType getStorageTypeFromLocations(
Collection<StorageLocation> dataLocations, File dir) {
for (StorageLocation dataLocation : dataLocations) {
@@ -1150,7 +1191,7 @@ class FsDatasetImpl implements FsDataset
return f;
// if file is not null, but doesn't exist - possibly disk failed
- datanode.checkDiskError();
+ datanode.checkDiskErrorAsync();
}
if (LOG.isDebugEnabled()) {
@@ -1223,17 +1264,17 @@ class FsDatasetImpl implements FsDataset
+ ". Parent not found for file " + f);
continue;
}
- ReplicaState replicaState = info.getState();
- if (replicaState == ReplicaState.FINALIZED ||
- (replicaState == ReplicaState.RUR &&
- ((ReplicaUnderRecovery)info).getOriginalReplica().getState() ==
- ReplicaState.FINALIZED)) {
- v.clearPath(bpid, parent);
- }
volumeMap.remove(bpid, invalidBlks[i]);
}
+
+ // If a DFSClient has the replica in its cache of short-circuit file
+ // descriptors (and the client is using ShortCircuitShm), invalidate it.
+ datanode.getShortCircuitRegistry().processBlockInvalidation(
+ new ExtendedBlockId(invalidBlks[i].getBlockId(), bpid));
+
// If the block is cached, start uncaching it.
cacheManager.uncacheBlock(bpid, invalidBlks[i].getBlockId());
+
// Delete the block asynchronously to make sure we can do it fast enough.
// It's ok to unlink the block file before the uncache operation
// finishes.
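
The @pre on addVolumes above is enforced by indexing the storage directories DataStorage already manages by absolute path and warning on anything else. A self-contained sketch of that validation step (paths are illustrative):

    import java.io.File;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;

    public class AddVolumesCheckDemo {
      public static void main(String[] args) {
        // Stand-ins for dataStorage.getStorageDir(i).getRoot().
        Map<String, File> managed = new HashMap<String, File>();
        for (File sd : Arrays.asList(new File("/data1"), new File("/data2"))) {
          managed.put(sd.getAbsolutePath(), sd);
        }
        Collection<File> requested =
            Arrays.asList(new File("/data2"), new File("/data3"));
        for (File vol : requested) {
          if (!managed.containsKey(vol.getAbsolutePath())) {
            System.out.println("Attempt to add an invalid volume: " + vol);
          } else {
            System.out.println("Adding volume " + vol);
          }
        }
      }
    }
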
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java Wed Aug 20 01:34:29 2014
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.DFSConfigK
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -235,10 +236,6 @@ class FsVolumeImpl implements FsVolumeSp
// dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
bp.addToReplicasMap(volumeMap, dir, isFinalized);
}
-
- void clearPath(String bpid, File f) throws IOException {
- getBlockPoolSlice(bpid).clearPath(f);
- }
@Override
public String toString() {
@@ -274,7 +271,8 @@ class FsVolumeImpl implements FsVolumeSp
File finalizedDir = new File(bpCurrentDir,
DataStorage.STORAGE_DIR_FINALIZED);
File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
- if (finalizedDir.exists() && FileUtil.list(finalizedDir).length != 0) {
+ if (finalizedDir.exists() && !DatanodeUtil.dirNoFilesRecursive(
+ finalizedDir)) {
return false;
}
if (rbwDir.exists() && FileUtil.list(rbwDir).length != 0) {
@@ -301,7 +299,8 @@ class FsVolumeImpl implements FsVolumeSp
if (!rbwDir.delete()) {
throw new IOException("Failed to delete " + rbwDir);
}
- if (!finalizedDir.delete()) {
+ if (!DatanodeUtil.dirNoFilesRecursive(finalizedDir) ||
+ !FileUtil.fullyDelete(finalizedDir)) {
throw new IOException("Failed to delete " + finalizedDir);
}
FileUtil.fullyDelete(tmpDir);
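
The replacement check matters because a finalized directory can legitimately
contain empty subdirectories once its blocks are gone, which the old
FileUtil.list(...).length != 0 test would count as content. A plausible shape
for a recursive no-files test, offered as a sketch since the real
DatanodeUtil.dirNoFilesRecursive may differ in details:

    import java.io.File;
    import java.io.IOException;

    class EmptyTreeSketch {
      // True if no regular file exists anywhere under 'dir'; empty
      // subdirectories are ignored, unlike a flat list-length check.
      static boolean dirNoFilesRecursive(File dir) throws IOException {
        File[] contents = dir.listFiles();
        if (contents == null) {
          throw new IOException("Cannot list contents of " + dir);
        }
        for (File f : contents) {
          if (f.isFile()) {
            return false;
          }
          if (f.isDirectory() && !dirNoFilesRecursive(f)) {
            return false;
          }
        }
        return true;
      }
    }
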
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java Wed Aug 20 01:34:29 2014
@@ -40,9 +40,8 @@ class FsVolumeList {
private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser;
private volatile int numFailedVolumes;
- FsVolumeList(List<FsVolumeImpl> volumes, int failedVols,
+ FsVolumeList(int failedVols,
VolumeChoosingPolicy<FsVolumeImpl> blockChooser) {
- this.volumes = Collections.unmodifiableList(volumes);
this.blockChooser = blockChooser;
this.numFailedVolumes = failedVols;
}
@@ -101,12 +100,6 @@ class FsVolumeList {
}
return remaining;
}
-
- void initializeReplicaMaps(ReplicaMap globalReplicaMap) throws IOException {
- for (FsVolumeImpl v : volumes) {
- v.getVolumeMap(globalReplicaMap);
- }
- }
void getAllVolumesMap(final String bpid, final ReplicaMap volumeMap) throws IOException {
long totalStartTime = Time.monotonicNow();
@@ -205,6 +198,19 @@ class FsVolumeList {
return volumes.toString();
}
+ /**
+   * Dynamically add a new volume to the set of volumes that this DN manages.
+   * @param newVolume the new FsVolumeImpl instance.
+ */
+ synchronized void addVolume(FsVolumeImpl newVolume) {
+    // Copy the current volume list, then add the new volume to the copy.
+ final List<FsVolumeImpl> volumeList = volumes == null ?
+ new ArrayList<FsVolumeImpl>() :
+ new ArrayList<FsVolumeImpl>(volumes);
+ volumeList.add(newVolume);
+ volumes = Collections.unmodifiableList(volumeList);
+ FsDatasetImpl.LOG.info("Added new volume: " + newVolume.toString());
+ }
void addBlockPool(final String bpid, final Configuration conf) throws IOException {
long totalStartTime = Time.monotonicNow();
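
addVolume uses a copy-on-write idiom: the published list stays immutable,
writers synchronize while they copy and republish, and readers iterate the
current snapshot without locking. The same idiom in isolation, as a sketch
rather than the FsVolumeList code:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    class CopyOnWriteListSketch<V> {
      // volatile so readers observe the fully built replacement list.
      private volatile List<V> items = Collections.emptyList();

      // Writers serialize on the monitor; each add publishes a fresh
      // immutable list rather than mutating the one readers may hold.
      synchronized void add(V item) {
        List<V> copy = new ArrayList<V>(items);
        copy.add(item);
        items = Collections.unmodifiableList(copy);
      }

      // Readers take a consistent snapshot without any locking.
      List<V> snapshot() {
        return items;
      }
    }
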
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Wed Aug 20 01:34:29 2014
@@ -764,8 +764,6 @@ public class FSDirectory implements Clos
checkSnapshot(srcInode, null);
}
-
-
private class RenameOperation {
private final INodesInPath srcIIP;
private final INodesInPath dstIIP;
@@ -798,7 +796,7 @@ public class FSDirectory implements Clos
// snapshot is taken on the dst tree, changes will be recorded in the latest
// snapshot of the src tree.
if (isSrcInSnapshot) {
- srcChild = srcChild.recordModification(srcIIP.getLatestSnapshotId());
+ srcChild.recordModification(srcIIP.getLatestSnapshotId());
}
// check srcChild for reference
@@ -928,8 +926,7 @@ public class FSDirectory implements Clos
updateCount(iip, 0, dsDelta, true);
}
- file = file.setFileReplication(replication, iip.getLatestSnapshotId(),
- inodeMap);
+ file.setFileReplication(replication, iip.getLatestSnapshotId());
final short newBR = file.getBlockReplication();
// check newBR < oldBR case.
@@ -1081,9 +1078,6 @@ public class FSDirectory implements Clos
count++;
}
- // update inodeMap
- removeFromInodeMap(Arrays.asList(allSrcInodes));
-
trgInode.setModificationTime(timestamp, trgLatestSnapshot);
trgParent.updateModificationTime(timestamp, trgLatestSnapshot);
// update quota on the parent directory ('count' files removed, 0 space)
@@ -1215,8 +1209,7 @@ public class FSDirectory implements Clos
// record modification
final int latestSnapshot = iip.getLatestSnapshotId();
- targetNode = targetNode.recordModification(latestSnapshot);
- iip.setLastINode(targetNode);
+ targetNode.recordModification(latestSnapshot);
// Remove the node from the namespace
long removed = removeLastINode(iip);
@@ -2125,7 +2118,7 @@ public class FSDirectory implements Clos
}
final int latest = iip.getLatestSnapshotId();
- dirNode = dirNode.recordModification(latest);
+ dirNode.recordModification(latest);
dirNode.setQuota(nsQuota, dsQuota);
return dirNode;
}
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Aug 20 01:34:29 2014
@@ -62,6 +62,8 @@ import static org.apache.hadoop.hdfs.DFS
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY;
@@ -83,9 +85,6 @@ import static org.apache.hadoop.hdfs.DFS
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK_DEFAULT;
-
import static org.apache.hadoop.util.Time.now;
import java.io.BufferedWriter;
@@ -231,6 +230,7 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
@@ -2516,7 +2516,7 @@ public class FSNamesystem implements Nam
boolean writeToEditLog,
int latestSnapshot, boolean logRetryCache)
throws IOException {
- file = file.recordModification(latestSnapshot);
+ file.recordModification(latestSnapshot);
final INodeFile cons = file.toUnderConstruction(leaseHolder, clientMachine);
leaseManager.addLease(cons.getFileUnderConstructionFeature()
@@ -3720,8 +3720,10 @@ public class FSNamesystem implements Nam
StandbyException, IOException {
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.READ);
+ byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
readLock();
try {
+ src = FSDirectory.resolvePath(src, pathComponents, dir);
checkOperation(OperationCategory.READ);
if (isPermissionEnabled) {
checkTraverse(pc, src);
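
This hunk is the first of several in the patch with the same shape: parse the
reserved-path components before taking the lock, then resolve the path and
re-check the operation once inside it. The getPathComponentsForReservedPath
and resolvePath helpers are the real FSDirectory methods; the skeleton below
is a hypothetical reduction of the pattern, with the permission check elided.

    import java.io.IOException;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Hypothetical skeleton of the lock/resolve pattern; Resolver stands
    // in for FSDirectory.
    class ReservedPathSketch {
      interface Resolver {
        byte[][] getPathComponentsForReservedPath(String src);
        String resolvePath(String src, byte[][] components) throws IOException;
      }

      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

      String resolveForRead(String src, Resolver dir) throws IOException {
        // String parsing is cheap and lock-free, so do it before readLock().
        byte[][] pathComponents = dir.getPathComponentsForReservedPath(src);
        lock.readLock().lock();
        try {
          // Resolve /.reserved-style paths while the namespace is stable;
          // permission checks then run against the resolved path.
          return dir.resolvePath(src, pathComponents);
        } finally {
          lock.readLock().unlock();
        }
      }
    }
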
@@ -4209,7 +4211,7 @@ public class FSNamesystem implements Nam
Preconditions.checkArgument(uc != null);
leaseManager.removeLease(uc.getClientName(), src);
- pendingFile = pendingFile.recordModification(latestSnapshot);
+ pendingFile.recordModification(latestSnapshot);
// The file is no longer pending.
// Create permanent INode, update blocks. No need to replace the inode here
@@ -4298,7 +4300,30 @@ public class FSNamesystem implements Nam
throw new IOException("Block (=" + lastblock + ") not found");
}
}
- INodeFile iFile = ((INode)storedBlock.getBlockCollection()).asFile();
+    //
+    // The implementation of the delete operation (see the deleteInternal
+    // method) first removes the file paths from the namespace and delays
+    // the removal of blocks until later for better performance. When
+    // commitBlockSynchronization (this method) is called in between, the
+    // blockCollection of storedBlock may have been set to null by the
+    // delete operation, so throw an IOException here instead of hitting an
+    // NPE; if the file path has already been removed from the namespace by
+    // the delete operation, throw a FileNotFoundException here, so as not
+    // to proceed to the end of this method and add a CloseOp to the edit
+    // log for an already deleted file (see HDFS-6825).
+    //
+ BlockCollection blockCollection = storedBlock.getBlockCollection();
+ if (blockCollection == null) {
+ throw new IOException("The blockCollection of " + storedBlock
+ + " is null, likely because the file owning this block was"
+ + " deleted and the block removal is delayed");
+ }
+ INodeFile iFile = ((INode)blockCollection).asFile();
+ if (isFileDeleted(iFile)) {
+ throw new FileNotFoundException("File not found: "
+ + iFile.getFullPathName() + ", likely due to delayed block"
+ + " removal");
+ }
if (!iFile.isUnderConstruction() || storedBlock.isComplete()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Unexpected block (=" + lastblock
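
The two guards added above convert a latent NullPointerException into
explicit, descriptive failures, and they must run in that order: a null block
collection means deletion already detached the block, while a still-linked
collection can belong to a file whose removal is merely delayed. A
stand-alone sketch of the guard order, with stand-in types rather than the
FSNamesystem ones:

    import java.io.FileNotFoundException;
    import java.io.IOException;

    class CommitSyncGuardSketch {
      interface StoredBlock { Object getBlockCollection(); }
      interface Namespace { boolean isFileDeleted(Object file); }

      // Mirrors the guard order in the patch: null collection first, then
      // the delayed-removal check, before any edit-log write could happen.
      static Object requireLiveFile(StoredBlock block, Namespace ns)
          throws IOException {
        Object collection = block.getBlockCollection();
        if (collection == null) {
          throw new IOException("Block collection is null; the owning file"
              + " was likely deleted with block removal delayed");
        }
        if (ns.isFileDeleted(collection)) {
          throw new FileNotFoundException("File not found, likely due to"
              + " delayed block removal");
        }
        return collection;
      }
    }
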
@@ -4353,8 +4378,11 @@ public class FSNamesystem implements Nam
// Otherwise fsck will report these blocks as MISSING, especially if the
// blocksReceived from Datanodes take a long time to arrive.
for (int i = 0; i < trimmedTargets.size(); i++) {
- trimmedTargets.get(i).addBlock(
- trimmedStorages.get(i), storedBlock);
+ DatanodeStorageInfo storageInfo =
+ trimmedTargets.get(i).getStorageInfo(trimmedStorages.get(i));
+ if (storageInfo != null) {
+ storageInfo.addBlock(storedBlock);
+ }
}
}
@@ -4914,6 +4942,28 @@ public class FSNamesystem implements Nam
}
}
+ DatanodeStorageReport[] getDatanodeStorageReport(final DatanodeReportType type
+ ) throws AccessControlException, StandbyException {
+ checkSuperuserPrivilege();
+ checkOperation(OperationCategory.UNCHECKED);
+ readLock();
+ try {
+ checkOperation(OperationCategory.UNCHECKED);
+ final DatanodeManager dm = getBlockManager().getDatanodeManager();
+ final List<DatanodeDescriptor> datanodes = dm.getDatanodeListForReport(type);
+
+ DatanodeStorageReport[] reports = new DatanodeStorageReport[datanodes.size()];
+ for (int i = 0; i < reports.length; i++) {
+ final DatanodeDescriptor d = datanodes.get(i);
+ reports[i] = new DatanodeStorageReport(new DatanodeInfo(d),
+ d.getStorageReports());
+ }
+ return reports;
+ } finally {
+ readUnlock();
+ }
+ }
+
/**
* Save namespace image.
* This will save current namespace into fsimage file and empty edits file.
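
The new getDatanodeStorageReport call follows the namesystem's standard read
path: superuser check, an UNCHECKED operation check outside and inside the
lock, then a copy of the per-datanode data into a plain array so the caller
holds no namesystem state. The copy-under-read-lock shape, reduced to a
sketch with hypothetical stand-in types:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class SnapshotUnderLockSketch {
      static class NodeReport {
        final String node; final long capacity;
        NodeReport(String node, long capacity) {
          this.node = node; this.capacity = capacity;
        }
      }

      private final ReentrantReadWriteLock lock =
          new ReentrantReadWriteLock();
      private final List<NodeReport> liveNodes = new ArrayList<NodeReport>();

      void addNode(NodeReport r) {
        lock.writeLock().lock();
        try {
          liveNodes.add(r);
        } finally {
          lock.writeLock().unlock();
        }
      }

      // Copy while locked so the returned array is internally consistent
      // and independent of later namespace changes.
      NodeReport[] storageReport() {
        lock.readLock().lock();
        try {
          return liveNodes.toArray(new NodeReport[liveNodes.size()]);
        } finally {
          lock.readLock().unlock();
        }
      }
    }
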
@@ -5811,7 +5861,7 @@ public class FSNamesystem implements Nam
}
public void processIncrementalBlockReport(final DatanodeID nodeID,
- final String poolId, final StorageReceivedDeletedBlocks srdb)
+ final StorageReceivedDeletedBlocks srdb)
throws IOException {
writeLock();
try {
@@ -6061,7 +6111,6 @@ public class FSNamesystem implements Nam
blockManager.shutdown();
}
}
-
@Override // FSNamesystemMBean
public int getNumLiveDataNodes() {
@@ -6109,6 +6158,15 @@ public class FSNamesystem implements Nam
}
/**
+   * Storages are marked as "content stale" after the NN restarts or fails
+   * over, and before the NN receives the first heartbeat followed by the
+   * first block report.
+ */
+ @Override // FSNamesystemMBean
+ public int getNumStaleStorages() {
+ return getBlockManager().getDatanodeManager().getNumStaleStorages();
+ }
+
+ /**
* Sets the current generation stamp for legacy blocks
*/
void setGenerationStampV1(long stamp) {
@@ -6262,9 +6320,28 @@ public class FSNamesystem implements Nam
private boolean isFileDeleted(INodeFile file) {
// Not in the inodeMap or in the snapshot but marked deleted.
- if (dir.getInode(file.getId()) == null ||
- file.getParent() == null || (file.isWithSnapshot() &&
- file.getFileWithSnapshotFeature().isCurrentFileDeleted())) {
+ if (dir.getInode(file.getId()) == null) {
+ return true;
+ }
+
+    // Walk up the path hierarchy to see if an ancestor was removed by a
+    // recursive deletion.
+ INode tmpChild = file;
+ INodeDirectory tmpParent = file.getParent();
+ while (true) {
+ if (tmpParent == null ||
+ tmpParent.searchChildren(tmpChild.getLocalNameBytes()) < 0) {
+ return true;
+ }
+ if (tmpParent.isRoot()) {
+ break;
+ }
+ tmpChild = tmpParent;
+ tmpParent = tmpParent.getParent();
+ }
+
+ if (file.isWithSnapshot() &&
+ file.getFileWithSnapshotFeature().isCurrentFileDeleted()) {
return true;
}
return false;
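
The rewritten isFileDeleted no longer trusts a non-null parent pointer alone:
during a recursive delete, an inode can still hold a parent reference while
some ancestor has already been unlinked, so the loop re-verifies every child
link up to the root. The same walk over a toy tree; Node is a hypothetical
stand-in, not the INode classes:

    import java.util.HashSet;
    import java.util.Set;

    class AncestorWalkSketch {
      static class Node {
        final String name;
        Node parent;                          // null once detached
        final Set<String> children = new HashSet<String>();
        Node(String name) { this.name = name; }
        boolean isRoot() { return "/".equals(name); }
      }

      // True if 'file' or any ancestor is no longer linked from its parent,
      // mirroring the searchChildren() walk added to isFileDeleted().
      static boolean isDetached(Node file) {
        Node child = file;
        Node parent = file.parent;
        while (true) {
          if (parent == null || !parent.children.contains(child.name)) {
            return true;                      // a link on the path is broken
          }
          if (parent.isRoot()) {
            return false;                     // reached root with intact links
          }
          child = parent;
          parent = parent.parent;
        }
      }
    }
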
@@ -8183,9 +8260,11 @@ public class FSNamesystem implements Nam
nnConf.checkAclsConfigFlag();
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.READ);
+ byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
readLock();
try {
checkOperation(OperationCategory.READ);
+ src = FSDirectory.resolvePath(src, pathComponents, dir);
if (isPermissionEnabled) {
checkPermission(pc, src, false, null, null, null, null);
}
@@ -8288,8 +8367,10 @@ public class FSNamesystem implements Nam
}
}
checkOperation(OperationCategory.READ);
+ byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
readLock();
try {
+ src = FSDirectory.resolvePath(src, pathComponents, dir);
checkOperation(OperationCategory.READ);
if (isPermissionEnabled) {
checkPathAccess(pc, src, FsAction.READ);
@@ -8333,8 +8414,10 @@ public class FSNamesystem implements Nam
nnConf.checkXAttrsConfigFlag();
final FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.READ);
+ byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
readLock();
try {
+ src = FSDirectory.resolvePath(src, pathComponents, dir);
checkOperation(OperationCategory.READ);
if (isPermissionEnabled) {
/* To access xattr names, you need EXECUTE in the owning directory. */
@@ -8428,6 +8511,29 @@ public class FSNamesystem implements Nam
}
}
+ void checkAccess(String src, FsAction mode) throws AccessControlException,
+ FileNotFoundException, UnresolvedLinkException, IOException {
+ checkOperation(OperationCategory.READ);
+ byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+ readLock();
+ try {
+ checkOperation(OperationCategory.READ);
+ src = FSDirectory.resolvePath(src, pathComponents, dir);
+ if (dir.getINode(src) == null) {
+ throw new FileNotFoundException("Path not found");
+ }
+ if (isPermissionEnabled) {
+ FSPermissionChecker pc = getPermissionChecker();
+ checkPathAccess(pc, src, mode);
+ }
+ } catch (AccessControlException e) {
+ logAuditEvent(false, "checkAccess", src);
+ throw e;
+ } finally {
+ readUnlock();
+ }
+ }
+
/**
* Default AuditLogger implementation; used when no access logger is
* defined in the config file. It can also be explicitly listed in the
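
checkAccess also illustrates the audit convention visible throughout these
hunks: a denied access is logged as a failed audit event before the exception
propagates, so probes show up in the audit trail even though they change
nothing. A generic sketch of the log-then-rethrow shape; AuditLog and the
unchecked SecurityException are stand-ins for the Hadoop types:

    class AuditSketch {
      interface AuditLog {
        void record(boolean allowed, String cmd, String src);
      }

      // Run a permission check; on denial, record the failed event and let
      // the exception continue to the caller unchanged.
      static void auditedCheck(String src, Runnable check, AuditLog audit) {
        try {
          check.run();                 // throws SecurityException on denial
        } catch (SecurityException e) {
          audit.record(false, "checkAccess", src);
          throw e;
        }
      }
    }
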
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java Wed Aug 20 01:34:29 2014
@@ -97,9 +97,9 @@ public abstract class INode implements I
/** Set user */
final INode setUser(String user, int latestSnapshotId)
throws QuotaExceededException {
- final INode nodeToUpdate = recordModification(latestSnapshotId);
- nodeToUpdate.setUser(user);
- return nodeToUpdate;
+ recordModification(latestSnapshotId);
+ setUser(user);
+ return this;
}
/**
* @param snapshotId
@@ -122,9 +122,9 @@ public abstract class INode implements I
/** Set group */
final INode setGroup(String group, int latestSnapshotId)
throws QuotaExceededException {
- final INode nodeToUpdate = recordModification(latestSnapshotId);
- nodeToUpdate.setGroup(group);
- return nodeToUpdate;
+ recordModification(latestSnapshotId);
+ setGroup(group);
+ return this;
}
/**
@@ -148,9 +148,9 @@ public abstract class INode implements I
/** Set the {@link FsPermission} of this {@link INode} */
INode setPermission(FsPermission permission, int latestSnapshotId)
throws QuotaExceededException {
- final INode nodeToUpdate = recordModification(latestSnapshotId);
- nodeToUpdate.setPermission(permission);
- return nodeToUpdate;
+ recordModification(latestSnapshotId);
+ setPermission(permission);
+ return this;
}
abstract AclFeature getAclFeature(int snapshotId);
@@ -164,18 +164,18 @@ public abstract class INode implements I
final INode addAclFeature(AclFeature aclFeature, int latestSnapshotId)
throws QuotaExceededException {
- final INode nodeToUpdate = recordModification(latestSnapshotId);
- nodeToUpdate.addAclFeature(aclFeature);
- return nodeToUpdate;
+ recordModification(latestSnapshotId);
+ addAclFeature(aclFeature);
+ return this;
}
abstract void removeAclFeature();
final INode removeAclFeature(int latestSnapshotId)
throws QuotaExceededException {
- final INode nodeToUpdate = recordModification(latestSnapshotId);
- nodeToUpdate.removeAclFeature();
- return nodeToUpdate;
+ recordModification(latestSnapshotId);
+ removeAclFeature();
+ return this;
}
/**
@@ -199,9 +199,9 @@ public abstract class INode implements I
final INode addXAttrFeature(XAttrFeature xAttrFeature, int latestSnapshotId)
throws QuotaExceededException {
- final INode nodeToUpdate = recordModification(latestSnapshotId);
- nodeToUpdate.addXAttrFeature(xAttrFeature);
- return nodeToUpdate;
+ recordModification(latestSnapshotId);
+ addXAttrFeature(xAttrFeature);
+ return this;
}
/**
@@ -211,9 +211,9 @@ public abstract class INode implements I
final INode removeXAttrFeature(int lastestSnapshotId)
throws QuotaExceededException {
- final INode nodeToUpdate = recordModification(lastestSnapshotId);
- nodeToUpdate.removeXAttrFeature();
- return nodeToUpdate;
+ recordModification(lastestSnapshotId);
+ removeXAttrFeature();
+ return this;
}
/**
@@ -298,11 +298,8 @@ public abstract class INode implements I
* @param latestSnapshotId The id of the latest snapshot that has been taken.
* Note that it is {@link Snapshot#CURRENT_STATE_ID}
* if no snapshots have been taken.
- * @return The current inode, which usually is the same object of this inode.
- * However, in some cases, this inode may be replaced with a new inode
- * for maintaining snapshots. The current inode is then the new inode.
*/
- abstract INode recordModification(final int latestSnapshotId)
+ abstract void recordModification(final int latestSnapshotId)
throws QuotaExceededException;
/** Check whether it's a reference. */
@@ -652,9 +649,9 @@ public abstract class INode implements I
/** Set the last modification time of inode. */
public final INode setModificationTime(long modificationTime,
int latestSnapshotId) throws QuotaExceededException {
- final INode nodeToUpdate = recordModification(latestSnapshotId);
- nodeToUpdate.setModificationTime(modificationTime);
- return nodeToUpdate;
+ recordModification(latestSnapshotId);
+ setModificationTime(modificationTime);
+ return this;
}
/**
@@ -682,9 +679,9 @@ public abstract class INode implements I
*/
public final INode setAccessTime(long accessTime, int latestSnapshotId)
throws QuotaExceededException {
- final INode nodeToUpdate = recordModification(latestSnapshotId);
- nodeToUpdate.setAccessTime(accessTime);
- return nodeToUpdate;
+ recordModification(latestSnapshotId);
+ setAccessTime(accessTime);
+ return this;
}
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java Wed Aug 20 01:34:29 2014
@@ -157,7 +157,7 @@ public class INodeDirectory extends INod
return quota;
}
- private int searchChildren(byte[] name) {
+ int searchChildren(byte[] name) {
return children == null? -1: Collections.binarySearch(children, name);
}
@@ -318,7 +318,7 @@ public class INodeDirectory extends INod
}
@Override
- public INodeDirectory recordModification(int latestSnapshotId)
+ public void recordModification(int latestSnapshotId)
throws QuotaExceededException {
if (isInLatestSnapshot(latestSnapshotId)
&& !shouldRecordInSrcSnapshot(latestSnapshotId)) {
@@ -330,7 +330,6 @@ public class INodeDirectory extends INod
// record self in the diff list if necessary
sf.getDiffs().saveSelf2Snapshot(latestSnapshotId, this, null);
}
- return this;
}
/**
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Wed Aug 20 01:34:29 2014
@@ -284,7 +284,7 @@ public class INodeFile extends INodeWith
}
@Override
- public INodeFile recordModification(final int latestSnapshotId)
+ public void recordModification(final int latestSnapshotId)
throws QuotaExceededException {
if (isInLatestSnapshot(latestSnapshotId)
&& !shouldRecordInSrcSnapshot(latestSnapshotId)) {
@@ -296,7 +296,6 @@ public class INodeFile extends INodeWith
// record self in the diff list if necessary
sf.getDiffs().saveSelf2Snapshot(latestSnapshotId, this, null);
}
- return this;
}
public FileDiffList getDiffs() {
@@ -344,11 +343,10 @@ public class INodeFile extends INodeWith
/** Set the replication factor of this file. */
public final INodeFile setFileReplication(short replication,
- int latestSnapshotId, final INodeMap inodeMap)
- throws QuotaExceededException {
- final INodeFile nodeToUpdate = recordModification(latestSnapshotId);
- nodeToUpdate.setFileReplication(replication);
- return nodeToUpdate;
+ int latestSnapshotId) throws QuotaExceededException {
+ recordModification(latestSnapshotId);
+ setFileReplication(replication);
+ return this;
}
/** @return preferred block size (in bytes) of the file. */
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java Wed Aug 20 01:34:29 2014
@@ -93,9 +93,8 @@ public class INodeMap {
"", "", new FsPermission((short) 0)), 0, 0) {
@Override
- INode recordModification(int latestSnapshotId)
+ void recordModification(int latestSnapshotId)
throws QuotaExceededException {
- return null;
}
@Override