You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ar...@apache.org on 2013/12/02 18:41:48 UTC
svn commit: r1547122 [2/5] - in
/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project:
hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/
hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs/
hadoop-hdfs/dev-support/ h...
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java Mon Dec 2 17:41:44 2013
@@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringUtils;
@@ -87,6 +88,7 @@ public class QuorumJournalManager implem
private final AsyncLoggerSet loggers;
private int outputBufferCapacity = 512 * 1024;
+ private final URLConnectionFactory connectionFactory;
public QuorumJournalManager(Configuration conf,
URI uri, NamespaceInfo nsInfo) throws IOException {
@@ -102,6 +104,8 @@ public class QuorumJournalManager implem
this.uri = uri;
this.nsInfo = nsInfo;
this.loggers = new AsyncLoggerSet(createLoggers(loggerFactory));
+ this.connectionFactory = URLConnectionFactory
+ .newDefaultURLConnectionFactory(conf);
// Configure timeouts.
this.startSegmentTimeoutMs = conf.getInt(
@@ -475,8 +479,8 @@ public class QuorumJournalManager implem
URL url = logger.buildURLToFetchLogs(remoteLog.getStartTxId());
EditLogInputStream elis = EditLogFileInputStream.fromUrl(
- url, remoteLog.getStartTxId(), remoteLog.getEndTxId(),
- remoteLog.isInProgress());
+ connectionFactory, url, remoteLog.getStartTxId(),
+ remoteLog.getEndTxId(), remoteLog.isInProgress());
allStreams.add(elis);
}
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java Mon Dec 2 17:41:44 2013
@@ -114,10 +114,10 @@ class QuorumOutputStream extends EditLog
}
@Override
- public String generateHtmlReport() {
+ public String generateReport() {
StringBuilder sb = new StringBuilder();
- sb.append("Writing segment beginning at txid " + segmentTxId + "<br/>\n");
- loggers.appendHtmlReport(sb);
+ sb.append("Writing segment beginning at txid " + segmentTxId + ". \n");
+ loggers.appendReport(sb);
return sb.toString();
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeHttpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeHttpServer.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeHttpServer.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeHttpServer.java Mon Dec 2 17:41:44 2013
@@ -23,6 +23,8 @@ import static org.apache.hadoop.hdfs.DFS
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
import javax.servlet.ServletContext;
@@ -69,8 +71,15 @@ public class JournalNodeHttpServer {
bindAddr.getHostName()));
int tmpInfoPort = bindAddr.getPort();
+ URI httpEndpoint;
+ try {
+ httpEndpoint = new URI("http://" + NetUtils.getHostPortString(bindAddr));
+ } catch (URISyntaxException e) {
+ throw new IOException(e);
+ }
+
httpServer = new HttpServer.Builder().setName("journal")
- .setBindAddress(bindAddr.getHostName()).setPort(tmpInfoPort)
+ .addEndpoint(httpEndpoint)
.setFindPort(tmpInfoPort == 0).setConf(conf).setACL(
new AccessControlList(conf.get(DFS_ADMIN, " ")))
.setSecurityEnabled(UserGroupInformation.isSecurityEnabled())
@@ -85,7 +94,7 @@ public class JournalNodeHttpServer {
httpServer.start();
// The web-server port can be ephemeral... ensure we have the correct info
- infoPort = httpServer.getPort();
+ infoPort = httpServer.getConnectorAddress(0).getPort();
LOG.info("Journal Web-server up at: " + bindAddr + ":" + infoPort);
}
@@ -104,7 +113,7 @@ public class JournalNodeHttpServer {
* Return the actual address bound to by the running server.
*/
public InetSocketAddress getAddress() {
- InetSocketAddress addr = httpServer.getListenerAddress();
+ InetSocketAddress addr = httpServer.getConnectorAddress(0);
assert addr.getPort() != 0;
return addr;
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java Mon Dec 2 17:41:44 2013
@@ -22,6 +22,7 @@ import static org.apache.hadoop.util.Exi
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
+import java.util.Date;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -208,17 +209,27 @@ public class CacheReplicationMonitor ext
/**
* Scan all CacheDirectives. Use the information to figure out
* what cache replication factor each block should have.
- *
- * @param mark Whether the current scan is setting or clearing the mark
*/
private void rescanCacheDirectives() {
FSDirectory fsDir = namesystem.getFSDirectory();
- for (CacheDirective pce : cacheManager.getEntriesById().values()) {
+ final long now = new Date().getTime();
+ for (CacheDirective directive : cacheManager.getEntriesById().values()) {
+ // Reset the directive
+ directive.clearBytesNeeded();
+ directive.clearBytesCached();
+ directive.clearFilesAffected();
+ // Skip processing this entry if it has expired
+ LOG.info("Directive expiry is at " + directive.getExpiryTime());
+ if (directive.getExpiryTime() > 0 && directive.getExpiryTime() <= now) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skipping directive id " + directive.getId()
+ + " because it has expired (" + directive.getExpiryTime() + ">="
+ + now);
+ }
+ continue;
+ }
scannedDirectives++;
- pce.clearBytesNeeded();
- pce.clearBytesCached();
- pce.clearFilesAffected();
- String path = pce.getPath();
+ String path = directive.getPath();
INode node;
try {
node = fsDir.getINode(path);
@@ -235,11 +246,11 @@ public class CacheReplicationMonitor ext
ReadOnlyList<INode> children = dir.getChildrenList(null);
for (INode child : children) {
if (child.isFile()) {
- rescanFile(pce, child.asFile());
+ rescanFile(directive, child.asFile());
}
}
} else if (node.isFile()) {
- rescanFile(pce, node.asFile());
+ rescanFile(directive, node.asFile());
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring non-directory, non-file inode " + node +
@@ -301,7 +312,7 @@ public class CacheReplicationMonitor ext
pce.addBytesNeeded(neededTotal);
pce.addBytesCached(cachedTotal);
if (LOG.isTraceEnabled()) {
- LOG.debug("Directive " + pce.getEntryId() + " is caching " +
+ LOG.debug("Directive " + pce.getId() + " is caching " +
file.getFullPathName() + ": " + cachedTotal + "/" + neededTotal);
}
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java Mon Dec 2 17:41:44 2013
@@ -42,6 +42,12 @@ public interface DatanodeStatistics {
/** @return the percentage of the block pool used space over the total capacity. */
public float getPercentBlockPoolUsed();
+
+ /** @return the total cache capacity of all DataNodes */
+ public long getCacheCapacity();
+
+ /** @return the total cache used by all DataNodes */
+ public long getCacheUsed();
/** @return the xceiver count */
public int getXceiverCount();
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java Mon Dec 2 17:41:44 2013
@@ -149,6 +149,17 @@ class HeartbeatManager implements Datano
public synchronized int getXceiverCount() {
return stats.xceiverCount;
}
+
+ @Override
+ public synchronized long getCacheCapacity() {
+ return stats.cacheCapacity;
+ }
+
+ @Override
+ public synchronized long getCacheUsed() {
+ return stats.cacheUsed;
+ }
+
@Override
public synchronized long[] getStats() {
@@ -309,6 +320,8 @@ class HeartbeatManager implements Datano
private long capacityRemaining = 0L;
private long blockPoolUsed = 0L;
private int xceiverCount = 0;
+ private long cacheCapacity = 0L;
+ private long cacheUsed = 0L;
private int expiredHeartbeats = 0;
@@ -322,6 +335,8 @@ class HeartbeatManager implements Datano
} else {
capacityTotal += node.getDfsUsed();
}
+ cacheCapacity += node.getCacheCapacity();
+ cacheUsed += node.getCacheUsed();
}
private void subtract(final DatanodeDescriptor node) {
@@ -334,6 +349,8 @@ class HeartbeatManager implements Datano
} else {
capacityTotal -= node.getDfsUsed();
}
+ cacheCapacity -= node.getCacheCapacity();
+ cacheUsed -= node.getCacheUsed();
}
/** Increment expired heartbeat counter. */
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Mon Dec 2 17:41:44 2013
@@ -52,6 +52,7 @@ import java.util.concurrent.atomic.Atomi
import javax.management.ObjectName;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -234,6 +235,7 @@ public class DataNode extends Configured
private volatile boolean heartbeatsDisabledForTests = false;
private DataStorage storage = null;
private HttpServer infoServer = null;
+ private int infoPort;
private int infoSecurePort;
DataNodeMetrics metrics;
private InetSocketAddress streamingAddr;
@@ -354,27 +356,33 @@ public class DataNode extends Configured
String infoHost = infoSocAddr.getHostName();
int tmpInfoPort = infoSocAddr.getPort();
HttpServer.Builder builder = new HttpServer.Builder().setName("datanode")
- .setBindAddress(infoHost).setPort(tmpInfoPort)
+ .addEndpoint(URI.create("http://" + NetUtils.getHostPortString(infoSocAddr)))
.setFindPort(tmpInfoPort == 0).setConf(conf)
.setACL(new AccessControlList(conf.get(DFS_ADMIN, " ")));
- this.infoServer = (secureResources == null) ? builder.build() :
- builder.setConnector(secureResources.getListener()).build();
LOG.info("Opened info server at " + infoHost + ":" + tmpInfoPort);
if (conf.getBoolean(DFS_HTTPS_ENABLE_KEY, false)) {
- boolean needClientAuth = conf.getBoolean(DFS_CLIENT_HTTPS_NEED_AUTH_KEY,
- DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT);
InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(conf.get(
DFS_DATANODE_HTTPS_ADDRESS_KEY, infoHost + ":" + 0));
- Configuration sslConf = new HdfsConfiguration(false);
- sslConf.addResource(conf.get(DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY,
- "ssl-server.xml"));
- this.infoServer.addSslListener(secInfoSocAddr, sslConf, needClientAuth);
+ builder.addEndpoint(URI.create("https://"
+ + NetUtils.getHostPortString(secInfoSocAddr)));
+ Configuration sslConf = new Configuration(false);
+ sslConf.setBoolean(DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY, conf
+ .getBoolean(DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY,
+ DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT));
+ sslConf.addResource(conf.get(
+ DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY,
+ DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_DEFAULT));
+ DFSUtil.loadSslConfToHttpServerBuilder(builder, sslConf);
+
if(LOG.isDebugEnabled()) {
LOG.debug("Datanode listening for SSL on " + secInfoSocAddr);
}
infoSecurePort = secInfoSocAddr.getPort();
}
+
+ this.infoServer = (secureResources == null) ? builder.build() :
+ builder.setConnector(secureResources.getListener()).build();
this.infoServer.addInternalServlet(null, "/streamFile/*", StreamFile.class);
this.infoServer.addInternalServlet(null, "/getFileChecksum/*",
FileChecksumServlets.GetServlet.class);
@@ -390,6 +398,7 @@ public class DataNode extends Configured
WebHdfsFileSystem.PATH_PREFIX + "/*");
}
this.infoServer.start();
+ this.infoPort = infoServer.getConnectorAddress(0).getPort();
}
private void startPlugins(Configuration conf) {
@@ -712,7 +721,7 @@ public class DataNode extends Configured
this.dnConf = new DNConf(conf);
if (dnConf.maxLockedMemory > 0) {
- if (!NativeIO.isAvailable()) {
+ if (!NativeIO.POSIX.getCacheManipulator().verifyCanMlock()) {
throw new RuntimeException(String.format(
"Cannot start datanode because the configured max locked memory" +
" size (%s) is greater than zero and native code is not available.",
@@ -2320,7 +2329,7 @@ public class DataNode extends Configured
* @return the datanode's http port
*/
public int getInfoPort() {
- return infoServer.getPort();
+ return infoPort;
}
/**
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java Mon Dec 2 17:41:44 2013
@@ -310,7 +310,16 @@ public class DataStorage extends Storage
@Override
protected void setFieldsFromProperties(Properties props, StorageDirectory sd)
throws IOException {
- setLayoutVersion(props, sd);
+ setFieldsFromProperties(props, sd, false, 0);
+ }
+
+ private void setFieldsFromProperties(Properties props, StorageDirectory sd,
+ boolean overrideLayoutVersion, int toLayoutVersion) throws IOException {
+ if (overrideLayoutVersion) {
+ this.layoutVersion = toLayoutVersion;
+ } else {
+ setLayoutVersion(props, sd);
+ }
setcTime(props, sd);
setStorageType(props, sd);
setClusterId(props, layoutVersion, sd);
@@ -374,13 +383,20 @@ public class DataStorage extends Storage
return true;
}
+ /** Read VERSION file for rollback */
+ void readProperties(StorageDirectory sd, int rollbackLayoutVersion)
+ throws IOException {
+ Properties props = readPropertiesFile(sd.getVersionFile());
+ setFieldsFromProperties(props, sd, true, rollbackLayoutVersion);
+ }
+
/**
* Analize which and whether a transition of the fs state is required
* and perform it if necessary.
*
- * Rollback if previousLV >= LAYOUT_VERSION && prevCTime <= namenode.cTime
- * Upgrade if this.LV > LAYOUT_VERSION || this.cTime < namenode.cTime
- * Regular startup if this.LV = LAYOUT_VERSION && this.cTime = namenode.cTime
+ * Rollback if the rollback startup option was specified.
+ * Upgrade if this.LV > LAYOUT_VERSION
+ * Regular startup if this.LV = LAYOUT_VERSION
*
* @param datanode Datanode to which this storage belongs to
* @param sd storage directory
@@ -420,25 +436,28 @@ public class DataStorage extends Storage
+ nsInfo.getClusterID() + "; datanode clusterID = " + getClusterID());
}
- // regular start up
- if (this.layoutVersion == HdfsConstants.LAYOUT_VERSION
- && this.cTime == nsInfo.getCTime())
+ // After addition of the federation feature, ctime check is only
+ // meaningful at BlockPoolSliceStorage level.
+
+ // regular start up.
+ if (this.layoutVersion == HdfsConstants.LAYOUT_VERSION)
return; // regular startup
// do upgrade
- if (this.layoutVersion > HdfsConstants.LAYOUT_VERSION
- || this.cTime < nsInfo.getCTime()) {
+ if (this.layoutVersion > HdfsConstants.LAYOUT_VERSION) {
doUpgrade(sd, nsInfo); // upgrade
return;
}
- // layoutVersion == LAYOUT_VERSION && this.cTime > nsInfo.cTime
- // must shutdown
- throw new IOException("Datanode state: LV = " + this.getLayoutVersion()
- + " CTime = " + this.getCTime()
- + " is newer than the namespace state: LV = "
- + nsInfo.getLayoutVersion()
- + " CTime = " + nsInfo.getCTime());
+ // layoutVersion < LAYOUT_VERSION. I.e. stored layout version is newer
+ // than the version supported by datanode. This should have been caught
+ // in readProperties(), even if rollback was not carried out or somehow
+ // failed.
+ throw new IOException("BUG: The stored LV = " + this.getLayoutVersion()
+ + " is newer than the supported LV = "
+ + HdfsConstants.LAYOUT_VERSION
+ + " or name node LV = "
+ + nsInfo.getLayoutVersion());
}
/**
@@ -464,8 +483,13 @@ public class DataStorage extends Storage
* @throws IOException on error
*/
void doUpgrade(StorageDirectory sd, NamespaceInfo nsInfo) throws IOException {
+ // If the existing on-disk layout version supportes federation, simply
+ // update its layout version.
if (LayoutVersion.supports(Feature.FEDERATION, layoutVersion)) {
- clusterID = nsInfo.getClusterID();
+ // The VERSION file is already read in. Override the layoutVersion
+ // field and overwrite the file.
+ LOG.info("Updating layout version from " + layoutVersion + " to "
+ + nsInfo.getLayoutVersion() + " for storage " + sd.getRoot());
layoutVersion = nsInfo.getLayoutVersion();
writeProperties(sd);
return;
@@ -550,15 +574,32 @@ public class DataStorage extends Storage
* <li> Remove removed.tmp </li>
* </ol>
*
- * Do nothing, if previous directory does not exist.
+ * If previous directory does not exist and the current version supports
+ * federation, perform a simple rollback of layout version. This does not
+ * involve saving/restoration of actual data.
*/
void doRollback( StorageDirectory sd,
NamespaceInfo nsInfo
) throws IOException {
File prevDir = sd.getPreviousDir();
- // regular startup if previous dir does not exist
- if (!prevDir.exists())
+ // This is a regular startup or a post-federation rollback
+ if (!prevDir.exists()) {
+ // The current datanode version supports federation and the layout
+ // version from namenode matches what the datanode supports. An invalid
+ // rollback may happen if namenode didn't rollback and datanode is
+ // running a wrong version. But this will be detected in block pool
+ // level and the invalid VERSION content will be overwritten when
+ // the error is corrected and rollback is retried.
+ if (LayoutVersion.supports(Feature.FEDERATION,
+ HdfsConstants.LAYOUT_VERSION) &&
+ HdfsConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion()) {
+ readProperties(sd, nsInfo.getLayoutVersion());
+ writeProperties(sd);
+ LOG.info("Layout version rolled back to " +
+ nsInfo.getLayoutVersion() + " for storage " + sd.getRoot());
+ }
return;
+ }
DataStorage prevInfo = new DataStorage();
prevInfo.readPreviousVersionProperties(sd);
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java Mon Dec 2 17:41:44 2013
@@ -145,6 +145,8 @@ public class FsDatasetCache {
*/
private final HashMap<Key, Value> mappableBlockMap = new HashMap<Key, Value>();
+ private final AtomicLong numBlocksCached = new AtomicLong(0);
+
private final FsDatasetImpl dataset;
private final ThreadPoolExecutor uncachingExecutor;
@@ -417,6 +419,7 @@ public class FsDatasetCache {
LOG.debug("Successfully cached block " + key.id + " in " + key.bpid +
". We are now caching " + newUsedBytes + " bytes in total.");
}
+ numBlocksCached.addAndGet(1);
success = true;
} finally {
if (!success) {
@@ -465,6 +468,7 @@ public class FsDatasetCache {
}
long newUsedBytes =
usedBytesCount.release(value.mappableBlock.getLength());
+ numBlocksCached.addAndGet(-1);
if (LOG.isDebugEnabled()) {
LOG.debug("Uncaching of block " + key.id + " in " + key.bpid +
" completed. usedBytes = " + newUsedBytes);
@@ -477,14 +481,14 @@ public class FsDatasetCache {
/**
* Get the approximate amount of cache space used.
*/
- public long getDnCacheUsed() {
+ public long getCacheUsed() {
return usedBytesCount.get();
}
/**
* Get the maximum amount of bytes we can cache. This is a constant.
*/
- public long getDnCacheCapacity() {
+ public long getCacheCapacity() {
return maxBytes;
}
@@ -496,4 +500,7 @@ public class FsDatasetCache {
return numBlocksFailedToUncache.get();
}
+ public long getNumBlocksCached() {
+ return numBlocksCached.get();
+ }
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Mon Dec 2 17:41:44 2013
@@ -341,12 +341,12 @@ class FsDatasetImpl implements FsDataset
@Override // FSDatasetMBean
public long getCacheUsed() {
- return cacheManager.getDnCacheUsed();
+ return cacheManager.getCacheUsed();
}
@Override // FSDatasetMBean
public long getCacheCapacity() {
- return cacheManager.getDnCacheCapacity();
+ return cacheManager.getCacheCapacity();
}
@Override // FSDatasetMBean
@@ -359,6 +359,11 @@ class FsDatasetImpl implements FsDataset
return cacheManager.getNumBlocksFailedToUncache();
}
+ @Override // FSDatasetMBean
+ public long getNumBlocksCached() {
+ return cacheManager.getNumBlocksCached();
+ }
+
/**
* Find the block's on-disk length
*/
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java Mon Dec 2 17:41:44 2013
@@ -89,6 +89,11 @@ public interface FSDatasetMBean {
public long getCacheCapacity();
/**
+ * Returns the number of blocks cached.
+ */
+ public long getNumBlocksCached();
+
+ /**
* Returns the number of blocks that the datanode was unable to cache
*/
public long getNumBlocksFailedToCache();
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java Mon Dec 2 17:41:44 2013
@@ -17,12 +17,12 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT;
@@ -43,17 +43,18 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.InvalidRequestException;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
+import org.apache.hadoop.fs.InvalidRequestException;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.CacheDirective;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
-import org.apache.hadoop.hdfs.protocol.CacheDirective;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
@@ -99,24 +100,24 @@ public final class CacheManager {
private final BlockManager blockManager;
/**
- * Cache entries, sorted by ID.
+ * Cache directives, sorted by ID.
*
* listCacheDirectives relies on the ordering of elements in this map
* to track what has already been listed by the client.
*/
- private final TreeMap<Long, CacheDirective> entriesById =
+ private final TreeMap<Long, CacheDirective> directivesById =
new TreeMap<Long, CacheDirective>();
/**
- * The entry ID to use for a new entry. Entry IDs always increase, and are
+ * The directive ID to use for a new directive. IDs always increase, and are
* never reused.
*/
- private long nextEntryId;
+ private long nextDirectiveId;
/**
- * Cache entries, sorted by path
+ * Cache directives, sorted by path
*/
- private final TreeMap<String, List<CacheDirective>> entriesByPath =
+ private final TreeMap<String, List<CacheDirective>> directivesByPath =
new TreeMap<String, List<CacheDirective>>();
/**
@@ -177,7 +178,7 @@ public final class CacheManager {
BlockManager blockManager) {
this.namesystem = namesystem;
this.blockManager = blockManager;
- this.nextEntryId = 1;
+ this.nextDirectiveId = 1;
this.maxListCachePoolsResponses = conf.getInt(
DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES,
DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT);
@@ -239,7 +240,7 @@ public final class CacheManager {
public TreeMap<Long, CacheDirective> getEntriesById() {
assert namesystem.hasReadLock();
- return entriesById;
+ return directivesById;
}
@VisibleForTesting
@@ -248,12 +249,12 @@ public final class CacheManager {
return cachedBlocks;
}
- private long getNextEntryId() throws IOException {
+ private long getNextDirectiveId() throws IOException {
assert namesystem.hasWriteLock();
- if (nextEntryId >= Long.MAX_VALUE - 1) {
+ if (nextDirectiveId >= Long.MAX_VALUE - 1) {
throw new IOException("No more available IDs.");
}
- return nextEntryId++;
+ return nextDirectiveId++;
}
// Helper getter / validation methods
@@ -301,7 +302,35 @@ public final class CacheManager {
}
/**
- * Get a CacheDirective by ID, validating the ID and that the entry
+ * Calculates the absolute expiry time of the directive from the
+ * {@link CacheDirectiveInfo.Expiration}. This converts a relative Expiration
+ * into an absolute time based on the local clock.
+ *
+ * @param directive from which to get the expiry time
+ * @param defaultValue to use if Expiration is not set
+ * @return Absolute expiry time in milliseconds since Unix epoch
+ * @throws InvalidRequestException if the Expiration is invalid
+ */
+ private static long validateExpiryTime(CacheDirectiveInfo directive,
+ long defaultValue) throws InvalidRequestException {
+ long expiryTime;
+ CacheDirectiveInfo.Expiration expiration = directive.getExpiration();
+ if (expiration != null) {
+ if (expiration.getMillis() < 0) {
+ throw new InvalidRequestException("Cannot set a negative expiration: "
+ + expiration.getMillis());
+ }
+ // Converts a relative duration into an absolute time based on the local
+ // clock
+ expiryTime = expiration.getAbsoluteMillis();
+ } else {
+ expiryTime = defaultValue;
+ }
+ return expiryTime;
+ }
+
+ /**
+ * Get a CacheDirective by ID, validating the ID and that the directive
* exists.
*/
private CacheDirective getById(long id) throws InvalidRequestException {
@@ -309,13 +338,13 @@ public final class CacheManager {
if (id <= 0) {
throw new InvalidRequestException("Invalid negative ID.");
}
- // Find the entry.
- CacheDirective entry = entriesById.get(id);
- if (entry == null) {
+ // Find the directive.
+ CacheDirective directive = directivesById.get(id);
+ if (directive == null) {
throw new InvalidRequestException("No directive with ID " + id
+ " found.");
}
- return entry;
+ return directive;
}
/**
@@ -332,122 +361,134 @@ public final class CacheManager {
// RPC handlers
- private void addInternal(CacheDirective entry) {
- entriesById.put(entry.getEntryId(), entry);
- String path = entry.getPath();
- List<CacheDirective> entryList = entriesByPath.get(path);
- if (entryList == null) {
- entryList = new ArrayList<CacheDirective>(1);
- entriesByPath.put(path, entryList);
+ private void addInternal(CacheDirective directive, CachePool pool) {
+ boolean addedDirective = pool.getDirectiveList().add(directive);
+ assert addedDirective;
+ directivesById.put(directive.getId(), directive);
+ String path = directive.getPath();
+ List<CacheDirective> directives = directivesByPath.get(path);
+ if (directives == null) {
+ directives = new ArrayList<CacheDirective>(1);
+ directivesByPath.put(path, directives);
}
- entryList.add(entry);
+ directives.add(directive);
+ }
+
+ /**
+ * To be called only from the edit log loading code
+ */
+ CacheDirectiveInfo addDirectiveFromEditLog(CacheDirectiveInfo directive)
+ throws InvalidRequestException {
+ long id = directive.getId();
+ CacheDirective entry =
+ new CacheDirective(
+ directive.getId(),
+ directive.getPath().toUri().getPath(),
+ directive.getReplication(),
+ directive.getExpiration().getAbsoluteMillis());
+ CachePool pool = cachePools.get(directive.getPool());
+ addInternal(entry, pool);
+ if (nextDirectiveId <= id) {
+ nextDirectiveId = id + 1;
+ }
+ return entry.toInfo();
}
public CacheDirectiveInfo addDirective(
- CacheDirectiveInfo directive, FSPermissionChecker pc)
+ CacheDirectiveInfo info, FSPermissionChecker pc)
throws IOException {
assert namesystem.hasWriteLock();
- CacheDirective entry;
+ CacheDirective directive;
try {
- CachePool pool = getCachePool(validatePoolName(directive));
+ CachePool pool = getCachePool(validatePoolName(info));
checkWritePermission(pc, pool);
- String path = validatePath(directive);
- short replication = validateReplication(directive, (short)1);
- long id;
- if (directive.getId() != null) {
- // We are loading an entry from the edit log.
- // Use the ID from the edit log.
- id = directive.getId();
- if (id <= 0) {
- throw new InvalidRequestException("can't add an ID " +
- "of " + id + ": it is not positive.");
- }
- if (id >= Long.MAX_VALUE) {
- throw new InvalidRequestException("can't add an ID " +
- "of " + id + ": it is too big.");
- }
- if (nextEntryId <= id) {
- nextEntryId = id + 1;
- }
- } else {
- // Add a new entry with the next available ID.
- id = getNextEntryId();
- }
- entry = new CacheDirective(id, path, replication, pool);
- addInternal(entry);
+ String path = validatePath(info);
+ short replication = validateReplication(info, (short)1);
+ long expiryTime = validateExpiryTime(info,
+ CacheDirectiveInfo.Expiration.EXPIRY_NEVER);
+ // All validation passed
+ // Add a new entry with the next available ID.
+ long id = getNextDirectiveId();
+ directive = new CacheDirective(id, path, replication, expiryTime);
+ addInternal(directive, pool);
} catch (IOException e) {
- LOG.warn("addDirective of " + directive + " failed: ", e);
+ LOG.warn("addDirective of " + info + " failed: ", e);
throw e;
}
- LOG.info("addDirective of " + directive + " successful.");
+ LOG.info("addDirective of " + info + " successful.");
if (monitor != null) {
monitor.kick();
}
- return entry.toDirective();
+ return directive.toInfo();
}
- public void modifyDirective(CacheDirectiveInfo directive,
+ public void modifyDirective(CacheDirectiveInfo info,
FSPermissionChecker pc) throws IOException {
assert namesystem.hasWriteLock();
String idString =
- (directive.getId() == null) ?
- "(null)" : directive.getId().toString();
+ (info.getId() == null) ?
+ "(null)" : info.getId().toString();
try {
// Check for invalid IDs.
- Long id = directive.getId();
+ Long id = info.getId();
if (id == null) {
throw new InvalidRequestException("Must supply an ID.");
}
CacheDirective prevEntry = getById(id);
checkWritePermission(pc, prevEntry.getPool());
String path = prevEntry.getPath();
- if (directive.getPath() != null) {
- path = validatePath(directive);
+ if (info.getPath() != null) {
+ path = validatePath(info);
}
+
short replication = prevEntry.getReplication();
- if (directive.getReplication() != null) {
- replication = validateReplication(directive, replication);
- }
+ replication = validateReplication(info, replication);
+
+ long expiryTime = prevEntry.getExpiryTime();
+ expiryTime = validateExpiryTime(info, expiryTime);
+
CachePool pool = prevEntry.getPool();
- if (directive.getPool() != null) {
- pool = getCachePool(validatePoolName(directive));
+ if (info.getPool() != null) {
+ pool = getCachePool(validatePoolName(info));
checkWritePermission(pc, pool);
}
removeInternal(prevEntry);
CacheDirective newEntry =
- new CacheDirective(id, path, replication, pool);
- addInternal(newEntry);
+ new CacheDirective(id, path, replication, expiryTime);
+ addInternal(newEntry, pool);
} catch (IOException e) {
LOG.warn("modifyDirective of " + idString + " failed: ", e);
throw e;
}
LOG.info("modifyDirective of " + idString + " successfully applied " +
- directive + ".");
+ info+ ".");
}
- public void removeInternal(CacheDirective existing)
+ public void removeInternal(CacheDirective directive)
throws InvalidRequestException {
assert namesystem.hasWriteLock();
- // Remove the corresponding entry in entriesByPath.
- String path = existing.getPath();
- List<CacheDirective> entries = entriesByPath.get(path);
- if (entries == null || !entries.remove(existing)) {
+ // Remove the corresponding entry in directivesByPath.
+ String path = directive.getPath();
+ List<CacheDirective> directives = directivesByPath.get(path);
+ if (directives == null || !directives.remove(directive)) {
throw new InvalidRequestException("Failed to locate entry " +
- existing.getEntryId() + " by path " + existing.getPath());
+ directive.getId() + " by path " + directive.getPath());
}
- if (entries.size() == 0) {
- entriesByPath.remove(path);
+ if (directives.size() == 0) {
+ directivesByPath.remove(path);
}
- entriesById.remove(existing.getEntryId());
+ directivesById.remove(directive.getId());
+ directive.getPool().getDirectiveList().remove(directive);
+ assert directive.getPool() == null;
}
public void removeDirective(long id, FSPermissionChecker pc)
throws IOException {
assert namesystem.hasWriteLock();
try {
- CacheDirective existing = getById(id);
- checkWritePermission(pc, existing.getPool());
- removeInternal(existing);
+ CacheDirective directive = getById(id);
+ checkWritePermission(pc, directive.getPool());
+ removeInternal(directive);
} catch (IOException e) {
LOG.warn("removeDirective of " + id + " failed: ", e);
throw e;
@@ -478,13 +519,13 @@ public final class CacheManager {
new ArrayList<CacheDirectiveEntry>(NUM_PRE_ALLOCATED_ENTRIES);
int numReplies = 0;
SortedMap<Long, CacheDirective> tailMap =
- entriesById.tailMap(prevId + 1);
+ directivesById.tailMap(prevId + 1);
for (Entry<Long, CacheDirective> cur : tailMap.entrySet()) {
if (numReplies >= maxListCacheDirectivesNumResponses) {
return new BatchedListEntries<CacheDirectiveEntry>(replies, true);
}
- CacheDirective curEntry = cur.getValue();
- CacheDirectiveInfo info = cur.getValue().toDirective();
+ CacheDirective curDirective = cur.getValue();
+ CacheDirectiveInfo info = cur.getValue().toInfo();
if (filter.getPool() != null &&
!info.getPool().equals(filter.getPool())) {
continue;
@@ -496,7 +537,7 @@ public final class CacheManager {
boolean hasPermission = true;
if (pc != null) {
try {
- pc.checkPermission(curEntry.getPool(), FsAction.READ);
+ pc.checkPermission(curDirective.getPool(), FsAction.READ);
} catch (AccessControlException e) {
hasPermission = false;
}
@@ -530,7 +571,7 @@ public final class CacheManager {
pool = CachePool.createFromInfoAndDefaults(info);
cachePools.put(pool.getPoolName(), pool);
LOG.info("Created new cache pool " + pool);
- return pool.getInfo(null);
+ return pool.getInfo(true);
}
/**
@@ -599,39 +640,34 @@ public final class CacheManager {
throw new InvalidRequestException(
"Cannot remove non-existent cache pool " + poolName);
}
-
- // Remove entries using this pool
- // TODO: could optimize this somewhat to avoid the need to iterate
- // over all entries in entriesById
- Iterator<Entry<Long, CacheDirective>> iter =
- entriesById.entrySet().iterator();
+ // Remove all directives in this pool.
+ Iterator<CacheDirective> iter = pool.getDirectiveList().iterator();
while (iter.hasNext()) {
- Entry<Long, CacheDirective> entry = iter.next();
- if (entry.getValue().getPool() == pool) {
- entriesByPath.remove(entry.getValue().getPath());
- iter.remove();
- }
+ CacheDirective directive = iter.next();
+ directivesByPath.remove(directive.getPath());
+ directivesById.remove(directive.getId());
+ iter.remove();
}
if (monitor != null) {
monitor.kick();
}
}
- public BatchedListEntries<CachePoolInfo>
+ public BatchedListEntries<CachePoolEntry>
listCachePools(FSPermissionChecker pc, String prevKey) {
assert namesystem.hasReadLock();
final int NUM_PRE_ALLOCATED_ENTRIES = 16;
- ArrayList<CachePoolInfo> results =
- new ArrayList<CachePoolInfo>(NUM_PRE_ALLOCATED_ENTRIES);
+ ArrayList<CachePoolEntry> results =
+ new ArrayList<CachePoolEntry>(NUM_PRE_ALLOCATED_ENTRIES);
SortedMap<String, CachePool> tailMap = cachePools.tailMap(prevKey, false);
int numListed = 0;
for (Entry<String, CachePool> cur : tailMap.entrySet()) {
if (numListed++ >= maxListCachePoolsResponses) {
- return new BatchedListEntries<CachePoolInfo>(results, true);
+ return new BatchedListEntries<CachePoolEntry>(results, true);
}
- results.add(cur.getValue().getInfo(pc));
+ results.add(cur.getValue().getEntry(pc));
}
- return new BatchedListEntries<CachePoolInfo>(results, false);
+ return new BatchedListEntries<CachePoolEntry>(results, false);
}
public void setCachedLocations(LocatedBlock block) {
@@ -693,13 +729,6 @@ public final class CacheManager {
for (Iterator<Long> iter = blockIds.iterator(); iter.hasNext(); ) {
Block block = new Block(iter.next());
BlockInfo blockInfo = blockManager.getStoredBlock(block);
- if (blockInfo.getGenerationStamp() < block.getGenerationStamp()) {
- // The NameNode will eventually remove or update the out-of-date block.
- // Until then, we pretend that it isn't cached.
- LOG.warn("Genstamp in cache report disagrees with our genstamp for " +
- block + ": expected genstamp " + blockInfo.getGenerationStamp());
- continue;
- }
if (!blockInfo.isComplete()) {
LOG.warn("Ignoring block id " + block.getBlockId() + ", because " +
"it is in not complete yet. It is in state " +
@@ -743,9 +772,9 @@ public final class CacheManager {
*/
public void saveState(DataOutput out, String sdPath)
throws IOException {
- out.writeLong(nextEntryId);
+ out.writeLong(nextDirectiveId);
savePools(out, sdPath);
- saveEntries(out, sdPath);
+ saveDirectives(out, sdPath);
}
/**
@@ -755,10 +784,10 @@ public final class CacheManager {
* @throws IOException
*/
public void loadState(DataInput in) throws IOException {
- nextEntryId = in.readLong();
- // pools need to be loaded first since entries point to their parent pool
+ nextDirectiveId = in.readLong();
+ // pools need to be loaded first since directives point to their parent pool
loadPools(in);
- loadEntries(in);
+ loadDirectives(in);
}
/**
@@ -773,7 +802,7 @@ public final class CacheManager {
Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
out.writeInt(cachePools.size());
for (CachePool pool: cachePools.values()) {
- pool.getInfo(null).writeTo(out);
+ pool.getInfo(true).writeTo(out);
counter.increment();
}
prog.endStep(Phase.SAVING_CHECKPOINT, step);
@@ -782,19 +811,20 @@ public final class CacheManager {
/*
* Save cache entries to fsimage
*/
- private void saveEntries(DataOutput out, String sdPath)
+ private void saveDirectives(DataOutput out, String sdPath)
throws IOException {
StartupProgress prog = NameNode.getStartupProgress();
Step step = new Step(StepType.CACHE_ENTRIES, sdPath);
prog.beginStep(Phase.SAVING_CHECKPOINT, step);
- prog.setTotal(Phase.SAVING_CHECKPOINT, step, entriesById.size());
+ prog.setTotal(Phase.SAVING_CHECKPOINT, step, directivesById.size());
Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
- out.writeInt(entriesById.size());
- for (CacheDirective entry: entriesById.values()) {
- out.writeLong(entry.getEntryId());
- Text.writeString(out, entry.getPath());
- out.writeShort(entry.getReplication());
- Text.writeString(out, entry.getPool().getPoolName());
+ out.writeInt(directivesById.size());
+ for (CacheDirective directive : directivesById.values()) {
+ out.writeLong(directive.getId());
+ Text.writeString(out, directive.getPath());
+ out.writeShort(directive.getReplication());
+ Text.writeString(out, directive.getPool().getPoolName());
+ out.writeLong(directive.getExpiryTime());
counter.increment();
}
prog.endStep(Phase.SAVING_CHECKPOINT, step);
@@ -819,38 +849,42 @@ public final class CacheManager {
}
/**
- * Load cache entries from the fsimage
+ * Load cache directives from the fsimage
*/
- private void loadEntries(DataInput in) throws IOException {
+ private void loadDirectives(DataInput in) throws IOException {
StartupProgress prog = NameNode.getStartupProgress();
Step step = new Step(StepType.CACHE_ENTRIES);
prog.beginStep(Phase.LOADING_FSIMAGE, step);
- int numberOfEntries = in.readInt();
- prog.setTotal(Phase.LOADING_FSIMAGE, step, numberOfEntries);
+ int numDirectives = in.readInt();
+ prog.setTotal(Phase.LOADING_FSIMAGE, step, numDirectives);
Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step);
- for (int i = 0; i < numberOfEntries; i++) {
- long entryId = in.readLong();
+ for (int i = 0; i < numDirectives; i++) {
+ long directiveId = in.readLong();
String path = Text.readString(in);
short replication = in.readShort();
String poolName = Text.readString(in);
+ long expiryTime = in.readLong();
// Get pool reference by looking it up in the map
CachePool pool = cachePools.get(poolName);
if (pool == null) {
- throw new IOException("Entry refers to pool " + poolName +
+ throw new IOException("Directive refers to pool " + poolName +
", which does not exist.");
}
- CacheDirective entry =
- new CacheDirective(entryId, path, replication, pool);
- if (entriesById.put(entry.getEntryId(), entry) != null) {
- throw new IOException("An entry with ID " + entry.getEntryId() +
+ CacheDirective directive =
+ new CacheDirective(directiveId, path, replication, expiryTime);
+ boolean addedDirective = pool.getDirectiveList().add(directive);
+ assert addedDirective;
+ if (directivesById.put(directive.getId(), directive) != null) {
+ throw new IOException("A directive with ID " + directive.getId() +
" already exists");
}
- List<CacheDirective> entries = entriesByPath.get(entry.getPath());
- if (entries == null) {
- entries = new LinkedList<CacheDirective>();
- entriesByPath.put(entry.getPath(), entries);
+ List<CacheDirective> directives =
+ directivesByPath.get(directive.getPath());
+ if (directives == null) {
+ directives = new LinkedList<CacheDirective>();
+ directivesByPath.put(directive.getPath(), directives);
}
- entries.add(entry);
+ directives.add(directive);
counter.increment();
}
prog.endStep(Phase.LOADING_FSIMAGE, step);
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java Mon Dec 2 17:41:44 2013
@@ -26,9 +26,13 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.CacheDirective;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.hdfs.protocol.CachePoolStats;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.IntrusiveCollection;
import com.google.common.base.Preconditions;
@@ -69,6 +73,22 @@ public final class CachePool {
private int weight;
+ public final static class DirectiveList
+ extends IntrusiveCollection<CacheDirective> {
+ private CachePool cachePool;
+
+ private DirectiveList(CachePool cachePool) {
+ this.cachePool = cachePool;
+ }
+
+ public CachePool getCachePool() {
+ return cachePool;
+ }
+ }
+
+ @Nonnull
+ private final DirectiveList directiveList = new DirectiveList(this);
+
/**
* Create a new cache pool based on a CachePoolInfo object and the defaults.
* We will fill in information that was not supplied according to the
@@ -171,7 +191,7 @@ public final class CachePool {
* @return
* Cache pool information.
*/
- private CachePoolInfo getInfo(boolean fullInfo) {
+ CachePoolInfo getInfo(boolean fullInfo) {
CachePoolInfo info = new CachePoolInfo(poolName);
if (!fullInfo) {
return info;
@@ -183,15 +203,28 @@ public final class CachePool {
}
/**
+ * Get statistics about this CachePool.
+ *
+ * @return Cache pool statistics.
+ */
+ private CachePoolStats getStats() {
+ return new CachePoolStats.Builder().
+ setBytesNeeded(0).
+ setBytesCached(0).
+ setFilesAffected(0).
+ build();
+ }
+
+ /**
* Returns a CachePoolInfo describing this CachePool based on the permissions
* of the calling user. Unprivileged users will see only minimal descriptive
* information about the pool.
*
* @param pc Permission checker to be used to validate the user's permissions,
* or null
- * @return CachePoolInfo describing this CachePool
+ * @return CachePoolEntry describing this CachePool
*/
- public CachePoolInfo getInfo(FSPermissionChecker pc) {
+ public CachePoolEntry getEntry(FSPermissionChecker pc) {
boolean hasPermission = true;
if (pc != null) {
try {
@@ -200,7 +233,8 @@ public final class CachePool {
hasPermission = false;
}
}
- return getInfo(hasPermission);
+ return new CachePoolEntry(getInfo(hasPermission),
+ hasPermission ? getStats() : new CachePoolStats.Builder().build());
}
public String toString() {
@@ -212,4 +246,8 @@ public final class CachePool {
append(", weight:").append(weight).
append(" }").toString();
}
+
+ public DirectiveList getDirectiveList() {
+ return directiveList;
+ }
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java Mon Dec 2 17:41:44 2013
@@ -36,8 +36,11 @@ import org.apache.hadoop.hdfs.DFSConfigK
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage.HttpGetFailedException;
+import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -100,15 +103,22 @@ public class EditLogFileInputStream exte
/**
* Open an EditLogInputStream for the given URL.
*
- * @param url the url hosting the log
- * @param startTxId the expected starting txid
- * @param endTxId the expected ending txid
- * @param inProgress whether the log is in-progress
+ * @param connectionFactory
+ * the URLConnectionFactory used to create the connection.
+ * @param url
+ * the url hosting the log
+ * @param startTxId
+ * the expected starting txid
+ * @param endTxId
+ * the expected ending txid
+ * @param inProgress
+ * whether the log is in-progress
* @return a stream from which edits may be read
*/
- public static EditLogInputStream fromUrl(URL url, long startTxId,
- long endTxId, boolean inProgress) {
- return new EditLogFileInputStream(new URLLog(url),
+ public static EditLogInputStream fromUrl(
+ URLConnectionFactory connectionFactory, URL url, long startTxId,
+ long endTxId, boolean inProgress) {
+ return new EditLogFileInputStream(new URLLog(connectionFactory, url),
startTxId, endTxId, inProgress);
}
@@ -365,8 +375,12 @@ public class EditLogFileInputStream exte
private long advertisedSize = -1;
private final static String CONTENT_LENGTH = "Content-Length";
+ private final URLConnectionFactory connectionFactory;
+ private final boolean isSpnegoEnabled;
- public URLLog(URL url) {
+ public URLLog(URLConnectionFactory connectionFactory, URL url) {
+ this.connectionFactory = connectionFactory;
+ this.isSpnegoEnabled = UserGroupInformation.isSecurityEnabled();
this.url = url;
}
@@ -376,8 +390,13 @@ public class EditLogFileInputStream exte
new PrivilegedExceptionAction<InputStream>() {
@Override
public InputStream run() throws IOException {
- HttpURLConnection connection = (HttpURLConnection)
- SecurityUtil.openSecureHttpConnection(url);
+ HttpURLConnection connection;
+ try {
+ connection = (HttpURLConnection)
+ connectionFactory.openConnection(url, isSpnegoEnabled);
+ } catch (AuthenticationException e) {
+ throw new IOException(e);
+ }
if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
throw new HttpGetFailedException(
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java Mon Dec 2 17:41:44 2013
@@ -24,7 +24,6 @@ import static org.apache.hadoop.util.Tim
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.jasper.compiler.JspUtil;
/**
* A generic abstract class to support journaling of edits logs into
@@ -141,10 +140,10 @@ public abstract class EditLogOutputStrea
}
/**
- * @return a short HTML snippet suitable for describing the current
+ * @return a short text snippet suitable for describing the current
* status of the stream
*/
- public String generateHtmlReport() {
- return JspUtil.escapeXml(this.toString());
+ public String generateReport() {
+ return toString();
}
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Mon Dec 2 17:41:44 2013
@@ -87,11 +87,15 @@ import com.google.common.base.Preconditi
*
*************************************************/
public class FSDirectory implements Closeable {
- private static INodeDirectoryWithQuota createRoot(FSNamesystem namesystem) {
- final INodeDirectoryWithQuota r = new INodeDirectoryWithQuota(
+ private static INodeDirectorySnapshottable createRoot(FSNamesystem namesystem) {
+ final INodeDirectory r = new INodeDirectory(
INodeId.ROOT_INODE_ID,
INodeDirectory.ROOT_NAME,
- namesystem.createFsOwnerPermissions(new FsPermission((short) 0755)));
+ namesystem.createFsOwnerPermissions(new FsPermission((short) 0755)),
+ 0L);
+ r.addDirectoryWithQuotaFeature(
+ DirectoryWithQuotaFeature.DEFAULT_NAMESPACE_QUOTA,
+ DirectoryWithQuotaFeature.DEFAULT_DISKSPACE_QUOTA);
final INodeDirectorySnapshottable s = new INodeDirectorySnapshottable(r);
s.setSnapshotQuota(0);
return s;
@@ -107,7 +111,7 @@ public class FSDirectory implements Clos
public final static String DOT_INODES_STRING = ".inodes";
public final static byte[] DOT_INODES =
DFSUtil.string2Bytes(DOT_INODES_STRING);
- INodeDirectoryWithQuota rootDir;
+ INodeDirectory rootDir;
FSImage fsImage;
private final FSNamesystem namesystem;
private volatile boolean ready = false;
@@ -202,7 +206,7 @@ public class FSDirectory implements Clos
}
/** @return the root directory inode. */
- public INodeDirectoryWithQuota getRoot() {
+ public INodeDirectory getRoot() {
return rootDir;
}
@@ -452,8 +456,8 @@ public class FSDirectory implements Clos
boolean unprotectedRemoveBlock(String path,
INodeFile fileNode, Block block) throws IOException {
- Preconditions.checkArgument(fileNode.isUnderConstruction());
// modify file-> block and blocksMap
+ // fileNode should be under construction
boolean removed = fileNode.removeLastBlock(block);
if (!removed) {
return false;
@@ -1800,9 +1804,8 @@ public class FSDirectory implements Clos
final INode[] inodes = inodesInPath.getINodes();
for(int i=0; i < numOfINodes; i++) {
if (inodes[i].isQuotaSet()) { // a directory with quota
- INodeDirectoryWithQuota node = (INodeDirectoryWithQuota) inodes[i]
- .asDirectory();
- node.addSpaceConsumed2Cache(nsDelta, dsDelta);
+ inodes[i].asDirectory().getDirectoryWithQuotaFeature()
+ .addSpaceConsumed2Cache(nsDelta, dsDelta);
}
}
}
@@ -2035,10 +2038,11 @@ public class FSDirectory implements Clos
// Stop checking for quota when common ancestor is reached
return;
}
- if (inodes[i].isQuotaSet()) { // a directory with quota
+ final DirectoryWithQuotaFeature q
+ = inodes[i].asDirectory().getDirectoryWithQuotaFeature();
+ if (q != null) { // a directory with quota
try {
- ((INodeDirectoryWithQuota) inodes[i].asDirectory()).verifyQuota(
- nsDelta, dsDelta);
+ q.verifyQuota(nsDelta, dsDelta);
} catch (QuotaExceededException e) {
e.setPathName(getFullPathName(inodes, i));
throw e;
@@ -2385,35 +2389,14 @@ public class FSDirectory implements Clos
if (dsQuota == HdfsConstants.QUOTA_DONT_SET) {
dsQuota = oldDsQuota;
}
+ if (oldNsQuota == nsQuota && oldDsQuota == dsQuota) {
+ return null;
+ }
final Snapshot latest = iip.getLatestSnapshot();
- if (dirNode instanceof INodeDirectoryWithQuota) {
- INodeDirectoryWithQuota quotaNode = (INodeDirectoryWithQuota) dirNode;
- Quota.Counts counts = null;
- if (!quotaNode.isQuotaSet()) {
- // dirNode must be an INodeDirectoryWithSnapshot whose quota has not
- // been set yet
- counts = quotaNode.computeQuotaUsage();
- }
- // a directory with quota; so set the quota to the new value
- quotaNode.setQuota(nsQuota, dsQuota);
- if (quotaNode.isQuotaSet() && counts != null) {
- quotaNode.setSpaceConsumed(counts.get(Quota.NAMESPACE),
- counts.get(Quota.DISKSPACE));
- } else if (!quotaNode.isQuotaSet() && latest == null) {
- // do not replace the node if the node is a snapshottable directory
- // without snapshots
- if (!(quotaNode instanceof INodeDirectoryWithSnapshot)) {
- // will not come here for root because root is snapshottable and
- // root's nsQuota is always set
- return quotaNode.replaceSelf4INodeDirectory(inodeMap);
- }
- }
- } else {
- // a non-quota directory; so replace it with a directory with quota
- return dirNode.replaceSelf4Quota(latest, nsQuota, dsQuota, inodeMap);
- }
- return (oldNsQuota != nsQuota || oldDsQuota != dsQuota) ? dirNode : null;
+ dirNode = dirNode.recordModification(latest, inodeMap);
+ dirNode.setQuota(nsQuota, dsQuota);
+ return dirNode;
}
}
@@ -2442,7 +2425,8 @@ public class FSDirectory implements Clos
long totalInodes() {
readLock();
try {
- return rootDir.numItemsInTree();
+ return rootDir.getDirectoryWithQuotaFeature().getSpaceConsumed()
+ .get(Quota.NAMESPACE);
} finally {
readUnlock();
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Mon Dec 2 17:41:44 2013
@@ -953,7 +953,11 @@ public class FSEditLog implements LogsPu
.setSnapshotRoot(path);
logEdit(op);
}
-
+
+ /**
+ * Log a CacheDirectiveInfo returned from
+ * {@link CacheManager#addDirective(CacheDirectiveInfo, FSPermissionChecker)}
+ */
void logAddCacheDirectiveInfo(CacheDirectiveInfo directive,
boolean toLogRpcIds) {
AddCacheDirectiveInfoOp op =
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Mon Dec 2 17:41:44 2013
@@ -636,17 +636,17 @@ public class FSEditLogLoader {
fsNamesys.setLastAllocatedBlockId(allocateBlockIdOp.blockId);
break;
}
- case OP_ADD_PATH_BASED_CACHE_DIRECTIVE: {
+ case OP_ADD_CACHE_DIRECTIVE: {
AddCacheDirectiveInfoOp addOp = (AddCacheDirectiveInfoOp) op;
CacheDirectiveInfo result = fsNamesys.
- getCacheManager().addDirective(addOp.directive, null);
+ getCacheManager().addDirectiveFromEditLog(addOp.directive);
if (toAddRetryCache) {
Long id = result.getId();
fsNamesys.addCacheEntryWithPayload(op.rpcClientId, op.rpcCallId, id);
}
break;
}
- case OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE: {
+ case OP_MODIFY_CACHE_DIRECTIVE: {
ModifyCacheDirectiveInfoOp modifyOp =
(ModifyCacheDirectiveInfoOp) op;
fsNamesys.getCacheManager().modifyDirective(
@@ -656,7 +656,7 @@ public class FSEditLogLoader {
}
break;
}
- case OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE: {
+ case OP_REMOVE_CACHE_DIRECTIVE: {
RemoveCacheDirectiveInfoOp removeOp =
(RemoveCacheDirectiveInfoOp) op;
fsNamesys.getCacheManager().removeDirective(removeOp.id, null);
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Mon Dec 2 17:41:44 2013
@@ -18,9 +18,8 @@
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_CACHE_DIRECTIVE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_CACHE_POOL;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_PATH_BASED_CACHE_DIRECTIVE;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ALLOCATE_BLOCK_ID;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ALLOW_SNAPSHOT;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CANCEL_DELEGATION_TOKEN;
@@ -35,10 +34,11 @@ import static org.apache.hadoop.hdfs.ser
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_GET_DELEGATION_TOKEN;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_INVALID;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MKDIR;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MODIFY_CACHE_DIRECTIVE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MODIFY_CACHE_POOL;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REASSIGN_LEASE;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_CACHE_DIRECTIVE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_CACHE_POOL;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_OLD;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_SNAPSHOT;
@@ -64,6 +64,7 @@ import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Date;
import java.util.EnumMap;
import java.util.List;
import java.util.zip.CheckedInputStream;
@@ -81,12 +82,12 @@ import org.apache.hadoop.fs.permission.P
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DeprecatedUTF8;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
-import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.util.XMLUtils;
import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException;
@@ -109,7 +110,6 @@ import org.xml.sax.helpers.AttributesImp
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
/**
* Helper classes for reading the ops from an InputStream.
@@ -165,11 +165,11 @@ public abstract class FSEditLogOp {
inst.put(OP_RENAME_SNAPSHOT, new RenameSnapshotOp());
inst.put(OP_SET_GENSTAMP_V2, new SetGenstampV2Op());
inst.put(OP_ALLOCATE_BLOCK_ID, new AllocateBlockIdOp());
- inst.put(OP_ADD_PATH_BASED_CACHE_DIRECTIVE,
+ inst.put(OP_ADD_CACHE_DIRECTIVE,
new AddCacheDirectiveInfoOp());
- inst.put(OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE,
+ inst.put(OP_MODIFY_CACHE_DIRECTIVE,
new ModifyCacheDirectiveInfoOp());
- inst.put(OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE,
+ inst.put(OP_REMOVE_CACHE_DIRECTIVE,
new RemoveCacheDirectiveInfoOp());
inst.put(OP_ADD_CACHE_POOL, new AddCachePoolOp());
inst.put(OP_MODIFY_CACHE_POOL, new ModifyCachePoolOp());
@@ -2874,12 +2874,12 @@ public abstract class FSEditLogOp {
CacheDirectiveInfo directive;
public AddCacheDirectiveInfoOp() {
- super(OP_ADD_PATH_BASED_CACHE_DIRECTIVE);
+ super(OP_ADD_CACHE_DIRECTIVE);
}
static AddCacheDirectiveInfoOp getInstance(OpInstanceCache cache) {
return (AddCacheDirectiveInfoOp) cache
- .get(OP_ADD_PATH_BASED_CACHE_DIRECTIVE);
+ .get(OP_ADD_CACHE_DIRECTIVE);
}
public AddCacheDirectiveInfoOp setDirective(
@@ -2889,6 +2889,7 @@ public abstract class FSEditLogOp {
assert(directive.getPath() != null);
assert(directive.getReplication() != null);
assert(directive.getPool() != null);
+ assert(directive.getExpiration() != null);
return this;
}
@@ -2898,11 +2899,13 @@ public abstract class FSEditLogOp {
String path = FSImageSerialization.readString(in);
short replication = FSImageSerialization.readShort(in);
String pool = FSImageSerialization.readString(in);
+ long expiryTime = FSImageSerialization.readLong(in);
directive = new CacheDirectiveInfo.Builder().
setId(id).
setPath(new Path(path)).
setReplication(replication).
setPool(pool).
+ setExpiration(CacheDirectiveInfo.Expiration.newAbsolute(expiryTime)).
build();
readRpcIds(in, logVersion);
}
@@ -2913,6 +2916,8 @@ public abstract class FSEditLogOp {
FSImageSerialization.writeString(directive.getPath().toUri().getPath(), out);
FSImageSerialization.writeShort(directive.getReplication(), out);
FSImageSerialization.writeString(directive.getPool(), out);
+ FSImageSerialization.writeLong(
+ directive.getExpiration().getMillis(), out);
writeRpcIds(rpcClientId, rpcCallId, out);
}
@@ -2925,6 +2930,8 @@ public abstract class FSEditLogOp {
XMLUtils.addSaxString(contentHandler, "REPLICATION",
Short.toString(directive.getReplication()));
XMLUtils.addSaxString(contentHandler, "POOL", directive.getPool());
+ XMLUtils.addSaxString(contentHandler, "EXPIRATION",
+ "" + directive.getExpiration().getMillis());
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@@ -2935,6 +2942,8 @@ public abstract class FSEditLogOp {
setPath(new Path(st.getValue("PATH"))).
setReplication(Short.parseShort(st.getValue("REPLICATION"))).
setPool(st.getValue("POOL")).
+ setExpiration(CacheDirectiveInfo.Expiration.newAbsolute(
+ Long.parseLong(st.getValue("EXPIRATION")))).
build();
readRpcIdsFromXml(st);
}
@@ -2946,7 +2955,8 @@ public abstract class FSEditLogOp {
builder.append("id=" + directive.getId() + ",");
builder.append("path=" + directive.getPath().toUri().getPath() + ",");
builder.append("replication=" + directive.getReplication() + ",");
- builder.append("pool=" + directive.getPool());
+ builder.append("pool=" + directive.getPool() + ",");
+ builder.append("expiration=" + directive.getExpiration().getMillis());
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append("]");
return builder.toString();
@@ -2961,12 +2971,12 @@ public abstract class FSEditLogOp {
CacheDirectiveInfo directive;
public ModifyCacheDirectiveInfoOp() {
- super(OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE);
+ super(OP_MODIFY_CACHE_DIRECTIVE);
}
static ModifyCacheDirectiveInfoOp getInstance(OpInstanceCache cache) {
return (ModifyCacheDirectiveInfoOp) cache
- .get(OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE);
+ .get(OP_MODIFY_CACHE_DIRECTIVE);
}
public ModifyCacheDirectiveInfoOp setDirective(
@@ -2991,7 +3001,12 @@ public abstract class FSEditLogOp {
if ((flags & 0x4) != 0) {
builder.setPool(FSImageSerialization.readString(in));
}
- if ((flags & ~0x7) != 0) {
+ if ((flags & 0x8) != 0) {
+ builder.setExpiration(
+ CacheDirectiveInfo.Expiration.newAbsolute(
+ FSImageSerialization.readLong(in)));
+ }
+ if ((flags & ~0xF) != 0) {
throw new IOException("unknown flags set in " +
"ModifyCacheDirectiveInfoOp: " + flags);
}
@@ -3005,7 +3020,8 @@ public abstract class FSEditLogOp {
byte flags = (byte)(
((directive.getPath() != null) ? 0x1 : 0) |
((directive.getReplication() != null) ? 0x2 : 0) |
- ((directive.getPool() != null) ? 0x4 : 0)
+ ((directive.getPool() != null) ? 0x4 : 0) |
+ ((directive.getExpiration() != null) ? 0x8 : 0)
);
out.writeByte(flags);
if (directive.getPath() != null) {
@@ -3018,6 +3034,10 @@ public abstract class FSEditLogOp {
if (directive.getPool() != null) {
FSImageSerialization.writeString(directive.getPool(), out);
}
+ if (directive.getExpiration() != null) {
+ FSImageSerialization.writeLong(directive.getExpiration().getMillis(),
+ out);
+ }
writeRpcIds(rpcClientId, rpcCallId, out);
}
@@ -3036,6 +3056,10 @@ public abstract class FSEditLogOp {
if (directive.getPool() != null) {
XMLUtils.addSaxString(contentHandler, "POOL", directive.getPool());
}
+ if (directive.getExpiration() != null) {
+ XMLUtils.addSaxString(contentHandler, "EXPIRATION",
+ "" + directive.getExpiration().getMillis());
+ }
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@@ -3056,6 +3080,11 @@ public abstract class FSEditLogOp {
if (pool != null) {
builder.setPool(pool);
}
+ String expiryTime = st.getValueOrNull("EXPIRATION");
+ if (expiryTime != null) {
+ builder.setExpiration(CacheDirectiveInfo.Expiration.newAbsolute(
+ Long.parseLong(expiryTime)));
+ }
this.directive = builder.build();
readRpcIdsFromXml(st);
}
@@ -3075,6 +3104,10 @@ public abstract class FSEditLogOp {
if (directive.getPool() != null) {
builder.append(",").append("pool=").append(directive.getPool());
}
+ if (directive.getExpiration() != null) {
+ builder.append(",").append("expiration=").
+ append(directive.getExpiration().getMillis());
+ }
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append("]");
return builder.toString();
@@ -3089,12 +3122,12 @@ public abstract class FSEditLogOp {
long id;
public RemoveCacheDirectiveInfoOp() {
- super(OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE);
+ super(OP_REMOVE_CACHE_DIRECTIVE);
}
static RemoveCacheDirectiveInfoOp getInstance(OpInstanceCache cache) {
return (RemoveCacheDirectiveInfoOp) cache
- .get(OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE);
+ .get(OP_REMOVE_CACHE_DIRECTIVE);
}
public RemoveCacheDirectiveInfoOp setId(long id) {
@@ -3162,7 +3195,7 @@ public abstract class FSEditLogOp {
@Override
public void writeFields(DataOutputStream out) throws IOException {
- info .writeTo(out);
+ info.writeTo(out);
writeRpcIds(rpcClientId, rpcCallId, out);
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java Mon Dec 2 17:41:44 2013
@@ -64,12 +64,12 @@ public enum FSEditLogOpCodes {
OP_DISALLOW_SNAPSHOT ((byte) 30),
OP_SET_GENSTAMP_V2 ((byte) 31),
OP_ALLOCATE_BLOCK_ID ((byte) 32),
- OP_ADD_PATH_BASED_CACHE_DIRECTIVE ((byte) 33),
- OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE ((byte) 34),
+ OP_ADD_CACHE_DIRECTIVE ((byte) 33),
+ OP_REMOVE_CACHE_DIRECTIVE ((byte) 34),
OP_ADD_CACHE_POOL ((byte) 35),
OP_MODIFY_CACHE_POOL ((byte) 36),
OP_REMOVE_CACHE_POOL ((byte) 37),
- OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE ((byte) 38);
+ OP_MODIFY_CACHE_DIRECTIVE ((byte) 38);
private byte opCode;
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Mon Dec 2 17:41:44 2013
@@ -755,7 +755,7 @@ public class FSImage implements Closeabl
* This is an update of existing state of the filesystem and does not
* throw QuotaExceededException.
*/
- static void updateCountForQuota(INodeDirectoryWithQuota root) {
+ static void updateCountForQuota(INodeDirectory root) {
updateCountForQuotaRecursively(root, Quota.Counts.newInstance());
}
@@ -795,7 +795,7 @@ public class FSImage implements Closeabl
+ " quota = " + dsQuota + " < consumed = " + diskspace);
}
- ((INodeDirectoryWithQuota)dir).setSpaceConsumed(namespace, diskspace);
+ dir.getDirectoryWithQuotaFeature().setSpaceConsumed(namespace, diskspace);
}
}