You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2013/06/21 08:37:39 UTC
svn commit: r1495297 [19/46] - in /hadoop/common/branches/branch-1-win: ./
bin/ conf/ ivy/ lib/jdiff/ src/c++/libhdfs/docs/
src/c++/libhdfs/tests/conf/ src/contrib/capacity-scheduler/ivy/
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred...
Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Jun 21 06:37:27 2013
@@ -27,8 +27,10 @@ import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
+import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
+import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.security.NoSuchAlgorithmException;
@@ -56,6 +58,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
@@ -76,11 +79,9 @@ import org.apache.hadoop.hdfs.security.t
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
-import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
-import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.FSDataset.VolumeInfo;
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeInstrumentation;
@@ -106,6 +107,7 @@ import org.apache.hadoop.hdfs.web.WebHdf
import org.apache.hadoop.hdfs.web.resources.Param;
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.ReadaheadPool;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
@@ -126,6 +128,7 @@ import org.apache.hadoop.util.DiskChecke
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.ServicePlugin;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionInfo;
import org.mortbay.util.ajax.JSON;
@@ -226,12 +229,26 @@ public class DataNode extends Configured
int socketTimeout;
int socketWriteTimeout = 0;
boolean transferToAllowed = true;
+ private boolean dropCacheBehindWrites = false;
+ private boolean syncBehindWrites = false;
+ private boolean dropCacheBehindReads = false;
+ private long readaheadLength = 0;
+
int writePacketSize = 0;
- private boolean supportAppends;
+ private boolean durableSync;
boolean isBlockTokenEnabled;
BlockTokenSecretManager blockTokenSecretManager;
boolean isBlockTokenInitialized = false;
+ boolean syncOnClose;
+
final String userWithLocalPathAccess;
+ private boolean connectToDnViaHostname;
+ private boolean relaxedVersionCheck;
+
+ /**
+ * Whether the DN completely skips version check with the NN.
+ */
+ private boolean noVersionCheck;
/**
* Testing hook that allows tests to delay the sending of blockReceived RPCs
@@ -242,6 +259,9 @@ public class DataNode extends Configured
public DataBlockScanner blockScanner = null;
public Daemon blockScannerThread = null;
+ /** Activated plug-ins. */
+ private List<ServicePlugin> plugins;
+
private static final Random R = new Random();
public static final String DATA_DIR_KEY = "dfs.data.dir";
@@ -267,6 +287,8 @@ public class DataNode extends Configured
public Server ipcServer;
private SecureResources secureResources = null;
+
+ ReadaheadPool readaheadPool;
/**
* Current system time.
@@ -297,7 +319,7 @@ public class DataNode extends Configured
DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY);
datanodeObject = this;
- supportAppends = conf.getBoolean("dfs.support.append", false);
+ durableSync = conf.getBoolean("dfs.durable.sync", true);
this.userWithLocalPathAccess = conf
.get(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY);
try {
@@ -351,6 +373,13 @@ public class DataNode extends Configured
true);
this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
+ this.relaxedVersionCheck = conf.getBoolean(
+ CommonConfigurationKeys.HADOOP_RELAXED_VERSION_CHECK_KEY,
+ CommonConfigurationKeys.HADOOP_RELAXED_VERSION_CHECK_DEFAULT);
+ noVersionCheck = conf.getBoolean(
+ CommonConfigurationKeys.HADOOP_SKIP_VERSION_CHECK_KEY,
+ CommonConfigurationKeys.HADOOP_SKIP_VERSION_CHECK_DEFAULT);
+
InetSocketAddress socAddr = DataNode.getStreamingAddr(conf);
int tmpPort = socAddr.getPort();
storage = new DataStorage();
@@ -416,13 +445,26 @@ public class DataNode extends Configured
selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
tmpPort);
this.dnRegistration.setName(machineName + ":" + tmpPort);
- LOG.info("Opened info server at " + tmpPort);
+ LOG.info("Opened data transfer server at " + tmpPort);
this.threadGroup = new ThreadGroup("dataXceiverServer");
this.dataXceiverServer = new Daemon(threadGroup,
new DataXceiverServer(ss, conf, this));
this.threadGroup.setDaemon(true); // auto destroy when empty
+ this.readaheadLength = conf.getLong(
+ DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY,
+ DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+ this.dropCacheBehindWrites = conf.getBoolean(
+ DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY,
+ DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT);
+ this.syncBehindWrites = conf.getBoolean(
+ DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_KEY,
+ DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT);
+ this.dropCacheBehindReads = conf.getBoolean(
+ DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY,
+ DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT);
+
this.blockReportInterval =
conf.getLong("dfs.blockreport.intervalMsec", BLOCKREPORT_INTERVAL);
this.initialBlockReportDelay = conf.getLong("dfs.blockreport.initialDelay",
@@ -433,6 +475,11 @@ public class DataNode extends Configured
"dfs.blockreport.intervalMsec." + " Setting initial delay to 0 msec:");
}
this.heartBeatInterval = conf.getLong("dfs.heartbeat.interval", HEARTBEAT_INTERVAL) * 1000L;
+
+ // do we need to sync block file contents to disk when blockfile is closed?
+ this.syncOnClose = conf.getBoolean(DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY,
+ DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_DEFAULT);
+
DataNode.nameNodeAddr = nameNodeAddr;
//initialize periodic block scanner
@@ -446,9 +493,16 @@ public class DataNode extends Configured
blockScanner = new DataBlockScanner(this, (FSDataset)data, conf);
} else {
LOG.info("Periodic Block Verification is disabled because " +
- reason + ".");
+ reason);
}
+ readaheadPool = ReadaheadPool.getInstance();
+
+ this.connectToDnViaHostname = conf.getBoolean(
+ DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
+ DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
+ LOG.debug("Connect to datanode via hostname is " + connectToDnViaHostname);
+
//create a servlet to serve full-file content
InetSocketAddress infoSocAddr = DataNode.getInfoAddr(conf);
String infoHost = infoSocAddr.getHostName();
@@ -508,6 +562,16 @@ public class DataNode extends Configured
dnRegistration.setIpcPort(ipcServer.getListenerAddress().getPort());
LOG.info("dnRegistration = " + dnRegistration);
+
+ plugins = conf.getInstances("dfs.datanode.plugins", ServicePlugin.class);
+ for (ServicePlugin p: plugins) {
+ try {
+ p.start(this);
+ LOG.info("Started plug-in " + p);
+ } catch (Throwable t) {
+ LOG.warn("ServicePlugin " + p + " could not be started", t);
+ }
+ }
}
private ObjectName mxBean = null;
@@ -547,6 +611,43 @@ public class DataNode extends Configured
SocketChannel.open().socket() : new Socket();
}
+ /**
+ * @return true if this datanode is permitted to connect to
+ * the given namenode version
+ */
+ boolean isPermittedVersion(NamespaceInfo nsInfo) {
+ boolean versionMatch =
+ nsInfo.getVersion().equals(VersionInfo.getVersion());
+ boolean revisionMatch =
+ nsInfo.getRevision().equals(VersionInfo.getRevision());
+ if (revisionMatch && !versionMatch) {
+ throw new AssertionError("Invalid build. The revisions match" +
+ " but the NN version is " + nsInfo.getVersion() +
+ " and the DN version is " + VersionInfo.getVersion());
+ }
+ if (noVersionCheck) {
+ LOG.info("Permitting datanode version '" + VersionInfo.getVersion() +
+ "' and revision '" + VersionInfo.getRevision() +
+ "' to connect to namenode version '" + nsInfo.getVersion() +
+ "' and revision '" + nsInfo.getRevision() + "' because " +
+ CommonConfigurationKeys.HADOOP_SKIP_VERSION_CHECK_KEY +
+ " is enabled");
+ return true;
+ } else {
+ if (relaxedVersionCheck) {
+ if (versionMatch && !revisionMatch) {
+ LOG.info("Permitting datanode revision " + VersionInfo.getRevision() +
+ " to connect to namenode revision " + nsInfo.getRevision() +
+ " because " + CommonConfigurationKeys.HADOOP_RELAXED_VERSION_CHECK_KEY +
+ " is enabled");
+ }
+ return versionMatch;
+ } else {
+ return revisionMatch;
+ }
+ }
+ }
+
private NamespaceInfo handshake() throws IOException {
NamespaceInfo nsInfo = new NamespaceInfo();
while (shouldRun) {
@@ -560,13 +661,17 @@ public class DataNode extends Configured
} catch (InterruptedException ie) {}
}
}
- String errorMsg = null;
- // verify build version
- if( ! nsInfo.getBuildVersion().equals( Storage.getBuildVersion() )) {
- errorMsg = "Incompatible build versions: namenode BV = "
- + nsInfo.getBuildVersion() + "; datanode BV = "
- + Storage.getBuildVersion();
- LOG.fatal( errorMsg );
+ if (!isPermittedVersion(nsInfo)) {
+ String errorMsg = "Shutting down. Incompatible version or revision." +
+ "DataNode version '" + VersionInfo.getVersion() +
+ "' and revision '" + VersionInfo.getRevision() +
+ "' and NameNode version '" + nsInfo.getVersion() +
+ "' and revision '" + nsInfo.getRevision() +
+ " and " + CommonConfigurationKeys.HADOOP_RELAXED_VERSION_CHECK_KEY +
+ " is " + (relaxedVersionCheck ? "enabled" : "not enabled") +
+ " and " + CommonConfigurationKeys.HADOOP_SKIP_VERSION_CHECK_KEY +
+ " is " + (noVersionCheck ? "enabled" : "not enabled");
+ LOG.fatal(errorMsg);
notifyNamenode(DatanodeProtocol.NOTIFY, errorMsg);
throw new IOException( errorMsg );
}
@@ -584,9 +689,10 @@ public class DataNode extends Configured
}
public static InterDatanodeProtocol createInterDataNodeProtocolProxy(
- DatanodeID datanodeid, final Configuration conf, final int socketTimeout) throws IOException {
- final InetSocketAddress addr = NetUtils.createSocketAddr(
- datanodeid.getHost() + ":" + datanodeid.getIpcPort());
+ DatanodeInfo info, final Configuration conf, final int socketTimeout,
+ boolean connectToDnViaHostname) throws IOException {
+ final String dnName = info.getNameWithIpcPort(connectToDnViaHostname);
+ final InetSocketAddress addr = NetUtils.createSocketAddr(dnName);
if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
InterDatanodeProtocol.LOG.info("InterDatanodeProtocol addr=" + addr);
}
@@ -720,12 +826,13 @@ public class DataNode extends Configured
dnRegistration.exportedKeys = ExportedBlockKeys.DUMMY_KEYS;
}
- if (supportAppends) {
+ if (durableSync) {
Block[] bbwReport = data.getBlocksBeingWrittenReport();
- long[] blocksBeingWritten = BlockListAsLongs
- .convertToArrayLongs(bbwReport);
+ long[] blocksBeingWritten =
+ BlockListAsLongs.convertToArrayLongs(bbwReport);
namenode.blocksBeingWrittenReport(dnRegistration, blocksBeingWritten);
}
+
// random short delay - helps scatter the BR from all DNs
// - but we can start generating the block report immediately
data.requestAsyncBlockReport();
@@ -739,6 +846,17 @@ public class DataNode extends Configured
* Otherwise, deadlock might occur.
*/
public void shutdown() {
+ if (plugins != null) {
+ for (ServicePlugin p : plugins) {
+ try {
+ p.stop();
+ LOG.info("Stopped plug-in " + p);
+ } catch (Throwable t) {
+ LOG.warn("ServicePlugin " + p + " could not be stopped", t);
+ }
+ }
+ }
+
this.unRegisterMXBean();
if (infoServer != null) {
try {
@@ -812,10 +930,19 @@ public class DataNode extends Configured
/** Check if there is no space in disk
* @param e that caused this checkDiskError call
**/
- protected void checkDiskError(Exception e ) throws IOException {
-
- LOG.warn("checkDiskError: exception: ", e);
-
+ protected void checkDiskError(Exception e ) throws IOException {
+ LOG.warn("checkDiskError: exception: ", e);
+ if (e instanceof SocketException || e instanceof SocketTimeoutException
+ || e instanceof ClosedByInterruptException
+ || e.getMessage().startsWith("An established connection was aborted")
+ || e.getMessage().startsWith("Broken pipe")
+ || e.getMessage().startsWith("Connection reset")
+ || e.getMessage().contains("java.nio.channels.SocketChannel")) {
+ LOG.info("Not checking disk as checkDiskError was called on a network" +
+ " related exception");
+ return;
+ }
+
if (e.getMessage() != null &&
e.getMessage().startsWith("No space left on device")) {
throw new DiskOutOfSpaceException("No space left on device");
@@ -872,7 +999,8 @@ public class DataNode extends Configured
}
/** Number of concurrent xceivers per node. */
- int getXceiverCount() {
+ @Override // DataNodeMXBean
+ public int getXceiverCount() {
return threadGroup == null ? 0 : threadGroup.activeCount();
}
@@ -881,7 +1009,6 @@ public class DataNode extends Configured
* forever calling remote NameNode functions.
*/
public void offerService() throws Exception {
-
LOG.info("using BLOCKREPORT_INTERVAL of " + blockReportInterval + "msec" +
" Initial delay: " + initialBlockReportDelay + "msec");
@@ -913,7 +1040,6 @@ public class DataNode extends Configured
xmitsInProgress.get(),
getXceiverCount());
myMetrics.addHeartBeat(now() - startTime);
- //LOG.info("Just sent heartbeat, with name " + localName);
if (!processCommand(cmds))
continue;
}
@@ -1012,7 +1138,7 @@ public class DataNode extends Configured
// start block scanner
if (blockScanner != null && blockScannerThread == null &&
upgradeManager.isUpgradeCompleted()) {
- LOG.info("Starting Periodic block scanner.");
+ LOG.info("Starting Periodic block scanner");
blockScannerThread = new Daemon(blockScanner);
blockScannerThread.start();
}
@@ -1117,7 +1243,7 @@ public class DataNode extends Configured
}
data.invalidate(toDelete);
} catch(IOException e) {
- checkDiskError();
+ // Exceptions caught here are not expected to be disk-related.
throw e;
}
myMetrics.incrBlocksRemoved(toDelete.length);
@@ -1196,7 +1322,7 @@ public class DataNode extends Configured
) throws IOException {
if (!data.isValidBlock(block)) {
// block does not exist or is under-construction
- String errStr = "Can't send invalid block " + block;
+ String errStr = "Can't send invalid " + block;
LOG.info(errStr);
notifyNamenode(DatanodeProtocol.INVALID_BLOCK, errStr);
return;
@@ -1209,7 +1335,7 @@ public class DataNode extends Configured
namenode.reportBadBlocks(new LocatedBlock[]{
new LocatedBlock(block, new DatanodeInfo[] {
new DatanodeInfo(dnRegistration)})});
- LOG.info("Can't replicate block " + block
+ LOG.info("Can't replicate " + block
+ " because on-disk length " + onDiskLength
+ " is shorter than NameNode recorded length " + block.getNumBytes());
return;
@@ -1223,7 +1349,7 @@ public class DataNode extends Configured
xfersBuilder.append(xferTargets[i].getName());
xfersBuilder.append(" ");
}
- LOG.info(dnRegistration + " Starting thread to transfer block " +
+ LOG.info(dnRegistration + " Starting thread to transfer " +
block + " to " + xfersBuilder);
}
@@ -1238,7 +1364,7 @@ public class DataNode extends Configured
try {
transferBlock(blocks[i], xferTargets[i]);
} catch (IOException ie) {
- LOG.warn("Failed to transfer block " + blocks[i], ie);
+ LOG.warn("Failed to transfer " + blocks[i], ie);
}
}
}
@@ -1383,9 +1509,10 @@ public class DataNode extends Configured
BlockSender blockSender = null;
try {
- InetSocketAddress curTarget =
- NetUtils.createSocketAddr(targets[0].getName());
+ final String dnName = targets[0].getName(connectToDnViaHostname);
+ InetSocketAddress curTarget = NetUtils.createSocketAddr(dnName);
sock = newSocket();
+ LOG.debug("Connecting to " + dnName);
NetUtils.connect(sock, curTarget, socketTimeout);
sock.setSoTimeout(targets.length * socketTimeout);
@@ -1426,14 +1553,17 @@ public class DataNode extends Configured
blockSender.sendBlock(out, baseStream, null);
// no response necessary
- LOG.info(dnRegistration + ":Transmitted block " + b + " to " + curTarget);
+ LOG.info(dnRegistration + ":Transmitted " + b + " to " + curTarget);
} catch (IOException ie) {
LOG.warn(dnRegistration + ":Failed to transfer " + b + " to " + targets[0].getName()
+ " got " + StringUtils.stringifyException(ie));
// check if there are any disk problem
- datanode.checkDiskError();
-
+ try{
+ checkDiskError(ie);
+ } catch(IOException e) {
+ LOG.warn("DataNode.checkDiskError failed in run() with: ", e);
+ }
} finally {
xmitsInProgress.getAndDecrement();
IOUtils.closeStream(blockSender);
@@ -1512,7 +1642,7 @@ public class DataNode extends Configured
conf = new Configuration();
if (!parseArguments(args, conf)) {
printUsage();
- return null;
+ System.exit(-2);
}
if (conf.get("dfs.network.script") != null) {
LOG.error("This configuration for rack identification is not supported" +
@@ -1747,9 +1877,9 @@ public class DataNode extends Configured
data.finalizeBlockIfNeeded(newblock);
myMetrics.incrBlocksWritten();
notifyNamenodeReceivedBlock(newblock, EMPTY_DEL_HINT);
- LOG.info("Received block " + newblock +
+ LOG.info("Received " + newblock +
" of size " + newblock.getNumBytes() +
- " as part of lease recovery.");
+ " as part of lease recovery");
}
}
@@ -1875,13 +2005,12 @@ public class DataNode extends Configured
private LocatedBlock recoverBlock(Block block, boolean keepLength,
DatanodeInfo[] targets, boolean closeFile) throws IOException {
- DatanodeID[] datanodeids = (DatanodeID[])targets;
// If the block is already being recovered, then skip recovering it.
// This can happen if the namenode and client start recovering the same
// file at the same time.
synchronized (ongoingRecovery) {
if (ongoingRecovery.get(block.getWithWildcardGS()) != null) {
- String msg = "Block " + block + " is already being recovered, " +
+ String msg = block + " is already being recovered, " +
" ignoring this request to recover it.";
LOG.info(msg);
throw new IOException(msg);
@@ -1901,13 +2030,14 @@ public class DataNode extends Configured
int rwrCount = 0;
List<BlockRecord> blockRecords = new ArrayList<BlockRecord>();
- for(DatanodeID id : datanodeids) {
+ for (DatanodeInfo id : targets) {
try {
- InterDatanodeProtocol datanode = dnRegistration.equals(id)?
- this: DataNode.createInterDataNodeProtocolProxy(id, getConf(), socketTimeout);
+ InterDatanodeProtocol datanode = dnRegistration.equals(id) ? this
+ : DataNode.createInterDataNodeProtocolProxy(
+ id, getConf(), socketTimeout, connectToDnViaHostname);
BlockRecoveryInfo info = datanode.startBlockRecovery(block);
if (info == null) {
- LOG.info("No block metadata found for block " + block + " on datanode "
+ LOG.info("No block metadata found for " + block + " on datanode "
+ id);
continue;
}
@@ -1962,7 +2092,7 @@ public class DataNode extends Configured
if (syncList.isEmpty() && errorCount > 0) {
throw new IOException("All datanodes failed: block=" + block
- + ", datanodeids=" + Arrays.asList(datanodeids));
+ + ", datanodeids=" + Arrays.asList(targets));
}
if (!keepLength) {
block.setNumBytes(minlength);
@@ -2133,4 +2263,20 @@ public class DataNode extends Configured
(DataXceiverServer) this.dataXceiverServer.getRunnable();
return dxcs.balanceThrottler.getBandwidth();
}
+
+ long getReadaheadLength() {
+ return readaheadLength;
+ }
+
+ boolean shouldDropCacheBehindWrites() {
+ return dropCacheBehindWrites;
+ }
+
+ boolean shouldDropCacheBehindReads() {
+ return dropCacheBehindReads;
+ }
+
+ boolean shouldSyncBehindWrites() {
+ return syncBehindWrites;
+ }
}
Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNodeMXBean.java Fri Jun 21 06:37:27 2013
@@ -63,4 +63,10 @@ public interface DataNodeMXBean {
* @return the volume info
*/
public String getVolumeInfo();
+
+ /**
+ * Returns an estimate of the number of Datanode threads
+ * actively transferring blocks.
+ */
+ public int getXceiverCount();
}
Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataStorage.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataStorage.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataStorage.java Fri Jun 21 06:37:27 2013
@@ -115,11 +115,11 @@ public class DataStorage extends Storage
break;
case NON_EXISTENT:
// ignore this storage
- LOG.info("Storage directory " + dataDir + " does not exist.");
+ LOG.info("Storage directory " + dataDir + " does not exist");
it.remove();
continue;
case NOT_FORMATTED: // format
- LOG.info("Storage directory " + dataDir + " is not formatted.");
+ LOG.info("Storage directory " + dataDir + " is not formatted");
LOG.info("Formatting ...");
format(sd, nsInfo);
break;
@@ -291,7 +291,7 @@ public class DataStorage extends Storage
// rename tmp to previous
rename(tmpDir, prevDir);
LOG.info( hardLink.linkStats.report());
- LOG.info("Upgrade of " + sd.getRoot()+ " is complete.");
+ LOG.info("Upgrade of " + sd.getRoot()+ " is complete");
}
void doRollback( StorageDirectory sd,
@@ -327,7 +327,7 @@ public class DataStorage extends Storage
rename(prevDir, curDir);
// delete tmp dir
deleteDir(tmpDir);
- LOG.info("Rollback of " + sd.getRoot() + " is complete.");
+ LOG.info("Rollback of " + sd.getRoot() + " is complete");
}
void doFinalize(StorageDirectory sd) throws IOException {
@@ -350,9 +350,9 @@ public class DataStorage extends Storage
try {
deleteDir(tmpDir);
} catch(IOException ex) {
- LOG.error("Finalize upgrade for " + dataDirPath + " failed.", ex);
+ LOG.error("Finalize upgrade for " + dataDirPath + " failed", ex);
}
- LOG.info("Finalize upgrade for " + dataDirPath + " is complete.");
+ LOG.info("Finalize upgrade for " + dataDirPath + " is complete");
}
public String toString() { return "Finalize " + dataDirPath; }
}).start();
Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Fri Jun 21 06:37:27 2013
@@ -28,6 +28,7 @@ import java.net.Socket;
import java.net.SocketException;
import org.apache.commons.logging.Log;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -58,6 +59,7 @@ class DataXceiver implements Runnable, F
final String localAddress; // local address of this daemon
DataNode datanode;
DataXceiverServer dataXceiverServer;
+ private boolean connectToDnViaHostname;
public DataXceiver(Socket s, DataNode datanode,
DataXceiverServer dataXceiverServer) {
@@ -69,6 +71,9 @@ class DataXceiver implements Runnable, F
remoteAddress = s.getRemoteSocketAddress().toString();
localAddress = s.getLocalSocketAddress().toString();
LOG.debug("Number of active connections is: " + datanode.getXceiverCount());
+ this.connectToDnViaHostname = datanode.getConf().getBoolean(
+ DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
+ DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
}
/**
@@ -169,7 +174,7 @@ class DataXceiver implements Runnable, F
out.writeShort(DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
out.flush();
throw new IOException("Access token verification failed, for client "
- + remoteAddress + " for OP_READ_BLOCK for block " + block);
+ + remoteAddress + " for OP_READ_BLOCK for " + block);
} finally {
IOUtils.closeStream(out);
}
@@ -182,7 +187,7 @@ class DataXceiver implements Runnable, F
? String.format(DN_CLIENTTRACE_FORMAT, localAddress, remoteAddress,
"%d", "HDFS_READ", clientName, "%d",
datanode.dnRegistration.getStorageID(), block, "%d")
- : datanode.dnRegistration + " Served block " + block + " to " +
+ : datanode.dnRegistration + " Served " + block + " to " +
s.getInetAddress();
try {
try {
@@ -217,9 +222,8 @@ class DataXceiver implements Runnable, F
* Earlier version shutdown() datanode if there is disk error.
*/
LOG.warn(datanode.dnRegistration + ":Got exception while serving " +
- block + " to " +
- s.getInetAddress() + ":\n" +
- StringUtils.stringifyException(ioe) );
+ block + " to " + s.getInetAddress() + ":\n" +
+ StringUtils.stringifyException(ioe) );
throw ioe;
} finally {
IOUtils.closeStream(out);
@@ -242,9 +246,8 @@ class DataXceiver implements Runnable, F
//
Block block = new Block(in.readLong(),
dataXceiverServer.estimateBlockSize, in.readLong());
- LOG.info("Receiving block " + block +
- " src: " + remoteAddress +
- " dest: " + localAddress);
+ LOG.info("Receiving " + block + " src: " + remoteAddress + " dest: "
+ + localAddress);
int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
boolean isRecovery = in.readBoolean(); // is this part of recovery?
String client = Text.readString(in); // working on behalf of this client
@@ -280,7 +283,7 @@ class DataXceiver implements Runnable, F
replyOut.flush();
}
throw new IOException("Access token verification failed, for client "
- + remoteAddress + " for OP_WRITE_BLOCK for block " + block);
+ + remoteAddress + " for OP_WRITE_BLOCK for " + block);
} finally {
IOUtils.closeStream(replyOut);
}
@@ -308,14 +311,17 @@ class DataXceiver implements Runnable, F
if (targets.length > 0) {
InetSocketAddress mirrorTarget = null;
// Connect to backup machine
+ final String mirrorAddrString =
+ targets[0].getName(connectToDnViaHostname);
mirrorNode = targets[0].getName();
- mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
+ mirrorTarget = NetUtils.createSocketAddr(mirrorAddrString);
mirrorSock = datanode.newSocket();
try {
int timeoutValue = datanode.socketTimeout +
(HdfsConstants.READ_TIMEOUT_EXTENSION * numTargets);
int writeTimeout = datanode.socketWriteTimeout +
(HdfsConstants.WRITE_TIMEOUT_EXTENSION * numTargets);
+ LOG.debug("Connecting to " + mirrorAddrString);
NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
mirrorSock.setSoTimeout(timeoutValue);
mirrorSock.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
@@ -373,9 +379,9 @@ class DataXceiver implements Runnable, F
if (client.length() > 0) {
throw e;
} else {
- LOG.info(datanode.dnRegistration + ":Exception transfering block " +
+ LOG.info(datanode.dnRegistration + ":Exception transfering " +
block + " to mirror " + mirrorNode +
- ". continuing without the mirror.\n" +
+ "- continuing without the mirror\n" +
StringUtils.stringifyException(e));
}
}
@@ -403,10 +409,8 @@ class DataXceiver implements Runnable, F
// the block is finalized in the PacketResponder.
if (client.length() == 0) {
datanode.notifyNamenodeReceivedBlock(block, DataNode.EMPTY_DEL_HINT);
- LOG.info("Received block " + block +
- " src: " + remoteAddress +
- " dest: " + localAddress +
- " of size " + block.getNumBytes());
+ LOG.info("Received " + block + " src: " + remoteAddress + " dest: "
+ + localAddress + " size " + block.getNumBytes());
}
if (datanode.blockScanner != null) {
@@ -446,7 +450,7 @@ class DataXceiver implements Runnable, F
out.flush();
throw new IOException(
"Access token verification failed, for client " + remoteAddress
- + " for OP_BLOCK_CHECKSUM for block " + block);
+ + " for OP_BLOCK_CHECKSUM for " + block);
} finally {
IOUtils.closeStream(out);
}
@@ -504,7 +508,7 @@ class DataXceiver implements Runnable, F
BlockTokenSecretManager.AccessMode.COPY);
} catch (InvalidToken e) {
LOG.warn("Invalid access token in request from "
- + remoteAddress + " for OP_COPY_BLOCK for block " + block);
+ + remoteAddress + " for OP_COPY_BLOCK for " + block);
sendResponse(s,
(short) DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN,
datanode.socketWriteTimeout);
@@ -514,7 +518,7 @@ class DataXceiver implements Runnable, F
if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
LOG.info("Not able to copy block " + blockId + " to "
- + s.getRemoteSocketAddress() + " because threads quota is exceeded.");
+ + s.getRemoteSocketAddress() + " because threads quota is exceeded");
sendResponse(s, (short)DataTransferProtocol.OP_STATUS_ERROR,
datanode.socketWriteTimeout);
return;
@@ -544,7 +548,7 @@ class DataXceiver implements Runnable, F
datanode.myMetrics.incrBytesRead((int) read);
datanode.myMetrics.incrBlocksRead();
- LOG.info("Copied block " + block + " to " + s.getRemoteSocketAddress());
+ LOG.info("Copied " + block + " to " + s.getRemoteSocketAddress());
} catch (IOException ioe) {
isOpSuccess = false;
throw ioe;
@@ -608,9 +612,11 @@ class DataXceiver implements Runnable, F
try {
// get the output stream to the proxy
- InetSocketAddress proxyAddr = NetUtils.createSocketAddr(
- proxySource.getName());
+ final String proxyAddrString =
+ proxySource.getName(connectToDnViaHostname);
+ InetSocketAddress proxyAddr = NetUtils.createSocketAddr(proxyAddrString);
proxySock = datanode.newSocket();
+ LOG.debug("Connecting to " + proxyAddrString);
NetUtils.connect(proxySock, proxyAddr, datanode.socketTimeout);
proxySock.setSoTimeout(datanode.socketTimeout);
@@ -633,11 +639,11 @@ class DataXceiver implements Runnable, F
short status = proxyReply.readShort();
if (status != DataTransferProtocol.OP_STATUS_SUCCESS) {
if (status == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN) {
- throw new IOException("Copy block " + block + " from "
+ throw new IOException("Copy " + block + " from "
+ proxySock.getRemoteSocketAddress()
+ " failed due to access token error");
}
- throw new IOException("Copy block " + block + " from "
+ throw new IOException("Copy " + block + " from "
+ proxySock.getRemoteSocketAddress() + " failed");
}
// open a block receiver and check if the block does not exist
@@ -653,7 +659,7 @@ class DataXceiver implements Runnable, F
// notify name node
datanode.notifyNamenodeReceivedBlock(block, sourceID);
- LOG.info("Moved block " + block +
+ LOG.info("Moved " + block +
" from " + s.getRemoteSocketAddress());
} catch (IOException ioe) {
Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Fri Jun 21 06:37:27 2013
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.server.balancer.Balancer;
import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.util.StringUtils;
/**
@@ -63,7 +64,7 @@ class DataXceiverServer implements Runna
* It limits the number of block moves for balancing and
* the total amount of bandwidth they can use.
*/
- static class BlockBalanceThrottler extends BlockTransferThrottler {
+ static class BlockBalanceThrottler extends DataTransferThrottler {
private int numThreads;
/**Constructor
Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DatanodeBlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DatanodeBlockInfo.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DatanodeBlockInfo.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DatanodeBlockInfo.java Fri Jun 21 06:37:27 2013
@@ -109,15 +109,15 @@ class DatanodeBlockInfo {
return false;
}
if (file == null || volume == null) {
- throw new IOException("detachBlock:Block not found. " + block);
+ throw new IOException("detachBlock: not found " + block);
}
File meta = FSDataset.getMetaFile(file, block);
if (meta == null) {
- throw new IOException("Meta file not found for block " + block);
+ throw new IOException("Meta file not found for " + block);
}
if (HardLink.getLinkCount(file) > numLinks) {
- DataNode.LOG.info("CopyOnWrite for block " + block);
+ DataNode.LOG.info("CopyOnWrite for " + block);
detachFile(file, block);
}
if (HardLink.getLinkCount(meta) > numLinks) {
Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Fri Jun 21 06:37:27 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.da
import java.io.File;
import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
@@ -356,7 +357,7 @@ public class FSDataset implements FSCons
this.reserved = conf.getLong("dfs.datanode.du.reserved", 0);
this.dataDir = new FSDir(currentDir);
this.currentDir = currentDir;
- boolean supportAppends = conf.getBoolean("dfs.support.append", false);
+ boolean durableSync = conf.getBoolean("dfs.durable.sync", true);
File parent = currentDir.getParentFile();
this.detachDir = new File(parent, "detach");
@@ -376,7 +377,7 @@ public class FSDataset implements FSCons
// should not be deleted.
blocksBeingWritten = new File(parent, "blocksBeingWritten");
if (blocksBeingWritten.exists()) {
- if (supportAppends) {
+ if (durableSync) {
recoverBlocksBeingWritten(blocksBeingWritten);
} else {
FileUtil.fullyDelete(blocksBeingWritten);
@@ -573,7 +574,7 @@ public class FSDataset implements FSCons
// add this block to block set
blockSet.add(block);
if (DataNode.LOG.isDebugEnabled()) {
- DataNode.LOG.debug("recoverBlocksBeingWritten for block " + block);
+ DataNode.LOG.debug("recoverBlocksBeingWritten for " + block);
}
}
}
@@ -774,7 +775,7 @@ public class FSDataset implements FSCons
volumes = fsvs; // replace array of volumes
}
Log.info("Completed FSVolumeSet.checkDirs. Removed=" + removed_size +
- "volumes. List of current volumes: " + toString());
+ " volumes. Current volumes: " + toString());
return removed_vols;
}
@@ -1077,7 +1078,7 @@ public class FSDataset implements FSCons
/**
* Get File name for a given block.
*/
- public synchronized File getBlockFile(Block b) throws IOException {
+ public File getBlockFile(Block b) throws IOException {
File f = validateBlockFile(b);
if(f == null) {
if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
@@ -1098,27 +1099,60 @@ public class FSDataset implements FSCons
return info;
}
- public synchronized InputStream getBlockInputStream(Block b) throws IOException {
- if (isNativeIOAvailable) {
- return NativeIO.getShareDeleteFileInputStream(getBlockFile(b));
- } else {
- return new FileInputStream(getBlockFile(b));
+ public InputStream getBlockInputStream(Block b) throws IOException {
+ File f = getBlockFileNoExistsCheck(b);
+ try {
+ if (isNativeIOAvailable) {
+ return NativeIO.getShareDeleteFileInputStream(f);
+ } else {
+ return new FileInputStream(f);
+ }
+ } catch (FileNotFoundException fnfe) {
+ throw new IOException("Block " + b + " is not valid. "
+ + "Expected block file at " + f + " does not exist.");
}
}
- public synchronized InputStream getBlockInputStream(Block b, long seekOffset) throws IOException {
- File blockFile = getBlockFile(b);
- if (isNativeIOAvailable) {
- return NativeIO.getShareDeleteFileInputStream(blockFile, seekOffset);
- } else {
- RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r");
- if (seekOffset > 0) {
- blockInFile.seek(seekOffset);
+ /**
+ * Return the File associated with a block, without first checking that it
+ * exists. This should be used when the next operation is going to open the
+ * file for read anyway, and thus the exists check is redundant.
+ */
+ private File getBlockFileNoExistsCheck(Block b) throws IOException {
+ File f = getFile(b);
+ if (f == null) {
+ throw new IOException("Block " + b + " is not valid");
+ }
+ return f;
+ }
+
+ public InputStream getBlockInputStream(Block b, long seekOffset)
+ throws IOException {
+ File blockFile = getBlockFileNoExistsCheck(b);
+ try {
+ if (isNativeIOAvailable) {
+ return NativeIO.getShareDeleteFileInputStream(blockFile, seekOffset);
+ } else {
+ RandomAccessFile blockInFile;
+ try {
+ blockInFile = new RandomAccessFile(blockFile, "r");
+ } catch (FileNotFoundException fnfe) {
+ throw new IOException("Block " + b + " is not valid. "
+ + "Expected block file at " + blockFile + " does not exist.");
+ }
+
+ if (seekOffset > 0) {
+ blockInFile.seek(seekOffset);
+ }
+ return new FileInputStream(blockInFile.getFD());
}
- return new FileInputStream(blockInFile.getFD());
+ } catch (FileNotFoundException fnfe) {
+ throw new IOException("Block " + b + " is not valid. "
+ + "Expected block file at " + blockFile + " does not exist.");
}
}
+
/**
* Returns handles to the block file and its metadata file
*/
@@ -1467,7 +1501,7 @@ public class FSDataset implements FSCons
volumeMap.put(b, new DatanodeBlockInfo(v, f));
} else {
// reopening block for appending to it.
- DataNode.LOG.info("Reopen Block for append " + b);
+ DataNode.LOG.info("Reopen for append " + b);
v = volumeMap.get(b).getVolume();
f = createTmpFile(v, b, replicationRequest);
File blkfile = getBlockFile(b);
@@ -1486,19 +1520,18 @@ public class FSDataset implements FSCons
DataNode.LOG.debug("Renaming " + blkfile + " to " + f);
if (!blkfile.renameTo(f)) {
if (!f.delete()) {
- throw new IOException("Block " + b + " reopen failed. " +
+ throw new IOException(b + " reopen failed. " +
" Unable to remove file " + f);
}
if (!blkfile.renameTo(f)) {
- throw new IOException("Block " + b + " reopen failed. " +
+ throw new IOException(b + " reopen failed. " +
" Unable to move block file " + blkfile +
" to tmp dir " + f);
}
}
}
if (f == null) {
- DataNode.LOG.warn("Block " + b + " reopen failed " +
- " Unable to locate tmp file.");
+ DataNode.LOG.warn(b + " reopen failed. Unable to locate tmp file");
throw new IOException("Block " + b + " reopen failed " +
" Unable to locate tmp file.");
}
@@ -1739,9 +1772,10 @@ public class FSDataset implements FSCons
long st = System.currentTimeMillis();
// broken out to a static method to simplify testing
reconcileRoughBlockScan(seenOnDisk, volumeMap, ongoingCreates);
- DataNode.LOG.info(
- "Reconciled asynchronous block report against current state in " +
- (System.currentTimeMillis() - st) + " ms");
+ if (DataNode.LOG.isDebugEnabled()) {
+ DataNode.LOG.debug("Reconciled block report with current state in "
+ + (System.currentTimeMillis() - st) + "ms");
+ }
blockReport = seenOnDisk.keySet();
}
@@ -2245,7 +2279,6 @@ public class FSDataset implements FSCons
waitForReportRequest();
assert requested && scan == null;
- DataNode.LOG.info("Starting asynchronous block report scan");
long st = System.currentTimeMillis();
HashMap<Block, File> result = fsd.roughBlockScan();
DataNode.LOG.info("Finished asynchronous block report scan in "
Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java Fri Jun 21 06:37:27 2013
@@ -130,7 +130,7 @@ class FSDatasetAsyncDiskService {
// clear the executor map so that calling execute again will fail.
executors = null;
- LOG.info("All async disk service threads have been shut down.");
+ LOG.info("All async disk service threads have been shut down");
}
}
@@ -140,7 +140,7 @@ class FSDatasetAsyncDiskService {
*/
void deleteAsync(FSDataset.FSVolume volume, File blockFile,
File metaFile, long dfsBytes, String blockName) {
- DataNode.LOG.info("Scheduling block " + blockName + " file " + blockFile
+ DataNode.LOG.info("Scheduling " + blockName + " file " + blockFile
+ " for deletion");
ReplicaFileDeleteTask deletionTask =
new ReplicaFileDeleteTask(volume, blockFile, metaFile, dfsBytes,
@@ -175,18 +175,18 @@ class FSDatasetAsyncDiskService {
@Override
public String toString() {
// Called in AsyncDiskService.execute for displaying error messages.
- return "deletion of block " + blockName + " with block file " + blockFile
+ return "deletion of " + blockName + " with file " + blockFile
+ " and meta file " + metaFile + " from volume " + volume;
}
@Override
public void run() {
if ( !blockFile.delete() || ( !metaFile.delete() && metaFile.exists() ) ) {
- DataNode.LOG.warn("Unexpected error trying to delete block "
+ DataNode.LOG.warn("Unexpected error trying to delete "
+ blockName + " at file " + blockFile + ". Ignored.");
} else {
volume.decDfsUsed(dfsBytes);
- DataNode.LOG.info("Deleted block " + blockName + " at file " + blockFile);
+ DataNode.LOG.info("Deleted " + blockName + " at file " + blockFile);
}
}
};
Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/UpgradeManagerDatanode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/UpgradeManagerDatanode.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/UpgradeManagerDatanode.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/UpgradeManagerDatanode.java Fri Jun 21 06:37:27 2013
@@ -54,7 +54,7 @@ class UpgradeManagerDatanode extends Upg
DataNode.LOG.info("\n Distributed upgrade for DataNode "
+ dataNode.dnRegistration.getName()
+ " version " + getUpgradeVersion() + " to current LV "
- + FSConstants.LAYOUT_VERSION + " is initialized.");
+ + FSConstants.LAYOUT_VERSION + " is initialized");
UpgradeObjectDatanode curUO = (UpgradeObjectDatanode)currentUpgrades.first();
curUO.setDatanode(dataNode);
upgradeState = curUO.preUpgradeAction(nsInfo);
@@ -99,7 +99,7 @@ class UpgradeManagerDatanode extends Upg
DataNode.LOG.info("\n Distributed upgrade for DataNode version "
+ getUpgradeVersion() + " to current LV "
+ FSConstants.LAYOUT_VERSION + " cannot be started. "
- + "The upgrade object is not defined.");
+ + "The upgrade object is not defined");
return false;
}
upgradeState = true;
@@ -111,7 +111,7 @@ class UpgradeManagerDatanode extends Upg
DataNode.LOG.info("\n Distributed upgrade for DataNode "
+ dataNode.dnRegistration.getName()
+ " version " + getUpgradeVersion() + " to current LV "
- + FSConstants.LAYOUT_VERSION + " is started.");
+ + FSConstants.LAYOUT_VERSION + " is started");
return true;
}
@@ -141,7 +141,7 @@ class UpgradeManagerDatanode extends Upg
DataNode.LOG.info("\n Distributed upgrade for DataNode "
+ dataNode.dnRegistration.getName()
+ " version " + getUpgradeVersion() + " to current LV "
- + FSConstants.LAYOUT_VERSION + " is complete.");
+ + FSConstants.LAYOUT_VERSION + " is complete");
}
synchronized void shutdownUpgrade() {
Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/UpgradeObjectDatanode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/UpgradeObjectDatanode.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/UpgradeObjectDatanode.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/UpgradeObjectDatanode.java Fri Jun 21 06:37:27 2013
@@ -109,7 +109,7 @@ public abstract class UpgradeObjectDatan
if(getUpgradeStatus() < 100) {
DataNode.LOG.info("\n Distributed upgrade for DataNode version "
+ getVersion() + " to current LV "
- + FSConstants.LAYOUT_VERSION + " cannot be completed.");
+ + FSConstants.LAYOUT_VERSION + " cannot be completed");
}
// Complete the upgrade by calling the manager method
Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java Fri Jun 21 06:37:27 2013
@@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.da
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
@@ -39,7 +38,6 @@ import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
-import javax.ws.rs.core.StreamingOutput;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -341,31 +339,10 @@ public class DatanodeWebHdfsMethods {
IOUtils.cleanup(LOG, dfsclient);
throw ioe;
}
- final DFSDataInputStream dis = in;
- final StreamingOutput streaming = new StreamingOutput() {
- @Override
- public void write(final OutputStream out) throws IOException {
- final Long n = length.getValue();
- DFSDataInputStream dfsin = dis;
- DFSClient client = dfsclient;
- try {
- if (n == null) {
- IOUtils.copyBytes(dfsin, out, b);
- } else {
- IOUtils.copyBytes(dfsin, out, n, b, false);
- }
- dfsin.close();
- dfsin = null;
- client.close();
- client = null;
- } finally {
- IOUtils.cleanup(LOG, dfsin);
- IOUtils.cleanup(LOG, client);
- }
- }
- };
-
- return Response.ok(streaming).type(
+
+ final long n = length.getValue() != null? length.getValue()
+ : in.getVisibleLength();
+ return Response.ok(new OpenEntity(in, n, dfsclient)).type(
MediaType.APPLICATION_OCTET_STREAM).build();
}
case GETFILECHECKSUM:
Added: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/OpenEntity.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/OpenEntity.java?rev=1495297&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/OpenEntity.java (added)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/datanode/web/resources/OpenEntity.java Fri Jun 21 06:37:27 2013
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.web.resources;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Type;
+
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.ext.MessageBodyWriter;
+import javax.ws.rs.ext.Provider;
+
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
+import org.apache.hadoop.io.IOUtils;
+
+/**
+ * A response entity for a DFSDataInputStream.
+ */
+public class OpenEntity {
+ private final DFSDataInputStream in;
+ private final long length;
+ private final DFSClient dfsclient;
+
+ OpenEntity(final DFSDataInputStream in, final long length,
+ final DFSClient dfsclient) {
+ this.in = in;
+ this.length = length;
+ this.dfsclient = dfsclient;
+ }
+
+ /**
+ * A {@link MessageBodyWriter} for {@link OpenEntity}.
+ */
+ @Provider
+ public static class Writer implements MessageBodyWriter<OpenEntity> {
+
+ @Override
+ public boolean isWriteable(Class<?> clazz, Type genericType,
+ Annotation[] annotations, MediaType mediaType) {
+ return clazz == OpenEntity.class
+ && MediaType.APPLICATION_OCTET_STREAM_TYPE.isCompatible(mediaType);
+ }
+
+ @Override
+ public long getSize(OpenEntity e, Class<?> type, Type genericType,
+ Annotation[] annotations, MediaType mediaType) {
+ return e.length;
+ }
+
+ @Override
+ public void writeTo(OpenEntity e, Class<?> type, Type genericType,
+ Annotation[] annotations, MediaType mediaType,
+ MultivaluedMap<String, Object> httpHeaders, OutputStream out
+ ) throws IOException {
+ try {
+ IOUtils.copyBytes(e.in, out, e.length, 4096, false);
+ } finally {
+ IOUtils.cleanup(DatanodeWebHdfsMethods.LOG, e.in);
+ IOUtils.cleanup(DatanodeWebHdfsMethods.LOG, e.dfsclient);
+ }
+ }
+ }
+}
\ No newline at end of file
Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java Fri Jun 21 06:37:27 2013
@@ -58,6 +58,10 @@ class BlocksMap {
INodeFile getINode() {
return inode;
}
+
+ void setINode(INodeFile inode) {
+ this.inode = inode;
+ }
DatanodeDescriptor getDatanode(int index) {
assert this.triplets != null : "BlockInfo is not initialized";
@@ -311,39 +315,13 @@ class BlocksMap {
private GSet<Block, BlockInfo> blocks;
BlocksMap(int initialCapacity, float loadFactor) {
- this.capacity = computeCapacity();
+ // Use 2% of total memory to size the GSet capacity
+ this.capacity = LightWeightGSet.computeCapacity(2.0, "BlocksMap");
this.blocks = new LightWeightGSet<Block, BlockInfo>(capacity);
}
- /**
- * Let t = 2% of max memory.
- * Let e = round(log_2 t).
- * Then, we choose capacity = 2^e/(size of reference),
- * unless it is outside the close interval [1, 2^30].
- */
- private static int computeCapacity() {
- //VM detection
- //See http://java.sun.com/docs/hotspot/HotSpotFAQ.html#64bit_detection
- final String vmBit = System.getProperty("sun.arch.data.model");
-
- //2% of max memory
- final double twoPC = Runtime.getRuntime().maxMemory()/50.0;
-
- //compute capacity
- final int e1 = (int)(Math.log(twoPC)/Math.log(2.0) + 0.5);
- final int e2 = e1 - ("32".equals(vmBit)? 2: 3);
- final int exponent = e2 < 0? 0: e2 > 30? 30: e2;
- final int c = 1 << exponent;
-
- LightWeightGSet.LOG.info("VM type = " + vmBit + "-bit");
- LightWeightGSet.LOG.info("2% max memory = " + twoPC/(1 << 20) + " MB");
- LightWeightGSet.LOG.info("capacity = 2^" + exponent
- + " = " + c + " entries");
- return c;
- }
-
void close() {
- blocks = null;
+ blocks.clear();
}
/**
Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java Fri Jun 21 06:37:27 2013
@@ -29,7 +29,15 @@ public interface FSClusterStats {
* @return a count of the total number of block transfers and block
* writes that are currently occuring on the cluster.
*/
+ public int getTotalLoad();
- public int getTotalLoad() ;
+ /**
+ * Indicate whether or not the cluster is currently avoiding the use of stale
+ * DataNodes for writing.
+ *
+ * @return True if the cluster is currently avoiding using stale DataNodes for
+ * writing targets, and false otherwise.
+ */
+ public boolean shouldAvoidStaleDataNodesForWrite();
}
-
\ No newline at end of file
+
Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Fri Jun 21 06:37:27 2013
@@ -176,16 +176,13 @@ class FSDirectory implements FSConstants
newNode = addNode(path, newNode, -1, false);
}
if (newNode == null) {
- NameNode.stateChangeLog.info("DIR* FSDirectory.addFile: "
- +"failed to add "+path
- +" to the file system");
+ NameNode.stateChangeLog.info("DIR* addFile: " + "failed to add " + path);
return null;
}
// add create file record to log, record new generation stamp
fsImage.getEditLog().logOpenFile(path, newNode);
- NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
- +path+" is added to the file system");
+ NameNode.stateChangeLog.debug("DIR* addFile: " + path + " is added");
return newNode;
}
@@ -291,7 +288,7 @@ class FSDirectory implements FSConstants
NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
+ path + " with " + block
- + " block is added to the in-memory "
+ + " is added to the in-memory "
+ "file system");
}
return block;
@@ -308,7 +305,7 @@ class FSDirectory implements FSConstants
fsImage.getEditLog().logOpenFile(path, file);
NameNode.stateChangeLog.debug("DIR* FSDirectory.persistBlocks: "
+path+" with "+ file.getBlocks().length
- +" blocks is persisted to the file system");
+ +" blocks is persisted");
}
}
@@ -323,7 +320,7 @@ class FSDirectory implements FSConstants
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* FSDirectory.closeFile: "
+path+" with "+ file.getBlocks().length
- +" blocks is persisted to the file system");
+ +" blocks is persisted");
}
}
}
@@ -345,8 +342,7 @@ class FSDirectory implements FSConstants
// write modified block locations to log
fsImage.getEditLog().logOpenFile(path, fileNode);
NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
- +path+" with "+block
- +" block is added to the file system");
+ + path + " with "+ block +" is added to the");
// update space consumed
INode[] pathINodes = getExistingPathINodes(path);
updateCount(pathINodes, pathINodes.length-1, 0,
@@ -589,21 +585,90 @@ class FSDirectory implements FSConstants
}
}
}
+
+ /**
+ * Concat - see {@link #unprotectedConcat(String, String[], long)}
+ */
+ public void concat(String target, String [] srcs) throws IOException{
+ synchronized(rootDir) {
+ // actual move
+ waitForReady();
+ long timestamp = FSNamesystem.now();
+ unprotectedConcat(target, srcs, timestamp);
+ // do the commit
+ fsImage.getEditLog().logConcat(target, srcs, timestamp);
+ }
+ }
+
+
+
+ /**
+ * Moves all the blocks from {@code srcs} in the order of {@code srcs} array
+ * and appends them to {@code target}.
+ * The {@code srcs} files are deleted.
+ * @param target file to move the blocks to
+ * @param srcs list of files to move the blocks from
+ * Must be public because it is also called from EditLogs
+ * NOTE: - it does not update quota since concat is restricted to same dir.
+ */
+ public void unprotectedConcat(String target, String [] srcs, long timestamp) {
+ if (NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("DIR* FSNamesystem.concat to "+target);
+ }
+ // do the move
+ final INode[] trgINodes = getExistingPathINodes(target);
+ INodeFile trgInode = (INodeFile) trgINodes[trgINodes.length-1];
+ INodeDirectory trgParent = (INodeDirectory)trgINodes[trgINodes.length-2];
+
+ INodeFile [] allSrcInodes = new INodeFile[srcs.length];
+ int i = 0;
+ int totalBlocks = 0;
+ for(String src : srcs) {
+ INodeFile srcInode = getFileINode(src);
+ allSrcInodes[i++] = srcInode;
+ totalBlocks += srcInode.blocks.length;
+ }
+ trgInode.appendBlocks(allSrcInodes, totalBlocks); // copy the blocks
+
+ // since we are in the same dir - we can use same parent to remove files
+ int count = 0;
+ for(INodeFile nodeToRemove: allSrcInodes) {
+ if(nodeToRemove == null) continue;
+
+ nodeToRemove.blocks = null;
+ trgParent.removeChild(nodeToRemove);
+ count++;
+ }
+
+ trgInode.setModificationTime(timestamp);
+ trgParent.setModificationTime(timestamp);
+ // update quota on the parent directory ('count' files removed, 0 space)
+ unprotectedUpdateCount(trgINodes, trgINodes.length-1, -count, 0);
+ }
+
/**
- * Remove the file from management, return blocks
+ * Delete the target directory and collect the blocks under it
+ *
+ * @param src
+ * Path of a directory to delete
+ * @param collectedBlocks
+ * Blocks under the deleted directory
+ * @return true on successful deletion; else false
*/
- boolean delete(String src) {
+ boolean delete(String src, List<Block>collectedBlocks) {
if (NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: "+src);
+ NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: " + src);
}
waitForReady();
long now = FSNamesystem.now();
- int filesRemoved = unprotectedDelete(src, now);
+ int filesRemoved = unprotectedDelete(src, collectedBlocks, now);
if (filesRemoved <= 0) {
return false;
}
incrDeletedFileCount(filesRemoved);
+ // Blocks will be deleted later by the caller of this method
+ FSNamesystem.getFSNamesystem().removePathAndBlocks(src, null);
fsImage.getEditLog().logDelete(src, now);
return true;
}
@@ -625,14 +690,36 @@ class FSDirectory implements FSConstants
}
/**
- * Delete a path from the name space
- * Update the count at each ancestor directory with quota
- * @param src a string representation of a path to an inode
- * @param modificationTime the time the inode is removed
- * @param deletedBlocks the place holder for the blocks to be removed
+ * Delete a path from the name space. Update the count at each ancestor
+ * directory with quota.
+ *
+ * @param src
+ * a string representation of a path to an inode
+ *
+ * @param mTime
+ * the time the inode is removed
+ */
+ void unprotectedDelete(String src, long mTime) {
+ List<Block> collectedBlocks = new ArrayList<Block>();
+ int filesRemoved = unprotectedDelete(src, collectedBlocks, mTime);
+ if (filesRemoved > 0) {
+ namesystem.removePathAndBlocks(src, collectedBlocks);
+ }
+ }
+
+ /**
+ * Delete a path from the name space. Update the count at each ancestor
+ * directory with quota.
+ *
+ * @param src
+ * a string representation of a path to an inode
+ * @param collectedBlocks
+ * blocks collected from the deleted path
+ * @param mtime
+ * the time the inode is removed
* @return the number of inodes deleted; 0 if no inodes are deleted.
- */
- int unprotectedDelete(String src, long modificationTime) {
+ */
+ int unprotectedDelete(String src, List<Block> collectedBlocks, long mtime) {
src = normalizePath(src);
synchronized (rootDir) {
@@ -643,32 +730,27 @@ class FSDirectory implements FSConstants
NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
+"failed to remove "+src+" because it does not exist");
return 0;
- } else if (inodes.length == 1) { // src is the root
+ }
+ if (inodes.length == 1) { // src is the root
NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedDelete: " +
"failed to remove " + src +
" because the root is not allowed to be deleted");
return 0;
- } else {
- try {
- // Remove the node from the namespace
- removeChild(inodes, inodes.length-1);
- // set the parent's modification time
- inodes[inodes.length-2].setModificationTime(modificationTime);
- // GC all the blocks underneath the node.
- ArrayList<Block> v = new ArrayList<Block>();
- int filesRemoved = targetNode.collectSubtreeBlocksAndClear(v);
- namesystem.removePathAndBlocks(src, v);
- if (NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
- +src+" is removed");
- }
- return filesRemoved;
- } catch (IOException e) {
- NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedDelete: " +
- "failed to remove " + src + " because " + e.getMessage());
- return 0;
- }
+ }
+ int pos = inodes.length - 1;
+ targetNode = removeChild(inodes, pos);
+ if (targetNode == null) {
+ return 0;
+ }
+ // set the parent's modification time
+ inodes[pos - 1].setModificationTime(mtime);
+ int filesRemoved = targetNode
+ .collectSubtreeBlocksAndClear(collectedBlocks);
+ if (NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
+ + src + " is removed");
}
+ return filesRemoved;
}
}
@@ -892,6 +974,25 @@ class FSDirectory implements FSConstants
}
}
+ /**
+ * Updates quota without verification.
+ * It is the caller's responsibility to make sure the quota is not exceeded.
+ * @param inodes
+ * @param numOfINodes
+ * @param nsDelta
+ * @param dsDelta
+ */
+ void unprotectedUpdateCount(INode[] inodes, int numOfINodes,
+ long nsDelta, long dsDelta) {
+ for(int i=0; i < numOfINodes; i++) {
+ if (inodes[i].isQuotaSet()) { // a directory with quota
+ INodeDirectoryWithQuota node =(INodeDirectoryWithQuota)inodes[i];
+ node.addSpaceConsumed(nsDelta, dsDelta);
+ }
+ }
+ }
+
+
/** Return the name of the path represented by inodes at [0, pos] */
private static String getFullPathName(INode[] inodes, int pos) {
StringBuilder fullPathName = new StringBuilder();
@@ -1399,4 +1500,8 @@ class FSDirectory implements FSConstants
inode.setLocalName(name.getBytes());
}
}
+
+ void shutdown() {
+ nameCache.reset();
+ }
}
Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Fri Jun 21 06:37:27 2013
@@ -86,6 +86,7 @@ public class FSEditLog {
private static final byte OP_CLEAR_NS_QUOTA = 12; // clear namespace quota
private static final byte OP_TIMES = 13; // sets mod & access time on a file
private static final byte OP_SET_QUOTA = 14; // sets name and disk quotas.
+ private static final byte OP_CONCAT_DELETE = 16; // concat files.
private static final byte OP_GET_DELEGATION_TOKEN = 18; //new delegation token
private static final byte OP_RENEW_DELEGATION_TOKEN = 19; //renew delegation token
private static final byte OP_CANCEL_DELEGATION_TOKEN = 20; //cancel delegation token
@@ -650,8 +651,8 @@ public class FSEditLog {
* This is where we apply edits that we've been writing to disk all
* along.
*/
- static int loadFSEdits(EditLogInputStream edits, int tolerationLength
- ) throws IOException {
+ static int loadFSEdits(EditLogInputStream edits, int tolerationLength,
+ MetaRecoveryContext recovery) throws IOException {
FSNamesystem fsNamesys = FSNamesystem.getFSNamesystem();
FSDirectory fsDir = fsNamesys.dir;
int numEdits = 0;
@@ -664,10 +665,13 @@ public class FSEditLog {
numOpSetPerm = 0, numOpSetOwner = 0, numOpSetGenStamp = 0,
numOpTimes = 0, numOpGetDelegationToken = 0,
numOpRenewDelegationToken = 0, numOpCancelDelegationToken = 0,
- numOpUpdateMasterKey = 0, numOpOther = 0;
-
+ numOpUpdateMasterKey = 0, numOpOther = 0,
+ numOpConcatDelete = 0;
+ long highestGenStamp = -1;
long startTime = FSNamesystem.now();
+ LOG.info("Start loading edits file " + edits.getName());
+ //
// Keep track of the file offsets of the last several opcodes.
// This is handy when manually recovering corrupted edits files.
PositionTrackingInputStream tracker =
@@ -840,7 +844,28 @@ public class FSEditLog {
short replication = adjustReplication(readShort(in));
fsDir.unprotectedSetReplication(path, replication, null);
break;
- }
+ }
+ case OP_CONCAT_DELETE: {
+ if (logVersion > -22) {
+ throw new IOException("Unexpected opcode " + opcode
+ + " for version " + logVersion);
+ }
+ numOpConcatDelete++;
+ int length = in.readInt();
+ if (length < 3) { // trg, srcs.., timestamp
+ throw new IOException("Incorrect data format. "
+ + "ConcatDelete operation.");
+ }
+ String trg = FSImage.readString(in);
+ int srcSize = length - 1 - 1; //trg and timestamp
+ String [] srcs = new String [srcSize];
+ for(int i=0; i<srcSize;i++) {
+ srcs[i]= FSImage.readString(in);
+ }
+ timestamp = readLong(in);
+ fsDir.unprotectedConcat(trg, srcs, timestamp);
+ break;
+ }
case OP_RENAME: {
numOpRename++;
int length = in.readInt();
@@ -896,6 +921,11 @@ public class FSEditLog {
case OP_SET_GENSTAMP: {
numOpSetGenStamp++;
long lw = in.readLong();
+ if ((highestGenStamp != -1) && (highestGenStamp + 1 != lw)) {
+ throw new IOException("OP_SET_GENSTAMP tried to set a genstamp of " + lw +
+ " but the previous highest genstamp was " + highestGenStamp);
+ }
+ highestGenStamp = lw;
fsDir.namesystem.setGenerationStamp(lw);
break;
}
@@ -1068,7 +1098,7 @@ public class FSEditLog {
in.reset(); //reset to the beginning position of this transaction
} else {
//edit log toleration feature is disabled
- throw new IOException(msg, t);
+ MetaRecoveryContext.editLogLoaderPrompt(msg, recovery);
}
} finally {
try {
@@ -1093,6 +1123,7 @@ public class FSEditLog {
+ " numOpRenewDelegationToken = " + numOpRenewDelegationToken
+ " numOpCancelDelegationToken = " + numOpCancelDelegationToken
+ " numOpUpdateMasterKey = " + numOpUpdateMasterKey
+ + " numOpConcatDelete = " + numOpConcatDelete
+ " numOpOther = " + numOpOther);
}
@@ -1378,6 +1409,21 @@ public class FSEditLog {
logEdit(OP_SET_OWNER, new UTF8(src), u, g);
}
+ /**
+ * Add concat(trg, srcs...) record to edit log
+ */
+ void logConcat(String trg, String [] srcs, long timestamp) {
+ int size = 1 + srcs.length + 1; // trg, srcs, timestamp
+ UTF8 info[] = new UTF8[size];
+ int idx = 0;
+ info[idx++] = new UTF8(trg);
+ for(int i=0; i<srcs.length; i++) {
+ info[idx++] = new UTF8(srcs[i]);
+ }
+ info[idx] = FSEditLog.toLogLong(timestamp);
+ logEdit(OP_CONCAT_DELETE, new ArrayWritable(UTF8.class, info));
+ }
+
/**
* Add delete file record to edit log
*/
@@ -1538,7 +1584,8 @@ public class FSEditLog {
// file exists.
//
getEditFile(sd).delete();
- if (!getEditNewFile(sd).renameTo(getEditFile(sd))) {
+ if (!getEditNewFile(sd).renameTo(getEditFile(sd))) {
+ sd.unlock();
removeEditsForStorageDir(sd);
fsimage.updateRemovedDirs(sd);
it.remove();
Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1495297&r1=1495296&r2=1495297&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java Fri Jun 21 06:37:27 2013
@@ -141,7 +141,7 @@ public class FSImage extends Storage {
/** Flag to restore removed storage directories at checkpointing */
private boolean restoreRemovedDirs = DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_DEFAULT;
-
+
private int editsTolerationLength = DFSConfigKeys.DFS_NAMENODE_EDITS_TOLERATION_LENGTH_DEFAULT;
/**
@@ -375,14 +375,15 @@ public class FSImage extends Storage {
case REGULAR:
// just load the image
}
- return loadFSImage();
+ return loadFSImage(startOpt.createRecoveryContext());
}
private void doUpgrade() throws IOException {
+ MetaRecoveryContext recovery = null;
if(getDistributedUpgradeState()) {
// only distributed upgrade need to continue
// don't do version upgrade
- this.loadFSImage();
+ this.loadFSImage(recovery);
initializeDistributedUpgrade();
return;
}
@@ -398,7 +399,7 @@ public class FSImage extends Storage {
}
// load the latest image
- this.loadFSImage();
+ this.loadFSImage(recovery);
// Do upgrade for each directory
long oldCTime = this.getCTime();
@@ -742,7 +743,7 @@ public class FSImage extends Storage {
* @return whether the image should be saved
* @throws IOException
*/
- boolean loadFSImage() throws IOException {
+ boolean loadFSImage(MetaRecoveryContext recovery) throws IOException {
// Now check all curFiles and see which is the newest
long latestNameCheckpointTime = Long.MIN_VALUE;
long latestEditsCheckpointTime = Long.MIN_VALUE;
@@ -822,22 +823,27 @@ public class FSImage extends Storage {
needToSave |= recoverInterruptedCheckpoint(latestNameSD, latestEditsSD);
long startTime = FSNamesystem.now();
- long imageSize = getImageFile(latestNameSD, NameNodeFile.IMAGE).length();
+ File imageFile = getImageFile(latestNameSD, NameNodeFile.IMAGE);
+ long imageSize = imageFile.length();
//
// Load in bits
//
latestNameSD.read();
- needToSave |= loadFSImage(getImageFile(latestNameSD, NameNodeFile.IMAGE));
- LOG.info("Image file of size " + imageSize + " loaded in "
+ LOG.info("Start loading image file " + imageFile.getPath().toString());
+ needToSave |= loadFSImage(imageFile);
+ LOG.info("Image file " + imageFile.getPath().toString() +
+ " of size " + imageSize + " bytes loaded in "
+ (FSNamesystem.now() - startTime)/1000 + " seconds.");
// Load latest edits
- if (latestNameCheckpointTime > latestEditsCheckpointTime)
+ if (latestNameCheckpointTime > latestEditsCheckpointTime) {
// the image is already current, discard edits
needToSave |= true;
- else // latestNameCheckpointTime == latestEditsCheckpointTime
- needToSave |= (loadFSEdits(latestEditsSD) > 0);
+ FSNamesystem.getFSNamesystem().dir.updateCountForINodeWithQuota();
+ } else { // latestNameCheckpointTime == latestEditsCheckpointTime
+ needToSave |= (loadFSEdits(latestEditsSD, recovery) > 0);
+ }
return needToSave;
}
@@ -1015,16 +1021,17 @@ public class FSImage extends Storage {
* @return number of edits loaded
* @throws IOException
*/
- int loadFSEdits(StorageDirectory sd) throws IOException {
+ int loadFSEdits(StorageDirectory sd, MetaRecoveryContext recovery)
+ throws IOException {
int numEdits = 0;
EditLogFileInputStream edits =
new EditLogFileInputStream(getImageFile(sd, NameNodeFile.EDITS));
- numEdits = FSEditLog.loadFSEdits(edits, editsTolerationLength);
+ numEdits = FSEditLog.loadFSEdits(edits, editsTolerationLength, recovery);
edits.close();
File editsNew = getImageFile(sd, NameNodeFile.EDITS_NEW);
if (editsNew.exists() && editsNew.length() > 0) {
edits = new EditLogFileInputStream(editsNew);
- numEdits += FSEditLog.loadFSEdits(edits, editsTolerationLength);
+ numEdits += FSEditLog.loadFSEdits(edits, editsTolerationLength, recovery);
edits.close();
}
// update the counts.
@@ -1063,8 +1070,9 @@ public class FSImage extends Storage {
out.close();
}
- LOG.info("Image file of size " + newFile.length() + " saved in "
- + (FSNamesystem.now() - startTime)/1000 + " seconds.");
+ LOG.info("Image file " + newFile + " of size " + newFile.length() +
+ " bytes saved in " + (FSNamesystem.now() - startTime)/1000 +
+ " seconds.");
}
/**
@@ -1237,7 +1245,7 @@ public class FSImage extends Storage {
FSEditLog.LOG.info(DFSConfigKeys.DFS_NAMENODE_EDITS_TOLERATION_LENGTH_KEY
+ " = " + editsTolerationLength);
}
-
+
/** restore a metadata file */
private static void restoreFile(File src, File dstdir, String dstfile)
throws IOException {
@@ -1816,9 +1824,9 @@ public class FSImage extends Storage {
}
static private final UTF8 U_STR = new UTF8();
- static String readString(DataInputStream in) throws IOException {
+ public static String readString(DataInputStream in) throws IOException {
U_STR.readFields(in);
- return U_STR.toString();
+ return U_STR.toStringChecked();
}
static String readString_EmptyAsNull(DataInputStream in) throws IOException {
@@ -1826,7 +1834,7 @@ public class FSImage extends Storage {
return s.isEmpty()? null: s;
}
- static byte[] readBytes(DataInputStream in) throws IOException {
+ public static byte[] readBytes(DataInputStream in) throws IOException {
U_STR.readFields(in);
int len = U_STR.getLength();
byte[] bytes = new byte[len];