Posted to hdfs-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:50:25 UTC
svn commit: r1619012 [14/35] - in
/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project: hadoop-hdfs-httpfs/
hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/
hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/ hadoop...
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java Tue Aug 19 23:49:39 2014
@@ -18,13 +18,19 @@
package org.apache.hadoop.hdfs.server.datanode;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.HardLink;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
@@ -35,13 +41,31 @@ import org.apache.hadoop.hdfs.server.com
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DiskChecker;
-import java.io.*;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
import java.nio.channels.FileLock;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
/**
* Data storage information file.
@@ -149,43 +173,99 @@ public class DataStorage extends Storage
}
/**
- * Analyze storage directories.
- * Recover from previous transitions if required.
- * Perform fs state transition if necessary depending on the namespace info.
- * Read storage info.
- * <br>
- * This method should be synchronized between multiple DN threads. Only the
- * first DN thread does DN level storage dir recoverTransitionRead.
- *
+   * {@link org.apache.hadoop.hdfs.server.common.Storage#writeAll()}
+ */
+ private void writeAll(Collection<StorageDirectory> dirs) throws IOException {
+ this.layoutVersion = getServiceLayoutVersion();
+ for (StorageDirectory dir : dirs) {
+ writeProperties(dir);
+ }
+ }
+
+ /**
+ * Add a list of volumes to be managed by DataStorage. If the volume is empty,
+   * format it; otherwise recover it from previous transitions if required.
+ *
+ * @param datanode the reference to DataNode.
* @param nsInfo namespace information
* @param dataDirs array of data storage directories
* @param startOpt startup option
* @throws IOException
*/
- synchronized void recoverTransitionRead(DataNode datanode,
+ synchronized void addStorageLocations(DataNode datanode,
NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
StartupOption startOpt)
throws IOException {
- if (initialized) {
- // DN storage has been initialized, no need to do anything
- return;
+ // Similar to recoverTransitionRead, it first ensures the datanode level
+ // format is completed.
+ List<StorageLocation> tmpDataDirs =
+ new ArrayList<StorageLocation>(dataDirs);
+ addStorageLocations(datanode, nsInfo, tmpDataDirs, startOpt, false, true);
+
+ Collection<File> bpDataDirs = new ArrayList<File>();
+ String bpid = nsInfo.getBlockPoolID();
+ for (StorageLocation dir : dataDirs) {
+ File dnRoot = dir.getFile();
+ File bpRoot = BlockPoolSliceStorage.getBpRoot(bpid, new File(dnRoot,
+ STORAGE_DIR_CURRENT));
+ bpDataDirs.add(bpRoot);
}
- LOG.info("Data-node version: " + HdfsConstants.DATANODE_LAYOUT_VERSION
- + " and name-node layout version: " + nsInfo.getLayoutVersion());
-
- // 1. For each data directory calculate its state and
- // check whether all is consistent before transitioning.
- // Format and recover.
- this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
- ArrayList<StorageState> dataDirStates = new ArrayList<StorageState>(dataDirs.size());
+ // mkdir for the list of BlockPoolStorage
+ makeBlockPoolDataDir(bpDataDirs, null);
+ BlockPoolSliceStorage bpStorage = this.bpStorageMap.get(bpid);
+ if (bpStorage == null) {
+ bpStorage = new BlockPoolSliceStorage(
+ nsInfo.getNamespaceID(), bpid, nsInfo.getCTime(),
+ nsInfo.getClusterID());
+ }
+
+ bpStorage.recoverTransitionRead(datanode, nsInfo, bpDataDirs, startOpt);
+ addBlockPoolStorage(bpid, bpStorage);
+ }
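
For context, a hedged sketch of how a caller in the same package might drive this new entry point to hot-add a volume after startup; the DataNode, DataStorage and NamespaceInfo instances and the mount path are placeholders, not part of this patch:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
    import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;

    // Hypothetical caller (not part of this patch): hot-add one freshly mounted
    // volume to a running DataNode through the entry point above.
    static void addOneVolume(DataNode datanode, DataStorage storage,
        NamespaceInfo nsInfo, String newDirPath) throws IOException {
      List<StorageLocation> newDirs = new ArrayList<StorageLocation>();
      newDirs.add(StorageLocation.parse(newDirPath)); // parse() is made public later in this commit
      // Formats the volume if it is empty, otherwise recovers it, then loads the
      // block pool slice for nsInfo's block pool (see the method body above).
      storage.addStorageLocations(datanode, nsInfo, newDirs, StartupOption.REGULAR);
    }
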
+
+ /**
+ * Add a list of volumes to be managed by this DataStorage. If the volume is
+ * empty, it formats the volume, otherwise it recovers it from previous
+ * transitions if required.
+ *
+ * If isInitialize is false, only the directories that have finished the
+ * doTransition() process will be added into DataStorage.
+ *
+ * @param datanode the reference to DataNode.
+ * @param nsInfo namespace information
+ * @param dataDirs array of data storage directories
+ * @param startOpt startup option
+   * @param isInitialize whether it is called when DataNode starts up.
+   * @param ignoreExistingDirs if true, return silently instead of failing when
+   *          none of the given directories can be added (e.g. all are already
+   *          in use or inaccessible).
+ * @throws IOException
+ */
+ private synchronized void addStorageLocations(DataNode datanode,
+ NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
+ StartupOption startOpt, boolean isInitialize, boolean ignoreExistingDirs)
+ throws IOException {
+ Set<String> existingStorageDirs = new HashSet<String>();
+ for (int i = 0; i < getNumStorageDirs(); i++) {
+ existingStorageDirs.add(getStorageDir(i).getRoot().getAbsolutePath());
+ }
+
+ // 1. For each data directory calculate its state and check whether all is
+ // consistent before transitioning. Format and recover.
+ ArrayList<StorageState> dataDirStates =
+ new ArrayList<StorageState>(dataDirs.size());
+ List<StorageDirectory> addedStorageDirectories =
+ new ArrayList<StorageDirectory>();
for(Iterator<StorageLocation> it = dataDirs.iterator(); it.hasNext();) {
File dataDir = it.next().getFile();
+ if (existingStorageDirs.contains(dataDir.getAbsolutePath())) {
+ LOG.info("Storage directory " + dataDir + " has already been used.");
+ it.remove();
+ continue;
+ }
StorageDirectory sd = new StorageDirectory(dataDir);
StorageState curState;
try {
curState = sd.analyzeStorage(startOpt, this);
// sd is locked but not opened
- switch(curState) {
+ switch (curState) {
case NORMAL:
break;
case NON_EXISTENT:
@@ -194,7 +274,8 @@ public class DataStorage extends Storage
it.remove();
continue;
case NOT_FORMATTED: // format
- LOG.info("Storage directory " + dataDir + " is not formatted");
+ LOG.info("Storage directory " + dataDir + " is not formatted for "
+ + nsInfo.getBlockPoolID());
LOG.info("Formatting ...");
format(sd, nsInfo, datanode.getDatanodeUuid());
break;
@@ -208,28 +289,82 @@ public class DataStorage extends Storage
//continue with other good dirs
continue;
}
- // add to the storage list
- addStorageDir(sd);
+ if (isInitialize) {
+ addStorageDir(sd);
+ }
+ addedStorageDirectories.add(sd);
dataDirStates.add(curState);
}
- if (dataDirs.size() == 0 || dataDirStates.size() == 0) // none of the data dirs exist
+ if (dataDirs.size() == 0 || dataDirStates.size() == 0) {
+ // none of the data dirs exist
+ if (ignoreExistingDirs) {
+ return;
+ }
throw new IOException(
"All specified directories are not accessible or do not exist.");
+ }
// 2. Do transitions
// Each storage directory is treated individually.
- // During startup some of them can upgrade or rollback
- // while others could be uptodate for the regular startup.
- for(int idx = 0; idx < getNumStorageDirs(); idx++) {
- doTransition(datanode, getStorageDir(idx), nsInfo, startOpt);
- createStorageID(getStorageDir(idx));
+ // During startup some of them can upgrade or rollback
+ // while others could be up-to-date for the regular startup.
+ for (Iterator<StorageDirectory> it = addedStorageDirectories.iterator();
+ it.hasNext(); ) {
+ StorageDirectory sd = it.next();
+ try {
+ doTransition(datanode, sd, nsInfo, startOpt);
+ createStorageID(sd);
+ } catch (IOException e) {
+ if (!isInitialize) {
+ sd.unlock();
+ it.remove();
+ continue;
+ }
+ unlockAll();
+ throw e;
+ }
}
+
+ // 3. Update all successfully loaded storages. Some of them might have just
+ // been formatted.
+ this.writeAll(addedStorageDirectories);
+
+ // 4. Make newly loaded storage directories visible for service.
+ if (!isInitialize) {
+ this.storageDirs.addAll(addedStorageDirectories);
+ }
+ }
+
+ /**
+ * Analyze storage directories.
+ * Recover from previous transitions if required.
+ * Perform fs state transition if necessary depending on the namespace info.
+ * Read storage info.
+ * <br>
+ * This method should be synchronized between multiple DN threads. Only the
+ * first DN thread does DN level storage dir recoverTransitionRead.
+ *
+ * @param nsInfo namespace information
+ * @param dataDirs array of data storage directories
+ * @param startOpt startup option
+ * @throws IOException
+ */
+ synchronized void recoverTransitionRead(DataNode datanode,
+ NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
+ StartupOption startOpt)
+ throws IOException {
+ if (initialized) {
+ // DN storage has been initialized, no need to do anything
+ return;
+ }
+ LOG.info("DataNode version: " + HdfsConstants.DATANODE_LAYOUT_VERSION
+ + " and NameNode layout version: " + nsInfo.getLayoutVersion());
+
+ this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
+ addStorageLocations(datanode, nsInfo, dataDirs, startOpt, true, false);
- // 3. Update all storages. Some of them might have just been formatted.
- this.writeAll();
-
- // 4. mark DN storage is initialized
+    // mark DN storage as initialized
this.initialized = true;
}
@@ -256,6 +391,7 @@ public class DataStorage extends Storage
STORAGE_DIR_CURRENT));
bpDataDirs.add(bpRoot);
}
+
// mkdir for the list of BlockPoolStorage
makeBlockPoolDataDir(bpDataDirs, null);
BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage(
@@ -483,7 +619,7 @@ public class DataStorage extends Storage
// do upgrade
if (this.layoutVersion > HdfsConstants.DATANODE_LAYOUT_VERSION) {
- doUpgrade(sd, nsInfo); // upgrade
+ doUpgrade(datanode, sd, nsInfo); // upgrade
return;
}
@@ -518,7 +654,8 @@ public class DataStorage extends Storage
* @param sd storage directory
* @throws IOException on error
*/
- void doUpgrade(StorageDirectory sd, NamespaceInfo nsInfo) throws IOException {
+ void doUpgrade(DataNode datanode, StorageDirectory sd, NamespaceInfo nsInfo)
+ throws IOException {
    // If the existing on-disk layout version supports federation, simply
// update its layout version.
if (DataNodeLayoutVersion.supports(
@@ -563,7 +700,8 @@ public class DataStorage extends Storage
BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage(nsInfo.getNamespaceID(),
nsInfo.getBlockPoolID(), nsInfo.getCTime(), nsInfo.getClusterID());
bpStorage.format(curDir, nsInfo);
- linkAllBlocks(tmpDir, bbwDir, new File(curBpDir, STORAGE_DIR_CURRENT));
+ linkAllBlocks(datanode, tmpDir, bbwDir, new File(curBpDir,
+ STORAGE_DIR_CURRENT));
// 4. Write version file under <SD>/current
layoutVersion = HdfsConstants.DATANODE_LAYOUT_VERSION;
@@ -741,22 +879,22 @@ public class DataStorage extends Storage
*
* @throws IOException If error occurs during hardlink
*/
- private void linkAllBlocks(File fromDir, File fromBbwDir, File toDir)
- throws IOException {
+ private void linkAllBlocks(DataNode datanode, File fromDir, File fromBbwDir,
+ File toDir) throws IOException {
HardLink hardLink = new HardLink();
// do the link
int diskLayoutVersion = this.getLayoutVersion();
if (DataNodeLayoutVersion.supports(
LayoutVersion.Feature.APPEND_RBW_DIR, diskLayoutVersion)) {
// hardlink finalized blocks in tmpDir/finalized
- linkBlocks(new File(fromDir, STORAGE_DIR_FINALIZED),
+ linkBlocks(datanode, new File(fromDir, STORAGE_DIR_FINALIZED),
new File(toDir, STORAGE_DIR_FINALIZED), diskLayoutVersion, hardLink);
// hardlink rbw blocks in tmpDir/rbw
- linkBlocks(new File(fromDir, STORAGE_DIR_RBW),
+ linkBlocks(datanode, new File(fromDir, STORAGE_DIR_RBW),
new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion, hardLink);
} else { // pre-RBW version
// hardlink finalized blocks in tmpDir
- linkBlocks(fromDir, new File(toDir, STORAGE_DIR_FINALIZED),
+ linkBlocks(datanode, fromDir, new File(toDir, STORAGE_DIR_FINALIZED),
diskLayoutVersion, hardLink);
if (fromBbwDir.exists()) {
/*
@@ -765,15 +903,67 @@ public class DataStorage extends Storage
* NOT underneath the 'current' directory in those releases. See
* HDFS-3731 for details.
*/
- linkBlocks(fromBbwDir,
+ linkBlocks(datanode, fromBbwDir,
new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion, hardLink);
}
}
LOG.info( hardLink.linkStats.report() );
}
+
+ private static class LinkArgs {
+ public File src;
+ public File dst;
+
+ public LinkArgs(File src, File dst) {
+ this.src = src;
+ this.dst = dst;
+ }
+ }
+
+ static void linkBlocks(DataNode datanode, File from, File to, int oldLV,
+ HardLink hl) throws IOException {
+ boolean upgradeToIdBasedLayout = false;
+ // If we are upgrading from a version older than the one where we introduced
+ // block ID-based layout AND we're working with the finalized directory,
+ // we'll need to upgrade from the old flat layout to the block ID-based one
+ if (oldLV > DataNodeLayoutVersion.Feature.BLOCKID_BASED_LAYOUT.getInfo().
+ getLayoutVersion() && to.getName().equals(STORAGE_DIR_FINALIZED)) {
+ upgradeToIdBasedLayout = true;
+ }
+
+ final List<LinkArgs> idBasedLayoutSingleLinks = Lists.newArrayList();
+ linkBlocksHelper(from, to, oldLV, hl, upgradeToIdBasedLayout, to,
+ idBasedLayoutSingleLinks);
+ int numLinkWorkers = datanode.getConf().getInt(
+ DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS_KEY,
+ DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS);
+ ExecutorService linkWorkers = Executors.newFixedThreadPool(numLinkWorkers);
+ final int step = idBasedLayoutSingleLinks.size() / numLinkWorkers + 1;
+ List<Future<Void>> futures = Lists.newArrayList();
+ for (int i = 0; i < idBasedLayoutSingleLinks.size(); i += step) {
+ final int iCopy = i;
+ futures.add(linkWorkers.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws IOException {
+ int upperBound = Math.min(iCopy + step,
+ idBasedLayoutSingleLinks.size());
+ for (int j = iCopy; j < upperBound; j++) {
+ LinkArgs cur = idBasedLayoutSingleLinks.get(j);
+ NativeIO.link(cur.src, cur.dst);
+ }
+ return null;
+ }
+ }));
+ }
+ linkWorkers.shutdown();
+ for (Future<Void> f : futures) {
+ Futures.get(f, IOException.class);
+ }
+ }
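
The loop above splits the collected single hard-link operations into contiguous chunks and hands one chunk to each worker in a fixed-size pool. A minimal, generic sketch of that same chunking pattern, with placeholder work items and a placeholder process() method (not DataNode code), written as members of some helper class:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    // Split a flat work list into contiguous chunks and run one chunk per worker,
    // mirroring the chunk-size rule (size / workers + 1) used above.
    static <T> void processInChunks(final List<T> items, int numWorkers)
        throws InterruptedException, ExecutionException {
      ExecutorService pool = Executors.newFixedThreadPool(numWorkers);
      final int step = items.size() / numWorkers + 1;
      List<Future<Void>> futures = new ArrayList<Future<Void>>();
      for (int i = 0; i < items.size(); i += step) {
        final int start = i;
        futures.add(pool.submit(new Callable<Void>() {
          @Override
          public Void call() {
            int end = Math.min(start + step, items.size());
            for (int j = start; j < end; j++) {
              process(items.get(j));      // placeholder per-item work
            }
            return null;
          }
        }));
      }
      pool.shutdown();
      for (Future<Void> f : futures) {
        f.get();                          // surface the first failure, if any
      }
    }

    static <T> void process(T item) { /* placeholder */ }
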
- static void linkBlocks(File from, File to, int oldLV, HardLink hl)
- throws IOException {
+ static void linkBlocksHelper(File from, File to, int oldLV, HardLink hl,
+ boolean upgradeToIdBasedLayout, File blockRoot,
+ List<LinkArgs> idBasedLayoutSingleLinks) throws IOException {
if (!from.exists()) {
return;
}
@@ -800,9 +990,6 @@ public class DataStorage extends Storage
// from is a directory
hl.linkStats.countDirs++;
- if (!to.mkdirs())
- throw new IOException("Cannot create directory " + to);
-
String[] blockNames = from.list(new java.io.FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
@@ -810,12 +997,36 @@ public class DataStorage extends Storage
}
});
+ // If we are upgrading to block ID-based layout, we don't want to recreate
+ // any subdirs from the source that contain blocks, since we have a new
+ // directory structure
+ if (!upgradeToIdBasedLayout || !to.getName().startsWith(
+ BLOCK_SUBDIR_PREFIX)) {
+ if (!to.mkdirs())
+ throw new IOException("Cannot create directory " + to);
+ }
+
// Block files just need hard links with the same file names
// but a different directory
if (blockNames.length > 0) {
- HardLink.createHardLinkMult(from, blockNames, to);
- hl.linkStats.countMultLinks++;
- hl.linkStats.countFilesMultLinks += blockNames.length;
+ if (upgradeToIdBasedLayout) {
+ for (String blockName : blockNames) {
+ long blockId = Block.getBlockId(blockName);
+ File blockLocation = DatanodeUtil.idToBlockDir(blockRoot, blockId);
+ if (!blockLocation.exists()) {
+ if (!blockLocation.mkdirs()) {
+ throw new IOException("Failed to mkdirs " + blockLocation);
+ }
+ }
+ idBasedLayoutSingleLinks.add(new LinkArgs(new File(from, blockName),
+ new File(blockLocation, blockName)));
+ hl.linkStats.countSingleLinks++;
+ }
+ } else {
+ HardLink.createHardLinkMult(from, blockNames, to);
+ hl.linkStats.countMultLinks++;
+ hl.linkStats.countFilesMultLinks += blockNames.length;
+ }
} else {
hl.linkStats.countEmptyDirs++;
}
@@ -829,8 +1040,9 @@ public class DataStorage extends Storage
}
});
for(int i = 0; i < otherNames.length; i++)
- linkBlocks(new File(from, otherNames[i]),
- new File(to, otherNames[i]), oldLV, hl);
+ linkBlocksHelper(new File(from, otherNames[i]),
+ new File(to, otherNames[i]), oldLV, hl, upgradeToIdBasedLayout,
+ blockRoot, idBasedLayoutSingleLinks);
}
/**
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Tue Aug 19 23:49:39 2014
@@ -36,28 +36,27 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
-import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
-import java.net.UnknownHostException;
import java.nio.channels.ClosedChannelException;
+import java.security.MessageDigest;
import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor.InvalidMagicNumberException;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
@@ -83,7 +82,7 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
-import com.google.common.net.InetAddresses;
+import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
@@ -104,7 +103,8 @@ class DataXceiver extends Receiver imple
private long opStartTime; //the start time of receiving an Op
private final InputStream socketIn;
private OutputStream socketOut;
-
+ private BlockReceiver blockReceiver = null;
+
/**
* Client Name used in previous operation. Not available on first request
* on the socket.
@@ -160,6 +160,12 @@ class DataXceiver extends Receiver imple
return socketOut;
}
+ public void sendOOB() throws IOException, InterruptedException {
+ LOG.info("Sending OOB to peer: " + peer);
+    if (blockReceiver != null) {
+      blockReceiver.sendOOB();
+    }
+ }
+
/**
* Read/write data from/to the DataXceiverServer.
*/
@@ -169,27 +175,14 @@ class DataXceiver extends Receiver imple
Op op = null;
try {
- dataXceiverServer.addPeer(peer, Thread.currentThread());
+ dataXceiverServer.addPeer(peer, Thread.currentThread(), this);
peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
InputStream input = socketIn;
- if ((!peer.hasSecureChannel()) && dnConf.encryptDataTransfer &&
- !dnConf.trustedChannelResolver.isTrusted(getClientAddress(peer))){
- IOStreamPair encryptedStreams = null;
- try {
- encryptedStreams = DataTransferEncryptor.getEncryptedStreams(socketOut,
- socketIn, datanode.blockPoolTokenSecretManager,
- dnConf.encryptionAlgorithm);
- } catch (InvalidMagicNumberException imne) {
- LOG.info("Failed to read expected encryption handshake from client " +
- "at " + peer.getRemoteAddressString() + ". Perhaps the client " +
- "is running an older version of Hadoop which does not support " +
- "encryption");
- return;
- }
- input = encryptedStreams.in;
- socketOut = encryptedStreams.out;
- }
- input = new BufferedInputStream(input, HdfsConstants.SMALL_BUFFER_SIZE);
+ IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,
+ socketIn, datanode.getDatanodeId());
+ input = new BufferedInputStream(saslStreams.in,
+ HdfsConstants.SMALL_BUFFER_SIZE);
+ socketOut = saslStreams.out;
super.initialize(new DataInputStream(input));
@@ -261,19 +254,6 @@ class DataXceiver extends Receiver imple
}
}
}
-
- /**
- * Returns InetAddress from peer
- * The getRemoteAddressString is the form /ip-address:port
- * The ip-address is extracted from peer and InetAddress is formed
- * @param peer
- * @return
- * @throws UnknownHostException
- */
- private static InetAddress getClientAddress(Peer peer) {
- return InetAddresses.forString(
- peer.getRemoteAddressString().split(":")[0].substring(1));
- }
@Override
public void requestShortCircuitFds(final ExtendedBlock blk,
@@ -552,9 +532,11 @@ class DataXceiver extends Receiver imple
@Override
public void writeBlock(final ExtendedBlock block,
+ final StorageType storageType,
final Token<BlockTokenIdentifier> blockToken,
final String clientname,
final DatanodeInfo[] targets,
+ final StorageType[] targetStorageTypes,
final DatanodeInfo srcDataNode,
final BlockConstructionStage stage,
final int pipelineSize,
@@ -609,7 +591,6 @@ class DataXceiver extends Receiver imple
DataOutputStream mirrorOut = null; // stream to next target
DataInputStream mirrorIn = null; // reply from next target
Socket mirrorSock = null; // socket to next target
- BlockReceiver blockReceiver = null; // responsible for data handling
String mirrorNode = null; // the name:port of next target
String firstBadLink = ""; // first datanode that failed in connection setup
Status mirrorInStatus = SUCCESS;
@@ -618,12 +599,13 @@ class DataXceiver extends Receiver imple
if (isDatanode ||
stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
// open a block receiver
- blockReceiver = new BlockReceiver(block, in,
+ blockReceiver = new BlockReceiver(block, storageType, in,
peer.getRemoteAddressString(),
peer.getLocalAddressString(),
stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
clientname, srcDataNode, datanode, requestedChecksum,
cachingStrategy);
+
storageUuid = blockReceiver.getStorageUuid();
} else {
storageUuid = datanode.data.recoverClose(
@@ -654,25 +636,20 @@ class DataXceiver extends Receiver imple
OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock,
writeTimeout);
InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock);
- if (dnConf.encryptDataTransfer &&
- !dnConf.trustedChannelResolver.isTrusted(mirrorSock.getInetAddress())) {
- IOStreamPair encryptedStreams =
- DataTransferEncryptor.getEncryptedStreams(
- unbufMirrorOut, unbufMirrorIn,
- datanode.blockPoolTokenSecretManager
- .generateDataEncryptionKey(block.getBlockPoolId()));
-
- unbufMirrorOut = encryptedStreams.out;
- unbufMirrorIn = encryptedStreams.in;
- }
+ DataEncryptionKeyFactory keyFactory =
+ datanode.getDataEncryptionKeyFactoryForBlock(block);
+ IOStreamPair saslStreams = datanode.saslClient.socketSend(mirrorSock,
+ unbufMirrorOut, unbufMirrorIn, keyFactory, blockToken, targets[0]);
+ unbufMirrorOut = saslStreams.out;
+ unbufMirrorIn = saslStreams.in;
mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut,
HdfsConstants.SMALL_BUFFER_SIZE));
mirrorIn = new DataInputStream(unbufMirrorIn);
- new Sender(mirrorOut).writeBlock(originalBlock, blockToken,
- clientname, targets, srcDataNode, stage, pipelineSize,
- minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum,
- cachingStrategy);
+ new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
+ blockToken, clientname, targets, targetStorageTypes, srcDataNode,
+ stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
+ latestGenerationStamp, requestedChecksum, cachingStrategy);
mirrorOut.flush();
@@ -737,7 +714,7 @@ class DataXceiver extends Receiver imple
if (blockReceiver != null) {
String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
- mirrorAddr, null, targets);
+ mirrorAddr, null, targets, false);
// send close-ack for transfer-RBW/Finalized
if (isTransfer) {
@@ -776,6 +753,7 @@ class DataXceiver extends Receiver imple
IOUtils.closeStream(replyOut);
IOUtils.closeSocket(mirrorSock);
IOUtils.closeStream(blockReceiver);
+ blockReceiver = null;
}
//update metrics
@@ -787,7 +765,8 @@ class DataXceiver extends Receiver imple
public void transferBlock(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
- final DatanodeInfo[] targets) throws IOException {
+ final DatanodeInfo[] targets,
+ final StorageType[] targetStorageTypes) throws IOException {
checkAccess(socketOut, true, blk, blockToken,
Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY);
previousOpClientName = clientName;
@@ -796,13 +775,51 @@ class DataXceiver extends Receiver imple
final DataOutputStream out = new DataOutputStream(
getOutputStream());
try {
- datanode.transferReplicaForPipelineRecovery(blk, targets, clientName);
+ datanode.transferReplicaForPipelineRecovery(blk, targets,
+ targetStorageTypes, clientName);
writeResponse(Status.SUCCESS, null, out);
} finally {
IOUtils.closeStream(out);
}
}
-
+
+ private MD5Hash calcPartialBlockChecksum(ExtendedBlock block,
+ long requestLength, DataChecksum checksum, DataInputStream checksumIn)
+ throws IOException {
+ final int bytesPerCRC = checksum.getBytesPerChecksum();
+ final int csize = checksum.getChecksumSize();
+ final byte[] buffer = new byte[4*1024];
+ MessageDigest digester = MD5Hash.getDigester();
+
+ long remaining = requestLength / bytesPerCRC * csize;
+ for (int toDigest = 0; remaining > 0; remaining -= toDigest) {
+ toDigest = checksumIn.read(buffer, 0,
+ (int) Math.min(remaining, buffer.length));
+ if (toDigest < 0) {
+ break;
+ }
+ digester.update(buffer, 0, toDigest);
+ }
+
+ int partialLength = (int) (requestLength % bytesPerCRC);
+ if (partialLength > 0) {
+ byte[] buf = new byte[partialLength];
+ final InputStream blockIn = datanode.data.getBlockInputStream(block,
+ requestLength - partialLength);
+ try {
+ // Get the CRC of the partialLength.
+ IOUtils.readFully(blockIn, buf, 0, partialLength);
+ } finally {
+ IOUtils.closeStream(blockIn);
+ }
+ checksum.update(buf, 0, partialLength);
+ byte[] partialCrc = new byte[csize];
+ checksum.writeValue(partialCrc, 0, true);
+ digester.update(partialCrc);
+ }
+ return new MD5Hash(digester.digest());
+ }
+
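
A small worked example of the arithmetic above, using made-up numbers (requestLength = 1,000,000 bytes, bytesPerCRC = 512, csize = 4 as with a 4-byte CRC):

    // Illustrative numbers only.
    static void partialChecksumArithmeticExample() {
      long requestLength = 1000000L;
      int bytesPerCRC = 512;
      int csize = 4;
      long fullChunks = requestLength / bytesPerCRC;            // 1953 complete chunks
      long crcBytesFromMetaFile = fullChunks * csize;           // 7812 checksum bytes digested
      int partialLength = (int) (requestLength % bytesPerCRC);  // 64 trailing data bytes
      // Those 64 bytes are re-read from the block file, a fresh CRC is computed
      // over them, and that csize-byte CRC is folded into the MD5 as well.
      System.out.println(fullChunks + " " + crcBytesFromMetaFile + " " + partialLength);
    }
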
@Override
public void blockChecksum(final ExtendedBlock block,
final Token<BlockTokenIdentifier> blockToken) throws IOException {
@@ -810,25 +827,32 @@ class DataXceiver extends Receiver imple
getOutputStream());
checkAccess(out, true, block, blockToken,
Op.BLOCK_CHECKSUM, BlockTokenSecretManager.AccessMode.READ);
- updateCurrentThreadName("Reading metadata for block " + block);
- final LengthInputStream metadataIn =
- datanode.data.getMetaDataInputStream(block);
- final DataInputStream checksumIn = new DataInputStream(new BufferedInputStream(
- metadataIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
+ // client side now can specify a range of the block for checksum
+ long requestLength = block.getNumBytes();
+ Preconditions.checkArgument(requestLength >= 0);
+ long visibleLength = datanode.data.getReplicaVisibleLength(block);
+ boolean partialBlk = requestLength < visibleLength;
+ updateCurrentThreadName("Reading metadata for block " + block);
+ final LengthInputStream metadataIn = datanode.data
+ .getMetaDataInputStream(block);
+
+ final DataInputStream checksumIn = new DataInputStream(
+ new BufferedInputStream(metadataIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
updateCurrentThreadName("Getting checksum for block " + block);
try {
//read metadata file
- final BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
- final DataChecksum checksum = header.getChecksum();
+ final BlockMetadataHeader header = BlockMetadataHeader
+ .readHeader(checksumIn);
+ final DataChecksum checksum = header.getChecksum();
+ final int csize = checksum.getChecksumSize();
final int bytesPerCRC = checksum.getBytesPerChecksum();
- final long crcPerBlock = checksum.getChecksumSize() > 0
- ? (metadataIn.getLength() - BlockMetadataHeader.getHeaderSize())/checksum.getChecksumSize()
- : 0;
-
- //compute block checksum
- final MD5Hash md5 = MD5Hash.digest(checksumIn);
+ final long crcPerBlock = csize <= 0 ? 0 :
+ (metadataIn.getLength() - BlockMetadataHeader.getHeaderSize()) / csize;
+ final MD5Hash md5 = partialBlk && crcPerBlock > 0 ?
+ calcPartialBlockChecksum(block, requestLength, checksum, checksumIn)
+ : MD5Hash.digest(checksumIn);
if (LOG.isDebugEnabled()) {
LOG.debug("block=" + block + ", bytesPerCRC=" + bytesPerCRC
+ ", crcPerBlock=" + crcPerBlock + ", md5=" + md5);
@@ -841,8 +865,7 @@ class DataXceiver extends Receiver imple
.setBytesPerCrc(bytesPerCRC)
.setCrcPerBlock(crcPerBlock)
.setMd5(ByteString.copyFrom(md5.getDigest()))
- .setCrcType(PBHelper.convert(checksum.getChecksumType()))
- )
+ .setCrcType(PBHelper.convert(checksum.getChecksumType())))
.build()
.writeDelimitedTo(out);
out.flush();
@@ -931,6 +954,7 @@ class DataXceiver extends Receiver imple
@Override
public void replaceBlock(final ExtendedBlock block,
+ final StorageType storageType,
final Token<BlockTokenIdentifier> blockToken,
final String delHint,
final DatanodeInfo proxySource) throws IOException {
@@ -966,7 +990,7 @@ class DataXceiver extends Receiver imple
String errMsg = null;
BlockReceiver blockReceiver = null;
DataInputStream proxyReply = null;
-
+ DataOutputStream replyOut = new DataOutputStream(getOutputStream());
try {
// get the output stream to the proxy
final String dnAddr = proxySource.getXferAddr(connectToDnViaHostname);
@@ -981,17 +1005,12 @@ class DataXceiver extends Receiver imple
OutputStream unbufProxyOut = NetUtils.getOutputStream(proxySock,
dnConf.socketWriteTimeout);
InputStream unbufProxyIn = NetUtils.getInputStream(proxySock);
- if (dnConf.encryptDataTransfer &&
- !dnConf.trustedChannelResolver.isTrusted(
- proxySock.getInetAddress())) {
- IOStreamPair encryptedStreams =
- DataTransferEncryptor.getEncryptedStreams(
- unbufProxyOut, unbufProxyIn,
- datanode.blockPoolTokenSecretManager
- .generateDataEncryptionKey(block.getBlockPoolId()));
- unbufProxyOut = encryptedStreams.out;
- unbufProxyIn = encryptedStreams.in;
- }
+ DataEncryptionKeyFactory keyFactory =
+ datanode.getDataEncryptionKeyFactoryForBlock(block);
+ IOStreamPair saslStreams = datanode.saslClient.socketSend(proxySock,
+ unbufProxyOut, unbufProxyIn, keyFactory, blockToken, proxySource);
+ unbufProxyOut = saslStreams.out;
+ unbufProxyIn = saslStreams.in;
proxyOut = new DataOutputStream(new BufferedOutputStream(unbufProxyOut,
HdfsConstants.SMALL_BUFFER_SIZE));
@@ -1021,15 +1040,15 @@ class DataXceiver extends Receiver imple
DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto(
checksumInfo.getChecksum());
// open a block receiver and check if the block does not exist
- blockReceiver = new BlockReceiver(
- block, proxyReply, proxySock.getRemoteSocketAddress().toString(),
+ blockReceiver = new BlockReceiver(block, storageType,
+ proxyReply, proxySock.getRemoteSocketAddress().toString(),
proxySock.getLocalSocketAddress().toString(),
null, 0, 0, 0, "", null, datanode, remoteChecksum,
CachingStrategy.newDropBehind());
// receive a block
- blockReceiver.receiveBlock(null, null, null, null,
- dataXceiverServer.balanceThrottler, null);
+ blockReceiver.receiveBlock(null, null, replyOut, null,
+ dataXceiverServer.balanceThrottler, null, true);
// notify name node
datanode.notifyNamenodeReceivedBlock(
@@ -1064,6 +1083,7 @@ class DataXceiver extends Receiver imple
IOUtils.closeStream(proxyOut);
IOUtils.closeStream(blockReceiver);
IOUtils.closeStream(proxyReply);
+ IOUtils.closeStream(replyOut);
}
//update metrics
@@ -1077,7 +1097,7 @@ class DataXceiver extends Receiver imple
/**
* Utility function for sending a response.
*
- * @param opStatus status message to write
+ * @param status status message to write
* @param message message to send to the client or other DN
*/
private void sendResponse(Status status,
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Tue Aug 19 23:49:39 2014
@@ -27,11 +27,11 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.PeerServer;
-import org.apache.hadoop.hdfs.server.balancer.Balancer;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Daemon;
+import com.google.common.annotations.VisibleForTesting;
/**
* Server used for receiving/sending a block of data.
@@ -45,6 +45,7 @@ class DataXceiverServer implements Runna
private final PeerServer peerServer;
private final DataNode datanode;
private final HashMap<Peer, Thread> peers = new HashMap<Peer, Thread>();
+ private final HashMap<Peer, DataXceiver> peersXceiver = new HashMap<Peer, DataXceiver>();
private boolean closed = false;
/**
@@ -63,14 +64,17 @@ class DataXceiverServer implements Runna
*/
static class BlockBalanceThrottler extends DataTransferThrottler {
private int numThreads;
+ private int maxThreads;
/**Constructor
*
* @param bandwidth Total amount of bandwidth can be used for balancing
*/
- private BlockBalanceThrottler(long bandwidth) {
+ private BlockBalanceThrottler(long bandwidth, int maxThreads) {
super(bandwidth);
+ this.maxThreads = maxThreads;
LOG.info("Balancing bandwith is "+ bandwidth + " bytes/s");
+      LOG.info("Number of threads for balancing is " + maxThreads);
}
/** Check if the block move can start.
@@ -79,7 +83,7 @@ class DataXceiverServer implements Runna
* the counter is incremented; False otherwise.
*/
synchronized boolean acquire() {
- if (numThreads >= Balancer.MAX_NUM_CONCURRENT_MOVES) {
+ if (numThreads >= maxThreads) {
return false;
}
numThreads++;
@@ -120,8 +124,10 @@ class DataXceiverServer implements Runna
//set up parameter for cluster balancing
this.balanceThrottler = new BlockBalanceThrottler(
- conf.getLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY,
- DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT));
+ conf.getLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY,
+ DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT),
+ conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+ DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT));
}
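
The throttler now reads both its bandwidth and its maximum number of concurrent moves from configuration instead of the old Balancer constant. A hedged sketch of setting those two keys programmatically; the values are illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    // Illustrative only: allow 10 MB/s of balancing bandwidth and up to 10
    // concurrent block moves; both keys are read by the constructor above.
    static Configuration tunedBalancingConf() {
      Configuration conf = new HdfsConfiguration();
      conf.setLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY,
          10L * 1024 * 1024);
      conf.setInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
          10);
      return conf;
    }
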
@Override
@@ -212,18 +218,38 @@ class DataXceiverServer implements Runna
}
}
- synchronized void addPeer(Peer peer, Thread t) throws IOException {
+ synchronized void addPeer(Peer peer, Thread t, DataXceiver xceiver)
+ throws IOException {
if (closed) {
throw new IOException("Server closed.");
}
peers.put(peer, t);
+ peersXceiver.put(peer, xceiver);
}
synchronized void closePeer(Peer peer) {
peers.remove(peer);
+ peersXceiver.remove(peer);
IOUtils.cleanup(null, peer);
}
+ // Sending OOB to all peers
+ public synchronized void sendOOBToPeers() {
+ if (!datanode.shutdownForUpgrade) {
+ return;
+ }
+
+ for (Peer p : peers.keySet()) {
+ try {
+ peersXceiver.get(p).sendOOB();
+ } catch (IOException e) {
+ LOG.warn("Got error when sending OOB message.", e);
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted when sending OOB message.");
+ }
+ }
+ }
+
// Notify all peers of the shutdown and restart.
// datanode.shouldRun should still be true and datanode.restarting should
// be set true before calling this method.
@@ -242,6 +268,7 @@ class DataXceiverServer implements Runna
IOUtils.cleanup(LOG, p);
}
peers.clear();
+ peersXceiver.clear();
}
// Return the number of peers.
@@ -249,7 +276,14 @@ class DataXceiverServer implements Runna
return peers.size();
}
+ // Return the number of peers and DataXceivers.
+ @VisibleForTesting
+ synchronized int getNumPeersXceiver() {
+ return peersXceiver.size();
+ }
+
synchronized void releasePeer(Peer peer) {
peers.remove(peer);
+ peersXceiver.remove(peer);
}
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java Tue Aug 19 23:49:39 2014
@@ -30,6 +30,8 @@ public class DatanodeUtil {
public static final String DISK_ERROR = "Possible disk error: ";
+ private static final String SEP = System.getProperty("file.separator");
+
/** Get the cause of an I/O exception if caused by a possible disk error
* @param ioe an I/O exception
* @return cause if the I/O exception is caused by a possible disk error;
@@ -78,4 +80,38 @@ public class DatanodeUtil {
public static File getUnlinkTmpFile(File f) {
return new File(f.getParentFile(), f.getName()+UNLINK_BLOCK_SUFFIX);
}
+
+ /**
+ * Checks whether there are any files anywhere in the directory tree rooted
+ * at dir (directories don't count as files). dir must exist
+ * @return true if there are no files
+ * @throws IOException if unable to list subdirectories
+ */
+ public static boolean dirNoFilesRecursive(File dir) throws IOException {
+ File[] contents = dir.listFiles();
+ if (contents == null) {
+ throw new IOException("Cannot list contents of " + dir);
+ }
+ for (File f : contents) {
+ if (!f.isDirectory() || (f.isDirectory() && !dirNoFilesRecursive(f))) {
+ return false;
+ }
+ }
+ return true;
+ }
+
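
A hedged usage sketch of the new helper; the path and the wrapper method are placeholders:

    import java.io.File;
    import java.io.IOException;
    import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;

    // Illustrative only: report whether a storage directory tree contains any
    // files at all (empty subdirectories still count as "no files").
    static boolean storageTreeHasNoFiles(String dirPath) throws IOException {
      File dir = new File(dirPath);   // must exist, per the javadoc above
      return DatanodeUtil.dirNoFilesRecursive(dir);
    }
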
+ /**
+ * Get the directory where a finalized block with this ID should be stored.
+ * Do not attempt to create the directory.
+ * @param root the root directory where finalized blocks are stored
+   * @param blockId the ID of the block
+   * @return the directory under root where a finalized block with this ID
+   *         should be stored
+ */
+ public static File idToBlockDir(File root, long blockId) {
+ int d1 = (int)((blockId >> 16) & 0xff);
+ int d2 = (int)((blockId >> 8) & 0xff);
+ String path = DataStorage.BLOCK_SUBDIR_PREFIX + d1 + SEP +
+ DataStorage.BLOCK_SUBDIR_PREFIX + d2;
+ return new File(root, path);
+ }
}
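
idToBlockDir derives the two subdirectory levels from bits 16-23 and 8-15 of the block ID. A small worked example with a made-up block ID and a placeholder root path:

    import java.io.File;
    import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;

    // Illustrative only. For blockId = 0x12AB34CD:
    //   d1 = (blockId >> 16) & 0xff = 0xAB = 171
    //   d2 = (blockId >>  8) & 0xff = 0x34 = 52
    // so a finalized replica with that ID lives in <root>/subdir171/subdir52.
    static File exampleBlockDir() {
      File finalizedRoot = new File("/data/1/dfs/dn/current/finalized"); // placeholder root
      return DatanodeUtil.idToBlockDir(finalizedRoot, 0x12AB34CDL);
    }
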
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java Tue Aug 19 23:49:39 2014
@@ -108,8 +108,7 @@ public class DirectoryScanner implements
ScanInfoPerBlockPool(int sz) {super(sz);}
/**
- * Merges "that" ScanInfoPerBlockPool into this one
- * @param that
+ * Merges {@code that} ScanInfoPerBlockPool into this one
*/
public void addAll(ScanInfoPerBlockPool that) {
if (that == null) return;
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java Tue Aug 19 23:49:39 2014
@@ -54,7 +54,7 @@ public class FinalizedReplica extends Re
/**
* Copy constructor.
- * @param from
+   * @param from where to copy from
*/
public FinalizedReplica(FinalizedReplica from) {
super(from);
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java Tue Aug 19 23:49:39 2014
@@ -68,7 +68,7 @@ public class ReplicaBeingWritten extends
/**
* Copy constructor.
- * @param from
+ * @param from where to copy from
*/
public ReplicaBeingWritten(ReplicaBeingWritten from) {
super(from);
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java Tue Aug 19 23:49:39 2014
@@ -89,7 +89,7 @@ public class ReplicaInPipeline extends R
/**
* Copy constructor.
- * @param from
+ * @param from where to copy from
*/
public ReplicaInPipeline(ReplicaInPipeline from) {
super(from);
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java Tue Aug 19 23:49:39 2014
@@ -40,7 +40,7 @@ public interface ReplicaInPipelineInterf
/**
* Set the number bytes that have acked
- * @param bytesAcked
+   * @param bytesAcked number of bytes acked
*/
void setBytesAcked(long bytesAcked);
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java Tue Aug 19 23:49:39 2014
@@ -54,10 +54,10 @@ abstract public class ReplicaInfo extend
private File baseDir;
/**
- * Ints representing the sub directory path from base dir to the directory
- * containing this replica.
+ * Whether or not this replica's parent directory includes subdirs, in which
+ * case we can generate them based on the replica's block ID
*/
- private int[] subDirs;
+ private boolean hasSubdirs;
private static final Map<String, File> internedBaseDirs = new HashMap<String, File>();
@@ -100,7 +100,7 @@ abstract public class ReplicaInfo extend
/**
* Copy constructor.
- * @param from
+ * @param from where to copy from
*/
ReplicaInfo(ReplicaInfo from) {
this(from, from.getVolume(), from.getDir());
@@ -151,18 +151,8 @@ abstract public class ReplicaInfo extend
* @return the parent directory path where this replica is located
*/
File getDir() {
- if (subDirs == null) {
- return null;
- }
-
- StringBuilder sb = new StringBuilder();
- for (int i : subDirs) {
- sb.append(DataStorage.BLOCK_SUBDIR_PREFIX);
- sb.append(i);
- sb.append("/");
- }
- File ret = new File(baseDir, sb.toString());
- return ret;
+ return hasSubdirs ? DatanodeUtil.idToBlockDir(baseDir,
+ getBlockId()) : baseDir;
}
/**
@@ -175,54 +165,46 @@ abstract public class ReplicaInfo extend
private void setDirInternal(File dir) {
if (dir == null) {
- subDirs = null;
baseDir = null;
return;
}
- ReplicaDirInfo replicaDirInfo = parseSubDirs(dir);
- this.subDirs = replicaDirInfo.subDirs;
+ ReplicaDirInfo dirInfo = parseBaseDir(dir);
+ this.hasSubdirs = dirInfo.hasSubidrs;
synchronized (internedBaseDirs) {
- if (!internedBaseDirs.containsKey(replicaDirInfo.baseDirPath)) {
+ if (!internedBaseDirs.containsKey(dirInfo.baseDirPath)) {
// Create a new String path of this file and make a brand new File object
// to guarantee we drop the reference to the underlying char[] storage.
- File baseDir = new File(replicaDirInfo.baseDirPath);
- internedBaseDirs.put(replicaDirInfo.baseDirPath, baseDir);
+ File baseDir = new File(dirInfo.baseDirPath);
+ internedBaseDirs.put(dirInfo.baseDirPath, baseDir);
}
- this.baseDir = internedBaseDirs.get(replicaDirInfo.baseDirPath);
+ this.baseDir = internedBaseDirs.get(dirInfo.baseDirPath);
}
}
-
+
@VisibleForTesting
public static class ReplicaDirInfo {
- @VisibleForTesting
public String baseDirPath;
-
- @VisibleForTesting
- public int[] subDirs;
+ public boolean hasSubidrs;
+
+ public ReplicaDirInfo (String baseDirPath, boolean hasSubidrs) {
+ this.baseDirPath = baseDirPath;
+ this.hasSubidrs = hasSubidrs;
+ }
}
@VisibleForTesting
- public static ReplicaDirInfo parseSubDirs(File dir) {
- ReplicaDirInfo ret = new ReplicaDirInfo();
+ public static ReplicaDirInfo parseBaseDir(File dir) {
File currentDir = dir;
- List<Integer> subDirList = new ArrayList<Integer>();
+ boolean hasSubdirs = false;
while (currentDir.getName().startsWith(DataStorage.BLOCK_SUBDIR_PREFIX)) {
- // Prepend the integer into the list.
- subDirList.add(0, Integer.parseInt(currentDir.getName().replaceFirst(
- DataStorage.BLOCK_SUBDIR_PREFIX, "")));
+ hasSubdirs = true;
currentDir = currentDir.getParentFile();
}
- ret.subDirs = new int[subDirList.size()];
- for (int i = 0; i < subDirList.size(); i++) {
- ret.subDirs[i] = subDirList.get(i);
- }
-
- ret.baseDirPath = currentDir.getAbsolutePath();
- return ret;
+ return new ReplicaDirInfo(currentDir.getAbsolutePath(), hasSubdirs);
}
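
With the ID-based layout, parseBaseDir only needs to strip any trailing subdirNN components and note whether it saw one. A hedged example of what it returns for a typical finalized-replica directory; the path and wrapper method are placeholders:

    import java.io.File;
    import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
    import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo.ReplicaDirInfo;

    // Illustrative only.
    static ReplicaDirInfo exampleParse() {
      File replicaDir = new File("/data/1/dfs/dn/current/finalized/subdir171/subdir52");
      ReplicaDirInfo info = ReplicaInfo.parseBaseDir(replicaDir);
      // info.baseDirPath == "/data/1/dfs/dn/current/finalized"
      // info.hasSubidrs  == true   (field name as spelled in this patch)
      return info;
    }
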
/**
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java Tue Aug 19 23:49:39 2014
@@ -50,7 +50,7 @@ public class ReplicaUnderRecovery extend
/**
* Copy constructor.
- * @param from
+ * @param from where to copy from
*/
public ReplicaUnderRecovery(ReplicaUnderRecovery from) {
super(from);
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java Tue Aug 19 23:49:39 2014
@@ -60,7 +60,7 @@ public class ReplicaWaitingToBeRecovered
/**
* Copy constructor.
- * @param from
+ * @param from where to copy from
*/
public ReplicaWaitingToBeRecovered(ReplicaWaitingToBeRecovered from) {
super(from);
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java Tue Aug 19 23:49:39 2014
@@ -74,7 +74,7 @@ import com.google.common.collect.HashMul
* DN also marks the block's slots as "unanchorable" to prevent additional
* clients from initiating these operations in the future.
*
- * The counterpart fo this class on the client is {@link DfsClientShmManager}.
+ * The counterpart of this class on the client is {@link DfsClientShmManager}.
*/
public class ShortCircuitRegistry {
public static final Log LOG = LogFactory.getLog(ShortCircuitRegistry.class);
@@ -217,7 +217,32 @@ public class ShortCircuitRegistry {
}
return allowMunlock;
}
-
+
+ /**
+ * Invalidate any slot associated with a blockId that we are invalidating
+ * (deleting) from this DataNode. When a slot is invalid, the DFSClient will
+ * not use the corresponding replica for new read or mmap operations (although
+ * existing, ongoing read or mmap operations will complete.)
+ *
+ * @param blockId The block ID.
+ */
+ public synchronized void processBlockInvalidation(ExtendedBlockId blockId) {
+ if (!enabled) return;
+ final Set<Slot> affectedSlots = slots.get(blockId);
+ if (!affectedSlots.isEmpty()) {
+ final StringBuilder bld = new StringBuilder();
+ String prefix = "";
+ bld.append("Block ").append(blockId).append(" has been invalidated. ").
+ append("Marking short-circuit slots as invalid: ");
+ for (Slot slot : affectedSlots) {
+ slot.makeInvalid();
+ bld.append(prefix).append(slot.toString());
+ prefix = ", ";
+ }
+ LOG.info(bld.toString());
+ }
+ }
+
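
A hedged sketch of the kind of call site this hook anticipates when a replica is deleted; the wiring shown is assumed rather than taken from this commit, including the ExtendedBlockId(long, String) constructor:

    import org.apache.hadoop.hdfs.ExtendedBlockId;
    import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;

    // Illustrative only: assumed call site in replica-deletion code, so that any
    // open short-circuit slots for the deleted block become invalid.
    static void onReplicaDeleted(ShortCircuitRegistry registry, String bpid,
        long blockId) {
      registry.processBlockInvalidation(new ExtendedBlockId(blockId, bpid));
    }
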
public static class NewShmInfo implements Closeable {
public final ShmId shmId;
public final FileInputStream stream;
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java Tue Aug 19 23:49:39 2014
@@ -78,7 +78,8 @@ public class StorageLocation {
* @return A StorageLocation object if successfully parsed, null otherwise.
* Does not throw any exceptions.
*/
- static StorageLocation parse(String rawLocation) throws IOException {
+ public static StorageLocation parse(String rawLocation)
+ throws IOException, SecurityException {
Matcher matcher = regex.matcher(rawLocation);
StorageType storageType = StorageType.DEFAULT;
String location = rawLocation;
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java Tue Aug 19 23:49:39 2014
@@ -45,11 +45,19 @@ public class AvailableSpaceVolumeChoosin
private static final Log LOG = LogFactory.getLog(AvailableSpaceVolumeChoosingPolicy.class);
- private static final Random RAND = new Random();
+ private final Random random;
private long balancedSpaceThreshold = DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_DEFAULT;
private float balancedPreferencePercent = DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_DEFAULT;
+ AvailableSpaceVolumeChoosingPolicy(Random random) {
+ this.random = random;
+ }
+
+ public AvailableSpaceVolumeChoosingPolicy() {
+ this(new Random());
+ }
+
@Override
public synchronized void setConf(Configuration conf) {
balancedSpaceThreshold = conf.getLong(
@@ -128,7 +136,7 @@ public class AvailableSpaceVolumeChoosin
(highAvailableVolumes.size() * balancedPreferencePercent) /
preferencePercentScaler;
if (mostAvailableAmongLowVolumes < replicaSize ||
- RAND.nextFloat() < scaledPreferencePercent) {
+ random.nextFloat() < scaledPreferencePercent) {
volume = roundRobinPolicyHighAvailable.chooseVolume(
highAvailableVolumes,
replicaSize);
@@ -165,13 +173,8 @@ public class AvailableSpaceVolumeChoosin
}
/**
- * Check if the available space on all the volumes is roughly equal.
- *
- * @param volumes the volumes to check
- * @return true if all volumes' free space is within the configured threshold,
- * false otherwise.
- * @throws IOException
- * in the event of error checking amount of available space
+ * @return true if all volumes' free space is within the
+ * configured threshold, false otherwise.
*/
public boolean areAllVolumesWithinFreeSpaceThreshold() {
long leastAvailable = Long.MAX_VALUE;
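The package-private constructor above exists so tests can control the random tie-break between low and high free-space volumes. A minimal sketch of that idea, assuming the test class sits in the same package (so the constructor is visible) and that the policy keeps its usual <V extends FsVolumeSpi> type parameter:

    import java.util.Random;

    class DeterministicPolicyFactory {
      static AvailableSpaceVolumeChoosingPolicy<FsVolumeSpi> create() {
        // A fixed seed makes the preference decision between balanced and
        // high-available-space volumes reproducible across test runs.
        return new AvailableSpaceVolumeChoosingPolicy<FsVolumeSpi>(new Random(42));
      }
    }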
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java Tue Aug 19 23:49:39 2014
@@ -19,14 +19,17 @@ package org.apache.hadoop.hdfs.server.da
import java.io.File;
+import java.io.FileDescriptor;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
@@ -37,6 +40,7 @@ import org.apache.hadoop.hdfs.server.dat
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.Replica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
@@ -89,6 +93,10 @@ public interface FsDatasetSpi<V extends
/** @return a list of volumes. */
public List<V> getVolumes();
+ /** Add an array of StorageLocation to FsDataset. */
+ public void addVolumes(Collection<StorageLocation> volumes)
+ throws IOException;
+
/** @return a storage with the given storage ID */
public DatanodeStorage getStorage(final String storageUuid);
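A sketch of what a caller of the new addVolumes() contract might look like; the helper class, the dataset handle, and the raw directory strings are placeholders, not part of this change (see also StorageLocation.parse above).

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collection;
    import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;

    class AddVolumesExample {
      // Parses newly configured data directories and hands them to the dataset.
      static void addNewDirs(FsDatasetSpi<?> dataset, String... rawDirs)
          throws IOException {
        Collection<StorageLocation> locations = new ArrayList<StorageLocation>();
        for (String raw : rawDirs) {
          locations.add(StorageLocation.parse(raw));
        }
        dataset.addVolumes(locations);
      }
    }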
@@ -124,16 +132,14 @@ public interface FsDatasetSpi<V extends
/**
* Returns the specified block's on-disk length (excluding metadata)
- * @param b
   * @return the specified block's on-disk length (excluding metadata)
- * @throws IOException
+ * @throws IOException on error
*/
public long getLength(ExtendedBlock b) throws IOException;
/**
* Get reference to the replica meta info in the replicasMap.
* To be called from methods that are synchronized on {@link FSDataset}
- * @param blockId
* @return replica from the replicas map
*/
@Deprecated
@@ -151,8 +157,8 @@ public interface FsDatasetSpi<V extends
/**
* Returns an input stream at specified offset of the specified block
- * @param b
- * @param seekOffset
+ * @param b block
+ * @param seekOffset offset within the block to seek to
* @return an input stream to read the contents of the specified block,
* starting at the offset
* @throws IOException
@@ -163,9 +169,6 @@ public interface FsDatasetSpi<V extends
/**
* Returns an input stream at specified offset of the specified block
* The block is still in the tmp directory and is not finalized
- * @param b
- * @param blkoff
- * @param ckoff
* @return an input stream to read the contents of the specified block,
* starting at the offset
* @throws IOException
@@ -180,8 +183,8 @@ public interface FsDatasetSpi<V extends
* @return the meta info of the replica which is being written to
* @throws IOException if an error occurs
*/
- public ReplicaInPipelineInterface createTemporary(ExtendedBlock b
- ) throws IOException;
+ public ReplicaInPipelineInterface createTemporary(StorageType storageType,
+ ExtendedBlock b) throws IOException;
/**
* Creates a RBW replica and returns the meta info of the replica
@@ -190,8 +193,8 @@ public interface FsDatasetSpi<V extends
* @return the meta info of the replica which is being written to
* @throws IOException if an error occurs
*/
- public ReplicaInPipelineInterface createRbw(ExtendedBlock b
- ) throws IOException;
+ public ReplicaInPipelineInterface createRbw(StorageType storageType,
+ ExtendedBlock b) throws IOException;
/**
* Recovers a RBW replica and returns the meta info of the replica
@@ -256,7 +259,6 @@ public interface FsDatasetSpi<V extends
* Finalizes the block previously opened for writing using writeToBlock.
* The block size is what is in the parameter b and it must match the amount
* of data written
- * @param b
* @throws IOException
*/
public void finalizeBlock(ExtendedBlock b) throws IOException;
@@ -264,7 +266,6 @@ public interface FsDatasetSpi<V extends
/**
* Unfinalizes the block previously opened for writing using writeToBlock.
* The temporary file associated with this block is deleted.
- * @param b
* @throws IOException
*/
public void unfinalizeBlock(ExtendedBlock b) throws IOException;
@@ -289,14 +290,12 @@ public interface FsDatasetSpi<V extends
/**
* Is the block valid?
- * @param b
* @return - true if the specified block is valid
*/
public boolean isValidBlock(ExtendedBlock b);
/**
* Is the block a valid RBW?
- * @param b
* @return - true if the specified block is a valid RBW
*/
public boolean isValidRbw(ExtendedBlock b);
@@ -327,7 +326,7 @@ public interface FsDatasetSpi<V extends
* Determine if the specified block is cached.
* @param bpid Block pool id
* @param blockIds - block id
- * @returns true if the block is cached
+ * @return true if the block is cached
*/
public boolean isCached(String bpid, long blockId);
@@ -440,5 +439,12 @@ public interface FsDatasetSpi<V extends
* @return true when trash is enabled
*/
public boolean trashEnabled(String bpid);
+
+ /**
+ * Submit a sync_file_range request to AsyncDiskService.
+ */
+ public void submitBackgroundSyncFileRangeRequest(final ExtendedBlock block,
+ final FileDescriptor fd, final long offset, final long nbytes,
+ final int flags);
}
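To show how the StorageType-aware pipeline methods and the new background sync hook fit together, a rough sketch of a writer; the class, the flags value of 0, and the choice of StorageType.DEFAULT are placeholders for illustration.

    import java.io.FileDescriptor;
    import java.io.IOException;
    import org.apache.hadoop.hdfs.StorageType;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;

    class StorageTypeWriteExample {
      // The caller now chooses where the RBW replica should land; a concrete
      // type such as SSD could be passed instead of DEFAULT.
      static ReplicaInPipelineInterface startWrite(FsDatasetSpi<?> dataset,
          ExtendedBlock block) throws IOException {
        return dataset.createRbw(StorageType.DEFAULT, block);
      }

      // Queues a sync_file_range call on the per-volume async disk service
      // instead of blocking the write path.
      static void syncRange(FsDatasetSpi<?> dataset, ExtendedBlock block,
          FileDescriptor fd, long offset, long nbytes) {
        dataset.submitBackgroundSyncFileRangeRequest(block, fd, offset, nbytes, 0);
      }
    }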
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java Tue Aug 19 23:49:39 2014
@@ -59,7 +59,8 @@ class BlockPoolSlice {
private final String bpid;
private final FsVolumeImpl volume; // volume to which this BlockPool belongs to
private final File currentDir; // StorageDirectory/current/bpid/current
- private final LDir finalizedDir; // directory store Finalized replica
+ // directory where finalized replicas are stored
+ private final File finalizedDir;
private final File rbwDir; // directory store RBW replica
private final File tmpDir; // directory store Temporary replica
private static final String DU_CACHE_FILE = "dfsUsed";
@@ -74,7 +75,7 @@ class BlockPoolSlice {
* @param bpid Block pool Id
* @param volume {@link FsVolumeImpl} to which this BlockPool belongs to
* @param bpDir directory corresponding to the BlockPool
- * @param conf
+ * @param conf configuration
* @throws IOException
*/
BlockPoolSlice(String bpid, FsVolumeImpl volume, File bpDir,
@@ -82,8 +83,13 @@ class BlockPoolSlice {
this.bpid = bpid;
this.volume = volume;
this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
- final File finalizedDir = new File(
+ this.finalizedDir = new File(
currentDir, DataStorage.STORAGE_DIR_FINALIZED);
+ if (!this.finalizedDir.exists()) {
+ if (!this.finalizedDir.mkdirs()) {
+ throw new IOException("Failed to mkdirs " + this.finalizedDir);
+ }
+ }
// Files that were being written when the datanode was last shutdown
// are now moved back to the data directory. It is possible that
@@ -95,16 +101,6 @@ class BlockPoolSlice {
FileUtil.fullyDelete(tmpDir);
}
this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW);
- final boolean supportAppends = conf.getBoolean(
- DFSConfigKeys.DFS_SUPPORT_APPEND_KEY,
- DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT);
- if (rbwDir.exists() && !supportAppends) {
- FileUtil.fullyDelete(rbwDir);
- }
- final int maxBlocksPerDir = conf.getInt(
- DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_KEY,
- DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_DEFAULT);
- this.finalizedDir = new LDir(finalizedDir, maxBlocksPerDir);
if (!rbwDir.mkdirs()) { // create rbw directory if not exist
if (!rbwDir.isDirectory()) {
throw new IOException("Mkdirs failed to create " + rbwDir.toString());
@@ -137,7 +133,7 @@ class BlockPoolSlice {
}
File getFinalizedDir() {
- return finalizedDir.dir;
+ return finalizedDir;
}
File getRbwDir() {
@@ -245,26 +241,57 @@ class BlockPoolSlice {
}
File addBlock(Block b, File f) throws IOException {
- File blockFile = finalizedDir.addBlock(b, f);
+ File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId());
+ if (!blockDir.exists()) {
+ if (!blockDir.mkdirs()) {
+ throw new IOException("Failed to mkdirs " + blockDir);
+ }
+ }
+ File blockFile = FsDatasetImpl.moveBlockFiles(b, f, blockDir);
File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp());
dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
return blockFile;
}
void checkDirs() throws DiskErrorException {
- finalizedDir.checkDirTree();
+ DiskChecker.checkDirs(finalizedDir);
DiskChecker.checkDir(tmpDir);
DiskChecker.checkDir(rbwDir);
}
void getVolumeMap(ReplicaMap volumeMap) throws IOException {
// add finalized replicas
- finalizedDir.getVolumeMap(bpid, volumeMap, volume);
+ addToReplicasMap(volumeMap, finalizedDir, true);
// add rbw replicas
addToReplicasMap(volumeMap, rbwDir, false);
}
/**
+ * Recover an unlinked tmp file on datanode restart. If the original block
+ * does not exist, then the tmp file is renamed to be the
+ * original file name and the original name is returned; otherwise the tmp
+ * file is deleted and null is returned.
+ */
+ File recoverTempUnlinkedBlock(File unlinkedTmp) throws IOException {
+ File blockFile = FsDatasetUtil.getOrigFile(unlinkedTmp);
+ if (blockFile.exists()) {
+ // If the original block file still exists, then no recovery is needed.
+ if (!unlinkedTmp.delete()) {
+ throw new IOException("Unable to cleanup unlinked tmp file " +
+ unlinkedTmp);
+ }
+ return null;
+ } else {
+ if (!unlinkedTmp.renameTo(blockFile)) {
+ throw new IOException("Unable to rename unlinked tmp file " +
+ unlinkedTmp);
+ }
+ return blockFile;
+ }
+ }
+
+
+ /**
* Add replicas under the given directory to the volume map
* @param volumeMap the replicas map
* @param dir an input directory
@@ -273,23 +300,34 @@ class BlockPoolSlice {
*/
void addToReplicasMap(ReplicaMap volumeMap, File dir, boolean isFinalized
) throws IOException {
- File blockFiles[] = FileUtil.listFiles(dir);
- for (File blockFile : blockFiles) {
- if (!Block.isBlockFilename(blockFile))
+ File files[] = FileUtil.listFiles(dir);
+ for (File file : files) {
+ if (file.isDirectory()) {
+ addToReplicasMap(volumeMap, file, isFinalized);
+ }
+
+ if (isFinalized && FsDatasetUtil.isUnlinkTmpFile(file)) {
+ file = recoverTempUnlinkedBlock(file);
+ if (file == null) { // the original block still exists, so we cover it
+ // in another iteration and can continue here
+ continue;
+ }
+ }
+ if (!Block.isBlockFilename(file))
continue;
long genStamp = FsDatasetUtil.getGenerationStampFromFile(
- blockFiles, blockFile);
- long blockId = Block.filename2id(blockFile.getName());
+ files, file);
+ long blockId = Block.filename2id(file.getName());
ReplicaInfo newReplica = null;
if (isFinalized) {
newReplica = new FinalizedReplica(blockId,
- blockFile.length(), genStamp, volume, blockFile.getParentFile());
+ file.length(), genStamp, volume, file.getParentFile());
} else {
boolean loadRwr = true;
- File restartMeta = new File(blockFile.getParent() +
- File.pathSeparator + "." + blockFile.getName() + ".restart");
+ File restartMeta = new File(file.getParent() +
+ File.pathSeparator + "." + file.getName() + ".restart");
Scanner sc = null;
try {
sc = new Scanner(restartMeta);
@@ -297,8 +335,8 @@ class BlockPoolSlice {
if (sc.hasNextLong() && (sc.nextLong() > Time.now())) {
// It didn't expire. Load the replica as a RBW.
newReplica = new ReplicaBeingWritten(blockId,
- validateIntegrityAndSetLength(blockFile, genStamp),
- genStamp, volume, blockFile.getParentFile(), null);
+ validateIntegrityAndSetLength(file, genStamp),
+ genStamp, volume, file.getParentFile(), null);
loadRwr = false;
}
sc.close();
@@ -307,7 +345,7 @@ class BlockPoolSlice {
restartMeta.getPath());
}
} catch (FileNotFoundException fnfe) {
- // nothing to do here
+ // nothing to do here
} finally {
if (sc != null) {
sc.close();
@@ -316,15 +354,15 @@ class BlockPoolSlice {
// Restart meta doesn't exist or expired.
if (loadRwr) {
newReplica = new ReplicaWaitingToBeRecovered(blockId,
- validateIntegrityAndSetLength(blockFile, genStamp),
- genStamp, volume, blockFile.getParentFile());
+ validateIntegrityAndSetLength(file, genStamp),
+ genStamp, volume, file.getParentFile());
}
}
ReplicaInfo oldReplica = volumeMap.add(bpid, newReplica);
if (oldReplica != null) {
FsDatasetImpl.LOG.warn("Two block files with the same block id exist " +
- "on disk: " + oldReplica.getBlockFile() + " and " + blockFile );
+ "on disk: " + oldReplica.getBlockFile() + " and " + file );
}
}
}
@@ -411,10 +449,6 @@ class BlockPoolSlice {
}
}
- void clearPath(File f) {
- finalizedDir.clearPath(f);
- }
-
@Override
public String toString() {
return currentDir.getAbsolutePath();
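The LDir-based finalized tree (and its dfs.datanode.numblocks limit) is gone; finalized replicas now live in directories computed from the block ID via DatanodeUtil.idToBlockDir. A sketch of what such a mapping could look like; the bit ranges and the "subdir" prefix below are assumptions for illustration, not the actual helper.

    import java.io.File;

    class BlockDirLayoutSketch {
      // Two directory levels derived from bits of the block ID, so a replica's
      // location is computable without scanning a flat finalized directory.
      static File idToBlockDir(File finalizedDir, long blockId) {
        int d1 = (int) ((blockId >> 16) & 0xFF);
        int d2 = (int) ((blockId >> 8) & 0xFF);
        return new File(finalizedDir,
            "subdir" + d1 + File.separator + "subdir" + d2);
      }
    }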
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java Tue Aug 19 23:49:39 2014
@@ -19,6 +19,7 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.File;
+import java.io.FileDescriptor;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
@@ -31,6 +32,8 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.NativeIOException;
/**
* This class is a container of multiple thread pools, each for a volume,
@@ -42,6 +45,7 @@ import org.apache.hadoop.hdfs.server.pro
* can be slow, and we don't want to use a single thread pool because that
* is inefficient when we have more than 1 volume. AsyncDiskService is the
* solution for these.
+ * Another example of an async disk operation is requesting sync_file_range().
*
* This class and {@link org.apache.hadoop.util.AsyncDiskService} are similar.
* They should be combined.
@@ -57,6 +61,7 @@ class FsDatasetAsyncDiskService {
private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
private final DataNode datanode;
+ private final ThreadGroup threadGroup;
private Map<File, ThreadPoolExecutor> executors
= new HashMap<File, ThreadPoolExecutor>();
@@ -66,42 +71,52 @@ class FsDatasetAsyncDiskService {
*
* The AsyncDiskServices uses one ThreadPool per volume to do the async
* disk operations.
- *
- * @param volumes The roots of the data volumes.
*/
- FsDatasetAsyncDiskService(DataNode datanode, File[] volumes) {
+ FsDatasetAsyncDiskService(DataNode datanode) {
this.datanode = datanode;
+ this.threadGroup = new ThreadGroup(getClass().getSimpleName());
+ }
+
+ private void addExecutorForVolume(final File volume) {
+ ThreadFactory threadFactory = new ThreadFactory() {
+ int counter = 0;
+
+ @Override
+ public Thread newThread(Runnable r) {
+ int thisIndex;
+ synchronized (this) {
+ thisIndex = counter++;
+ }
+ Thread t = new Thread(threadGroup, r);
+ t.setName("Async disk worker #" + thisIndex +
+ " for volume " + volume);
+ return t;
+ }
+ };
- final ThreadGroup threadGroup = new ThreadGroup(getClass().getSimpleName());
- // Create one ThreadPool per volume
- for (int v = 0 ; v < volumes.length; v++) {
- final File vol = volumes[v];
- ThreadFactory threadFactory = new ThreadFactory() {
- int counter = 0;
-
- @Override
- public Thread newThread(Runnable r) {
- int thisIndex;
- synchronized (this) {
- thisIndex = counter++;
- }
- Thread t = new Thread(threadGroup, r);
- t.setName("Async disk worker #" + thisIndex +
- " for volume " + vol);
- return t;
- }
- };
-
- ThreadPoolExecutor executor = new ThreadPoolExecutor(
- CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME,
- THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(), threadFactory);
-
- // This can reduce the number of running threads
- executor.allowCoreThreadTimeOut(true);
- executors.put(vol, executor);
+ ThreadPoolExecutor executor = new ThreadPoolExecutor(
+ CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME,
+ THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(), threadFactory);
+
+ // This can reduce the number of running threads
+ executor.allowCoreThreadTimeOut(true);
+ executors.put(volume, executor);
+ }
+
+ /**
+ * Starts AsyncDiskService for a new volume
+ * @param volume the root of the new data volume.
+ */
+ synchronized void addVolume(File volume) {
+ if (executors == null) {
+ throw new RuntimeException("AsyncDiskService is already shutdown");
}
-
+ ThreadPoolExecutor executor = executors.get(volume);
+ if (executor != null) {
+ throw new RuntimeException("Volume " + volume + " is already existed.");
+ }
+ addExecutorForVolume(volume);
}
synchronized long countPendingDeletions() {
@@ -148,6 +163,21 @@ class FsDatasetAsyncDiskService {
}
}
+ public void submitSyncFileRangeRequest(FsVolumeImpl volume,
+ final FileDescriptor fd, final long offset, final long nbytes,
+ final int flags) {
+ execute(volume.getCurrentDir(), new Runnable() {
+ @Override
+ public void run() {
+ try {
+ NativeIO.POSIX.syncFileRangeIfPossible(fd, offset, nbytes, flags);
+ } catch (NativeIOException e) {
+ LOG.warn("sync_file_range error", e);
+ }
+ }
+ });
+ }
+
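For illustration, how a dataset might wire a newly added volume into this service; the surrounding class and field names are placeholders, and the sketch assumes it sits in the same package as the (package-private) FsDatasetAsyncDiskService.

    import java.io.File;

    class AsyncServiceWiringExample {
      private final FsDatasetAsyncDiskService asyncDiskService;

      AsyncServiceWiringExample(FsDatasetAsyncDiskService service) {
        this.asyncDiskService = service;
      }

      void onVolumeAdded(File volumeRootDir) {
        // Registers one ThreadPoolExecutor for the volume; adding the same
        // volume twice (or after shutdown) throws a RuntimeException.
        asyncDiskService.addVolume(volumeRootDir);
      }
    }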
/**
* Delete the block file and meta file from the disk asynchronously, adjust
* dfsUsed statistics accordingly.