You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by cm...@apache.org on 2013/11/27 18:55:53 UTC
svn commit: r1546143 [1/2] - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ dev-support/
src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/client/
src/main/java/org/apache/hadoop/hdfs/protocol/ src/main/java/org/ap...
Author: cmccabe
Date: Wed Nov 27 17:55:52 2013
New Revision: 1546143
URL: http://svn.apache.org/r1546143
Log:
HDFS-5556. Add some more NameNode cache statistics, cache pool stats (cmccabe)
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolEntry.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolStats.java
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirective.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveStats.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolInfo.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeMXBean.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1546143&r1=1546142&r2=1546143&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Nov 27 17:55:52 2013
@@ -218,6 +218,9 @@ Trunk (Unreleased)
HDFS-5286. Flatten INodeDirectory hierarchy: Replace INodeDirectoryWithQuota
with DirectoryWithQuotaFeature. (szetszwo)
+ HDFS-5556. Add some more NameNode cache statistics, cache pool stats
+ (cmccabe)
+
OPTIMIZATIONS
HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml?rev=1546143&r1=1546142&r2=1546143&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml Wed Nov 27 17:55:52 2013
@@ -352,6 +352,11 @@
<Method name="getReplication" />
<Bug pattern="ICAST_QUESTIONABLE_UNSIGNED_RIGHT_SHIFT" />
</Match>
+ <Match>
+ <Class name="org.apache.hadoop.hdfs.protocol.CacheDirective" />
+ <Method name="insertInternal" />
+ <Bug pattern="BC_UNCONFIRMED_CAST" />
+ </Match>
<!-- These two are used for shutting down and kicking the CRMon, do not need strong sync -->
<Match>
<Class name="org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor" />
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1546143&r1=1546142&r2=1546143&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Wed Nov 27 17:55:52 2013
@@ -109,6 +109,7 @@ import org.apache.hadoop.hdfs.client.Cli
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
@@ -2358,7 +2359,7 @@ public class DFSClient implements java.i
}
}
- public RemoteIterator<CachePoolInfo> listCachePools() throws IOException {
+ public RemoteIterator<CachePoolEntry> listCachePools() throws IOException {
checkOpen();
try {
return namenode.listCachePools("");
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1546143&r1=1546142&r2=1546143&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Wed Nov 27 17:55:52 2013
@@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.client.Hdf
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
@@ -1713,12 +1714,12 @@ public class DistributedFileSystem exten
/**
* List all cache pools.
*
- * @return A remote iterator from which you can get CachePoolInfo objects.
+ * @return A remote iterator from which you can get CachePoolEntry objects.
* Requests will be made as needed.
* @throws IOException
* If there was an error listing cache pools.
*/
- public RemoteIterator<CachePoolInfo> listCachePools() throws IOException {
+ public RemoteIterator<CachePoolEntry> listCachePools() throws IOException {
return dfs.listCachePools();
}
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java?rev=1546143&r1=1546142&r2=1546143&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java Wed Nov 27 17:55:52 2013
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.RemoteIterat
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
@@ -213,12 +214,12 @@ public class HdfsAdmin {
/**
* List all cache pools.
*
- * @return A remote iterator from which you can get CachePoolInfo objects.
+ * @return A remote iterator from which you can get CachePoolEntry objects.
* Requests will be made as needed.
* @throws IOException
* If there was an error listing cache pools.
*/
- public RemoteIterator<CachePoolInfo> listCachePools() throws IOException {
+ public RemoteIterator<CachePoolEntry> listCachePools() throws IOException {
return dfs.listCachePools();
}
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirective.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirective.java?rev=1546143&r1=1546142&r2=1546143&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirective.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirective.java Wed Nov 27 17:55:52 2013
@@ -21,6 +21,8 @@ import org.apache.commons.lang.builder.H
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.namenode.CachePool;
+import org.apache.hadoop.util.IntrusiveCollection;
+import org.apache.hadoop.util.IntrusiveCollection.Element;
import com.google.common.base.Preconditions;
@@ -30,32 +32,32 @@ import com.google.common.base.Preconditi
* This is an implementation class, not part of the public API.
*/
@InterfaceAudience.Private
-public final class CacheDirective {
- private final long entryId;
+public final class CacheDirective implements IntrusiveCollection.Element {
+ private final long id;
private final String path;
private final short replication;
- private final CachePool pool;
+ private CachePool pool;
private long bytesNeeded;
private long bytesCached;
private long filesAffected;
+ private Element prev;
+ private Element next;
- public CacheDirective(long entryId, String path,
- short replication, CachePool pool) {
- Preconditions.checkArgument(entryId > 0);
- this.entryId = entryId;
+ public CacheDirective(long id, String path,
+ short replication) {
+ Preconditions.checkArgument(id > 0);
+ this.id = id;
Preconditions.checkArgument(replication > 0);
this.path = path;
- Preconditions.checkNotNull(pool);
this.replication = replication;
Preconditions.checkNotNull(path);
- this.pool = pool;
this.bytesNeeded = 0;
this.bytesCached = 0;
this.filesAffected = 0;
}
- public long getEntryId() {
- return entryId;
+ public long getId() {
+ return id;
}
public String getPath() {
@@ -70,9 +72,9 @@ public final class CacheDirective {
return replication;
}
- public CacheDirectiveInfo toDirective() {
+ public CacheDirectiveInfo toInfo() {
return new CacheDirectiveInfo.Builder().
- setId(entryId).
+ setId(id).
setPath(new Path(path)).
setReplication(replication).
setPool(pool.getPoolName()).
@@ -88,13 +90,13 @@ public final class CacheDirective {
}
public CacheDirectiveEntry toEntry() {
- return new CacheDirectiveEntry(toDirective(), toStats());
+ return new CacheDirectiveEntry(toInfo(), toStats());
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
- builder.append("{ entryId:").append(entryId).
+ builder.append("{ id:").append(id).
append(", path:").append(path).
append(", replication:").append(replication).
append(", pool:").append(pool).
@@ -113,12 +115,12 @@ public final class CacheDirective {
return false;
}
CacheDirective other = (CacheDirective)o;
- return entryId == other.entryId;
+ return id == other.id;
}
@Override
public int hashCode() {
- return new HashCodeBuilder().append(entryId).toHashCode();
+ return new HashCodeBuilder().append(id).toHashCode();
}
public long getBytesNeeded() {
@@ -156,4 +158,55 @@ public final class CacheDirective {
public void incrementFilesAffected() {
this.filesAffected++;
}
+
+ @SuppressWarnings("unchecked")
+ @Override // IntrusiveCollection.Element
+ public void insertInternal(IntrusiveCollection<? extends Element> list,
+ Element prev, Element next) {
+ assert this.pool == null;
+ this.pool = ((CachePool.DirectiveList)list).getCachePool();
+ this.prev = prev;
+ this.next = next;
+ }
+
+ @Override // IntrusiveCollection.Element
+ public void setPrev(IntrusiveCollection<? extends Element> list, Element prev) {
+ assert list == pool.getDirectiveList();
+ this.prev = prev;
+ }
+
+ @Override // IntrusiveCollection.Element
+ public void setNext(IntrusiveCollection<? extends Element> list, Element next) {
+ assert list == pool.getDirectiveList();
+ this.next = next;
+ }
+
+ @Override // IntrusiveCollection.Element
+ public void removeInternal(IntrusiveCollection<? extends Element> list) {
+ assert list == pool.getDirectiveList();
+ this.pool = null;
+ this.prev = null;
+ this.next = null;
+ }
+
+ @Override // IntrusiveCollection.Element
+ public Element getPrev(IntrusiveCollection<? extends Element> list) {
+ if (list != pool.getDirectiveList()) {
+ return null;
+ }
+ return this.prev;
+ }
+
+ @Override // IntrusiveCollection.Element
+ public Element getNext(IntrusiveCollection<? extends Element> list) {
+ if (list != pool.getDirectiveList()) {
+ return null;
+ }
+ return this.next;
+ }
+
+ @Override // IntrusiveCollection.Element
+ public boolean isInList(IntrusiveCollection<? extends Element> list) {
+ return pool == null ? false : list == pool.getDirectiveList();
+ }
};
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveStats.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveStats.java?rev=1546143&r1=1546142&r2=1546143&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveStats.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveStats.java Wed Nov 27 17:55:52 2013
@@ -94,21 +94,21 @@ public class CacheDirectiveStats {
/**
* @return The bytes needed.
*/
- public Long getBytesNeeded() {
+ public long getBytesNeeded() {
return bytesNeeded;
}
/**
* @return The bytes cached.
*/
- public Long getBytesCached() {
+ public long getBytesCached() {
return bytesCached;
}
/**
* @return The files affected.
*/
- public Long getFilesAffected() {
+ public long getFilesAffected() {
return filesAffected;
}
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolEntry.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolEntry.java?rev=1546143&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolEntry.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolEntry.java Wed Nov 27 17:55:52 2013
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Describes a Cache Pool entry.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class CachePoolEntry {
+ private final CachePoolInfo info;
+ private final CachePoolStats stats;
+
+ public CachePoolEntry(CachePoolInfo info, CachePoolStats stats) {
+ this.info = info;
+ this.stats = stats;
+ }
+
+ public CachePoolInfo getInfo() {
+ return info;
+ }
+
+ public CachePoolStats getStats() {
+ return stats;
+ }
+}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolInfo.java?rev=1546143&r1=1546142&r2=1546143&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolInfo.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolInfo.java Wed Nov 27 17:55:52 2013
@@ -30,6 +30,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.InvalidRequestException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
@@ -150,7 +151,10 @@ public class CachePoolInfo {
public static void validate(CachePoolInfo info) throws IOException {
if (info == null) {
- throw new IOException("CachePoolInfo is null");
+ throw new InvalidRequestException("CachePoolInfo is null");
+ }
+ if ((info.getWeight() != null) && (info.getWeight() < 0)) {
+ throw new InvalidRequestException("CachePool weight is negative.");
}
validateName(info.poolName);
}
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolStats.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolStats.java?rev=1546143&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolStats.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolStats.java Wed Nov 27 17:55:52 2013
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * CachePoolStats describes cache pool statistics.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class CachePoolStats {
+ public static class Builder {
+ private long bytesNeeded;
+ private long bytesCached;
+ private long filesAffected;
+
+ public Builder() {
+ }
+
+ public Builder setBytesNeeded(long bytesNeeded) {
+ this.bytesNeeded = bytesNeeded;
+ return this;
+ }
+
+ public Builder setBytesCached(long bytesCached) {
+ this.bytesCached = bytesCached;
+ return this;
+ }
+
+ public Builder setFilesAffected(long filesAffected) {
+ this.filesAffected = filesAffected;
+ return this;
+ }
+
+ public CachePoolStats build() {
+ return new CachePoolStats(bytesNeeded, bytesCached, filesAffected);
+ }
+ };
+
+ private final long bytesNeeded;
+ private final long bytesCached;
+ private final long filesAffected;
+
+ private CachePoolStats(long bytesNeeded, long bytesCached, long filesAffected) {
+ this.bytesNeeded = bytesNeeded;
+ this.bytesCached = bytesCached;
+ this.filesAffected = filesAffected;
+ }
+
+ public long getBytesNeeded() {
+ return bytesNeeded;
+ }
+
+ public long getBytesCached() {
+ return bytesCached; // report the cached total, not the bytesNeeded total
+ }
+
+ public long getFilesAffected() {
+ return filesAffected;
+ }
+
+ public String toString() {
+ return new StringBuilder().append("{").
+ append("bytesNeeded:").append(bytesNeeded).
+ append(", bytesCached:").append(bytesCached).
+ append(", filesAffected:").append(filesAffected).
+ append("}").toString();
+ }
+}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1546143&r1=1546142&r2=1546143&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Wed Nov 27 17:55:52 2013
@@ -1178,6 +1178,6 @@ public interface ClientProtocol {
* @return A RemoteIterator which returns CachePool objects.
*/
@Idempotent
- public RemoteIterator<CachePoolInfo> listCachePools(String prevPool)
+ public RemoteIterator<CachePoolEntry> listCachePools(String prevPool)
throws IOException;
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java?rev=1546143&r1=1546142&r2=1546143&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java Wed Nov 27 17:55:52 2013
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.RemoteIterat
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
@@ -51,6 +52,8 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AllowSnapshotResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolEntryProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ConcatRequestProto;
@@ -103,7 +106,6 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseElementProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksResponseProto;
@@ -1136,18 +1138,15 @@ public class ClientNamenodeProtocolServe
public ListCachePoolsResponseProto listCachePools(RpcController controller,
ListCachePoolsRequestProto request) throws ServiceException {
try {
- RemoteIterator<CachePoolInfo> iter =
+ RemoteIterator<CachePoolEntry> iter =
server.listCachePools(request.getPrevPoolName());
ListCachePoolsResponseProto.Builder responseBuilder =
ListCachePoolsResponseProto.newBuilder();
String prevPoolName = null;
while (iter.hasNext()) {
- CachePoolInfo pool = iter.next();
- ListCachePoolsResponseElementProto.Builder elemBuilder =
- ListCachePoolsResponseElementProto.newBuilder();
- elemBuilder.setInfo(PBHelper.convert(pool));
- responseBuilder.addElements(elemBuilder.build());
- prevPoolName = pool.getPoolName();
+ CachePoolEntry entry = iter.next();
+ responseBuilder.addEntries(PBHelper.convert(entry));
+ prevPoolName = entry.getInfo().getPoolName();
}
// fill in hasNext
if (prevPoolName == null) {
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java?rev=1546143&r1=1546142&r2=1546143&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java Wed Nov 27 17:55:52 2013
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.permission.F
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
@@ -61,6 +62,7 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AllowSnapshotRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolEntryProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ConcatRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateRequestProto;
@@ -96,7 +98,6 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSnapshottableDirListingResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseElementProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCachePoolsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCorruptFileBlocksRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto;
@@ -1138,23 +1139,23 @@ public class ClientNamenodeProtocolTrans
}
}
- private static class BatchedCachePoolInfo
- implements BatchedEntries<CachePoolInfo> {
+ private static class BatchedCachePoolEntries
+ implements BatchedEntries<CachePoolEntry> {
private final ListCachePoolsResponseProto proto;
- public BatchedCachePoolInfo(ListCachePoolsResponseProto proto) {
+ public BatchedCachePoolEntries(ListCachePoolsResponseProto proto) {
this.proto = proto;
}
@Override
- public CachePoolInfo get(int i) {
- ListCachePoolsResponseElementProto elem = proto.getElements(i);
- return PBHelper.convert(elem.getInfo());
+ public CachePoolEntry get(int i) {
+ CachePoolEntryProto elem = proto.getEntries(i);
+ return PBHelper.convert(elem);
}
@Override
public int size() {
- return proto.getElementsCount();
+ return proto.getEntriesCount();
}
@Override
@@ -1162,19 +1163,19 @@ public class ClientNamenodeProtocolTrans
return proto.getHasMore();
}
}
-
+
private class CachePoolIterator
- extends BatchedRemoteIterator<String, CachePoolInfo> {
+ extends BatchedRemoteIterator<String, CachePoolEntry> {
public CachePoolIterator(String prevKey) {
super(prevKey);
}
@Override
- public BatchedEntries<CachePoolInfo> makeRequest(String prevKey)
+ public BatchedEntries<CachePoolEntry> makeRequest(String prevKey)
throws IOException {
try {
- return new BatchedCachePoolInfo(
+ return new BatchedCachePoolEntries(
rpcProxy.listCachePools(null,
ListCachePoolsRequestProto.newBuilder().
setPrevPoolName(prevKey).build()));
@@ -1184,13 +1185,13 @@ public class ClientNamenodeProtocolTrans
}
@Override
- public String elementToPrevKey(CachePoolInfo element) {
- return element.getPoolName();
+ public String elementToPrevKey(CachePoolEntry entry) {
+ return entry.getInfo().getPoolName();
}
}
@Override
- public RemoteIterator<CachePoolInfo> listCachePools(String prevKey)
+ public RemoteIterator<CachePoolEntry> listCachePools(String prevKey)
throws IOException {
return new CachePoolIterator(prevKey);
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1546143&r1=1546142&r2=1546143&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Wed Nov 27 17:55:52 2013
@@ -38,7 +38,9 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.hdfs.protocol.CachePoolStats;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -60,7 +62,9 @@ import org.apache.hadoop.hdfs.protocol.S
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveEntryProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveStatsProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolEntryProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolStatsProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateFlagProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DatanodeReportTypeProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatsResponseProto;
@@ -1678,6 +1682,35 @@ public class PBHelper {
return info;
}
+ public static CachePoolStatsProto convert(CachePoolStats stats) {
+ CachePoolStatsProto.Builder builder = CachePoolStatsProto.newBuilder();
+ builder.setBytesNeeded(stats.getBytesNeeded());
+ builder.setBytesCached(stats.getBytesCached());
+ builder.setFilesAffected(stats.getFilesAffected());
+ return builder.build();
+ }
+
+ public static CachePoolStats convert (CachePoolStatsProto proto) {
+ CachePoolStats.Builder builder = new CachePoolStats.Builder();
+ builder.setBytesNeeded(proto.getBytesNeeded());
+ builder.setBytesCached(proto.getBytesCached());
+ builder.setFilesAffected(proto.getFilesAffected());
+ return builder.build();
+ }
+
+ public static CachePoolEntryProto convert(CachePoolEntry entry) {
+ CachePoolEntryProto.Builder builder = CachePoolEntryProto.newBuilder();
+ builder.setInfo(PBHelper.convert(entry.getInfo()));
+ builder.setStats(PBHelper.convert(entry.getStats()));
+ return builder.build();
+ }
+
+ public static CachePoolEntry convert (CachePoolEntryProto proto) {
+ CachePoolInfo info = PBHelper.convert(proto.getInfo());
+ CachePoolStats stats = PBHelper.convert(proto.getStats());
+ return new CachePoolEntry(info, stats);
+ }
+
public static HdfsProtos.ChecksumTypeProto convert(DataChecksum.Type type) {
return HdfsProtos.ChecksumTypeProto.valueOf(type.id);
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java?rev=1546143&r1=1546142&r2=1546143&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java Wed Nov 27 17:55:52 2013
@@ -208,8 +208,6 @@ public class CacheReplicationMonitor ext
/**
* Scan all CacheDirectives. Use the information to figure out
* what cache replication factor each block should have.
- *
- * @param mark Whether the current scan is setting or clearing the mark
*/
private void rescanCacheDirectives() {
FSDirectory fsDir = namesystem.getFSDirectory();
@@ -301,7 +299,7 @@ public class CacheReplicationMonitor ext
pce.addBytesNeeded(neededTotal);
pce.addBytesCached(cachedTotal);
if (LOG.isTraceEnabled()) {
- LOG.debug("Directive " + pce.getEntryId() + " is caching " +
+ LOG.debug("Directive " + pce.getId() + " is caching " +
file.getFullPathName() + ": " + cachedTotal + "/" + neededTotal);
}
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java?rev=1546143&r1=1546142&r2=1546143&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java Wed Nov 27 17:55:52 2013
@@ -42,6 +42,12 @@ public interface DatanodeStatistics {
/** @return the percentage of the block pool used space over the total capacity. */
public float getPercentBlockPoolUsed();
+
+ /** @return the total cache capacity of all DataNodes */
+ public long getCacheCapacity();
+
+ /** @return the total cache used by all DataNodes */
+ public long getCacheUsed();
/** @return the xceiver count */
public int getXceiverCount();
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java?rev=1546143&r1=1546142&r2=1546143&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java Wed Nov 27 17:55:52 2013
@@ -148,6 +148,17 @@ class HeartbeatManager implements Datano
public synchronized int getXceiverCount() {
return stats.xceiverCount;
}
+
+ @Override
+ public synchronized long getCacheCapacity() {
+ return stats.cacheCapacity;
+ }
+
+ @Override
+ public synchronized long getCacheUsed() {
+ return stats.cacheUsed;
+ }
+
@Override
public synchronized long[] getStats() {
@@ -308,6 +319,8 @@ class HeartbeatManager implements Datano
private long capacityRemaining = 0L;
private long blockPoolUsed = 0L;
private int xceiverCount = 0;
+ private long cacheCapacity = 0L;
+ private long cacheUsed = 0L;
private int expiredHeartbeats = 0;
@@ -321,6 +334,8 @@ class HeartbeatManager implements Datano
} else {
capacityTotal += node.getDfsUsed();
}
+ cacheCapacity += node.getCacheCapacity();
+ cacheUsed += node.getCacheUsed();
}
private void subtract(final DatanodeDescriptor node) {
@@ -333,6 +348,8 @@ class HeartbeatManager implements Datano
} else {
capacityTotal -= node.getDfsUsed();
}
+ cacheCapacity -= node.getCacheCapacity();
+ cacheUsed -= node.getCacheUsed();
}
/** Increment expired heartbeat counter. */
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java?rev=1546143&r1=1546142&r2=1546143&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java Wed Nov 27 17:55:52 2013
@@ -145,6 +145,8 @@ public class FsDatasetCache {
*/
private final HashMap<Key, Value> mappableBlockMap = new HashMap<Key, Value>();
+ private final AtomicLong numBlocksCached = new AtomicLong(0);
+
private final FsDatasetImpl dataset;
private final ThreadPoolExecutor uncachingExecutor;
@@ -417,6 +419,7 @@ public class FsDatasetCache {
LOG.debug("Successfully cached block " + key.id + " in " + key.bpid +
". We are now caching " + newUsedBytes + " bytes in total.");
}
+ numBlocksCached.addAndGet(1);
success = true;
} finally {
if (!success) {
@@ -465,6 +468,7 @@ public class FsDatasetCache {
}
long newUsedBytes =
usedBytesCount.release(value.mappableBlock.getLength());
+ numBlocksCached.addAndGet(-1);
if (LOG.isDebugEnabled()) {
LOG.debug("Uncaching of block " + key.id + " in " + key.bpid +
" completed. usedBytes = " + newUsedBytes);
@@ -477,14 +481,14 @@ public class FsDatasetCache {
/**
* Get the approximate amount of cache space used.
*/
- public long getDnCacheUsed() {
+ public long getCacheUsed() {
return usedBytesCount.get();
}
/**
* Get the maximum amount of bytes we can cache. This is a constant.
*/
- public long getDnCacheCapacity() {
+ public long getCacheCapacity() {
return maxBytes;
}
@@ -496,4 +500,7 @@ public class FsDatasetCache {
return numBlocksFailedToUncache.get();
}
+ public long getNumBlocksCached() {
+ return numBlocksCached.get();
+ }
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1546143&r1=1546142&r2=1546143&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Wed Nov 27 17:55:52 2013
@@ -292,12 +292,12 @@ class FsDatasetImpl implements FsDataset
@Override // FSDatasetMBean
public long getCacheUsed() {
- return cacheManager.getDnCacheUsed();
+ return cacheManager.getCacheUsed();
}
@Override // FSDatasetMBean
public long getCacheCapacity() {
- return cacheManager.getDnCacheCapacity();
+ return cacheManager.getCacheCapacity();
}
@Override // FSDatasetMBean
@@ -310,6 +310,11 @@ class FsDatasetImpl implements FsDataset
return cacheManager.getNumBlocksFailedToUncache();
}
+ @Override // FSDatasetMBean
+ public long getNumBlocksCached() {
+ return cacheManager.getNumBlocksCached();
+ }
+
/**
* Find the block's on-disk length
*/
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java?rev=1546143&r1=1546142&r2=1546143&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java Wed Nov 27 17:55:52 2013
@@ -89,6 +89,11 @@ public interface FSDatasetMBean {
public long getCacheCapacity();
/**
+ * Returns the number of blocks cached.
+ */
+ public long getNumBlocksCached();
+
+ /**
* Returns the number of blocks that the datanode was unable to cache
*/
public long getNumBlocksFailedToCache();
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java?rev=1546143&r1=1546142&r2=1546143&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java Wed Nov 27 17:55:52 2013
@@ -49,6 +49,7 @@ import org.apache.hadoop.fs.permission.F
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -99,24 +100,24 @@ public final class CacheManager {
private final BlockManager blockManager;
/**
- * Cache entries, sorted by ID.
+ * Cache directives, sorted by ID.
*
* listCacheDirectives relies on the ordering of elements in this map
* to track what has already been listed by the client.
*/
- private final TreeMap<Long, CacheDirective> entriesById =
+ private final TreeMap<Long, CacheDirective> directivesById =
new TreeMap<Long, CacheDirective>();
/**
- * The entry ID to use for a new entry. Entry IDs always increase, and are
+ * The directive ID to use for a new directive. IDs always increase, and are
* never reused.
*/
- private long nextEntryId;
+ private long nextDirectiveId;
/**
- * Cache entries, sorted by path
+ * Cache directives, sorted by path
*/
- private final TreeMap<String, List<CacheDirective>> entriesByPath =
+ private final TreeMap<String, List<CacheDirective>> directivesByPath =
new TreeMap<String, List<CacheDirective>>();
/**
@@ -177,7 +178,7 @@ public final class CacheManager {
BlockManager blockManager) {
this.namesystem = namesystem;
this.blockManager = blockManager;
- this.nextEntryId = 1;
+ this.nextDirectiveId = 1;
this.maxListCachePoolsResponses = conf.getInt(
DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES,
DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT);
@@ -239,7 +240,7 @@ public final class CacheManager {
public TreeMap<Long, CacheDirective> getEntriesById() {
assert namesystem.hasReadLock();
- return entriesById;
+ return directivesById;
}
@VisibleForTesting
@@ -250,10 +251,10 @@ public final class CacheManager {
private long getNextEntryId() throws IOException {
assert namesystem.hasWriteLock();
- if (nextEntryId >= Long.MAX_VALUE - 1) {
+ if (nextDirectiveId >= Long.MAX_VALUE - 1) {
throw new IOException("No more available IDs.");
}
- return nextEntryId++;
+ return nextDirectiveId++;
}
// Helper getter / validation methods
@@ -301,7 +302,7 @@ public final class CacheManager {
}
/**
- * Get a CacheDirective by ID, validating the ID and that the entry
+ * Get a CacheDirective by ID, validating the ID and that the directive
* exists.
*/
private CacheDirective getById(long id) throws InvalidRequestException {
@@ -309,13 +310,13 @@ public final class CacheManager {
if (id <= 0) {
throw new InvalidRequestException("Invalid negative ID.");
}
- // Find the entry.
- CacheDirective entry = entriesById.get(id);
- if (entry == null) {
+ // Find the directive.
+ CacheDirective directive = directivesById.get(id);
+ if (directive == null) {
throw new InvalidRequestException("No directive with ID " + id
+ " found.");
}
- return entry;
+ return directive;
}
/**
@@ -332,32 +333,34 @@ public final class CacheManager {
// RPC handlers
- private void addInternal(CacheDirective entry) {
- entriesById.put(entry.getEntryId(), entry);
- String path = entry.getPath();
- List<CacheDirective> entryList = entriesByPath.get(path);
- if (entryList == null) {
- entryList = new ArrayList<CacheDirective>(1);
- entriesByPath.put(path, entryList);
+ private void addInternal(CacheDirective directive, CachePool pool) {
+ boolean addedDirective = pool.getDirectiveList().add(directive);
+ assert addedDirective;
+ directivesById.put(directive.getId(), directive);
+ String path = directive.getPath();
+ List<CacheDirective> directives = directivesByPath.get(path);
+ if (directives == null) {
+ directives = new ArrayList<CacheDirective>(1);
+ directivesByPath.put(path, directives);
}
- entryList.add(entry);
+ directives.add(directive);
}
public CacheDirectiveInfo addDirective(
- CacheDirectiveInfo directive, FSPermissionChecker pc)
+ CacheDirectiveInfo info, FSPermissionChecker pc)
throws IOException {
assert namesystem.hasWriteLock();
- CacheDirective entry;
+ CacheDirective directive;
try {
- CachePool pool = getCachePool(validatePoolName(directive));
+ CachePool pool = getCachePool(validatePoolName(info));
checkWritePermission(pc, pool);
- String path = validatePath(directive);
- short replication = validateReplication(directive, (short)1);
+ String path = validatePath(info);
+ short replication = validateReplication(info, (short)1);
long id;
- if (directive.getId() != null) {
- // We are loading an entry from the edit log.
+ if (info.getId() != null) {
+ // We are loading a directive from the edit log.
// Use the ID from the edit log.
- id = directive.getId();
+ id = info.getId();
if (id <= 0) {
throw new InvalidRequestException("can't add an ID " +
"of " + id + ": it is not positive.");
@@ -366,88 +369,90 @@ public final class CacheManager {
throw new InvalidRequestException("can't add an ID " +
"of " + id + ": it is too big.");
}
- if (nextEntryId <= id) {
- nextEntryId = id + 1;
+ if (nextDirectiveId <= id) {
+ nextDirectiveId = id + 1;
}
} else {
- // Add a new entry with the next available ID.
+ // Add a new directive with the next available ID.
id = getNextEntryId();
}
- entry = new CacheDirective(id, path, replication, pool);
- addInternal(entry);
+ directive = new CacheDirective(id, path, replication);
+ addInternal(directive, pool);
} catch (IOException e) {
- LOG.warn("addDirective of " + directive + " failed: ", e);
+ LOG.warn("addDirective of " + info + " failed: ", e);
throw e;
}
- LOG.info("addDirective of " + directive + " successful.");
+ LOG.info("addDirective of " + info + " successful.");
if (monitor != null) {
monitor.kick();
}
- return entry.toDirective();
+ return directive.toInfo();
}
- public void modifyDirective(CacheDirectiveInfo directive,
+ public void modifyDirective(CacheDirectiveInfo info,
FSPermissionChecker pc) throws IOException {
assert namesystem.hasWriteLock();
String idString =
- (directive.getId() == null) ?
- "(null)" : directive.getId().toString();
+ (info.getId() == null) ?
+ "(null)" : info.getId().toString();
try {
// Check for invalid IDs.
- Long id = directive.getId();
+ Long id = info.getId();
if (id == null) {
throw new InvalidRequestException("Must supply an ID.");
}
CacheDirective prevEntry = getById(id);
checkWritePermission(pc, prevEntry.getPool());
String path = prevEntry.getPath();
- if (directive.getPath() != null) {
- path = validatePath(directive);
+ if (info.getPath() != null) {
+ path = validatePath(info);
}
short replication = prevEntry.getReplication();
- if (directive.getReplication() != null) {
- replication = validateReplication(directive, replication);
+ if (info.getReplication() != null) {
+ replication = validateReplication(info, replication);
}
CachePool pool = prevEntry.getPool();
- if (directive.getPool() != null) {
- pool = getCachePool(validatePoolName(directive));
+ if (info.getPool() != null) {
+ pool = getCachePool(validatePoolName(info));
checkWritePermission(pc, pool);
}
removeInternal(prevEntry);
CacheDirective newEntry =
- new CacheDirective(id, path, replication, pool);
- addInternal(newEntry);
+ new CacheDirective(id, path, replication);
+ addInternal(newEntry, pool);
} catch (IOException e) {
LOG.warn("modifyDirective of " + idString + " failed: ", e);
throw e;
}
LOG.info("modifyDirective of " + idString + " successfully applied " +
- directive + ".");
+ info+ ".");
}
- public void removeInternal(CacheDirective existing)
+ public void removeInternal(CacheDirective directive)
throws InvalidRequestException {
assert namesystem.hasWriteLock();
- // Remove the corresponding entry in entriesByPath.
- String path = existing.getPath();
- List<CacheDirective> entries = entriesByPath.get(path);
- if (entries == null || !entries.remove(existing)) {
+ // Remove the corresponding entry in directivesByPath.
+ String path = directive.getPath();
+ List<CacheDirective> directives = directivesByPath.get(path);
+ if (directives == null || !directives.remove(directive)) {
throw new InvalidRequestException("Failed to locate entry " +
- existing.getEntryId() + " by path " + existing.getPath());
+ directive.getId() + " by path " + directive.getPath());
}
- if (entries.size() == 0) {
- entriesByPath.remove(path);
+ if (directives.size() == 0) {
+ directivesByPath.remove(path);
}
- entriesById.remove(existing.getEntryId());
+ directivesById.remove(directive.getId());
+ directive.getPool().getDirectiveList().remove(directive);
+ assert directive.getPool() == null;
}
public void removeDirective(long id, FSPermissionChecker pc)
throws IOException {
assert namesystem.hasWriteLock();
try {
- CacheDirective existing = getById(id);
- checkWritePermission(pc, existing.getPool());
- removeInternal(existing);
+ CacheDirective directive = getById(id);
+ checkWritePermission(pc, directive.getPool());
+ removeInternal(directive);
} catch (IOException e) {
LOG.warn("removeDirective of " + id + " failed: ", e);
throw e;
@@ -478,13 +483,13 @@ public final class CacheManager {
new ArrayList<CacheDirectiveEntry>(NUM_PRE_ALLOCATED_ENTRIES);
int numReplies = 0;
SortedMap<Long, CacheDirective> tailMap =
- entriesById.tailMap(prevId + 1);
+ directivesById.tailMap(prevId + 1);
for (Entry<Long, CacheDirective> cur : tailMap.entrySet()) {
if (numReplies >= maxListCacheDirectivesNumResponses) {
return new BatchedListEntries<CacheDirectiveEntry>(replies, true);
}
- CacheDirective curEntry = cur.getValue();
- CacheDirectiveInfo info = cur.getValue().toDirective();
+ CacheDirective curDirective = cur.getValue();
+ CacheDirectiveInfo info = cur.getValue().toInfo();
if (filter.getPool() != null &&
!info.getPool().equals(filter.getPool())) {
continue;
@@ -496,7 +501,7 @@ public final class CacheManager {
boolean hasPermission = true;
if (pc != null) {
try {
- pc.checkPermission(curEntry.getPool(), FsAction.READ);
+ pc.checkPermission(curDirective.getPool(), FsAction.READ);
} catch (AccessControlException e) {
hasPermission = false;
}
@@ -530,7 +535,7 @@ public final class CacheManager {
pool = CachePool.createFromInfoAndDefaults(info);
cachePools.put(pool.getPoolName(), pool);
LOG.info("Created new cache pool " + pool);
- return pool.getInfo(null);
+ return pool.getInfo(true);
}
/**
@@ -599,39 +604,34 @@ public final class CacheManager {
throw new InvalidRequestException(
"Cannot remove non-existent cache pool " + poolName);
}
-
- // Remove entries using this pool
- // TODO: could optimize this somewhat to avoid the need to iterate
- // over all entries in entriesById
- Iterator<Entry<Long, CacheDirective>> iter =
- entriesById.entrySet().iterator();
+ // Remove all directives in this pool.
+ Iterator<CacheDirective> iter = pool.getDirectiveList().iterator();
while (iter.hasNext()) {
- Entry<Long, CacheDirective> entry = iter.next();
- if (entry.getValue().getPool() == pool) {
- entriesByPath.remove(entry.getValue().getPath());
- iter.remove();
- }
+ CacheDirective directive = iter.next();
+ directivesByPath.remove(directive.getPath());
+ directivesById.remove(directive.getId());
+ iter.remove();
}
if (monitor != null) {
monitor.kick();
}
}
- public BatchedListEntries<CachePoolInfo>
+ public BatchedListEntries<CachePoolEntry>
listCachePools(FSPermissionChecker pc, String prevKey) {
assert namesystem.hasReadLock();
final int NUM_PRE_ALLOCATED_ENTRIES = 16;
- ArrayList<CachePoolInfo> results =
- new ArrayList<CachePoolInfo>(NUM_PRE_ALLOCATED_ENTRIES);
+ ArrayList<CachePoolEntry> results =
+ new ArrayList<CachePoolEntry>(NUM_PRE_ALLOCATED_ENTRIES);
SortedMap<String, CachePool> tailMap = cachePools.tailMap(prevKey, false);
int numListed = 0;
for (Entry<String, CachePool> cur : tailMap.entrySet()) {
if (numListed++ >= maxListCachePoolsResponses) {
- return new BatchedListEntries<CachePoolInfo>(results, true);
+ return new BatchedListEntries<CachePoolEntry>(results, true);
}
- results.add(cur.getValue().getInfo(pc));
+ results.add(cur.getValue().getEntry(pc));
}
- return new BatchedListEntries<CachePoolInfo>(results, false);
+ return new BatchedListEntries<CachePoolEntry>(results, false);
}
public void setCachedLocations(LocatedBlock block) {
@@ -693,13 +693,6 @@ public final class CacheManager {
for (Iterator<Long> iter = blockIds.iterator(); iter.hasNext(); ) {
Block block = new Block(iter.next());
BlockInfo blockInfo = blockManager.getStoredBlock(block);
- if (blockInfo.getGenerationStamp() < block.getGenerationStamp()) {
- // The NameNode will eventually remove or update the out-of-date block.
- // Until then, we pretend that it isn't cached.
- LOG.warn("Genstamp in cache report disagrees with our genstamp for " +
- block + ": expected genstamp " + blockInfo.getGenerationStamp());
- continue;
- }
if (!blockInfo.isComplete()) {
LOG.warn("Ignoring block id " + block.getBlockId() + ", because " +
"it is in not complete yet. It is in state " +
@@ -743,9 +736,9 @@ public final class CacheManager {
*/
public void saveState(DataOutput out, String sdPath)
throws IOException {
- out.writeLong(nextEntryId);
+ out.writeLong(nextDirectiveId);
savePools(out, sdPath);
- saveEntries(out, sdPath);
+ saveDirectives(out, sdPath);
}
/**
@@ -755,10 +748,10 @@ public final class CacheManager {
* @throws IOException
*/
public void loadState(DataInput in) throws IOException {
- nextEntryId = in.readLong();
- // pools need to be loaded first since entries point to their parent pool
+ nextDirectiveId = in.readLong();
+ // pools need to be loaded first since directives point to their parent pool
loadPools(in);
- loadEntries(in);
+ loadDirectives(in);
}
/**
@@ -773,7 +766,7 @@ public final class CacheManager {
Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
out.writeInt(cachePools.size());
for (CachePool pool: cachePools.values()) {
- pool.getInfo(null).writeTo(out);
+ pool.getInfo(true).writeTo(out);
counter.increment();
}
prog.endStep(Phase.SAVING_CHECKPOINT, step);
@@ -782,19 +775,19 @@ public final class CacheManager {
/*
* Save cache entries to fsimage
*/
- private void saveEntries(DataOutput out, String sdPath)
+ private void saveDirectives(DataOutput out, String sdPath)
throws IOException {
StartupProgress prog = NameNode.getStartupProgress();
Step step = new Step(StepType.CACHE_ENTRIES, sdPath);
prog.beginStep(Phase.SAVING_CHECKPOINT, step);
- prog.setTotal(Phase.SAVING_CHECKPOINT, step, entriesById.size());
+ prog.setTotal(Phase.SAVING_CHECKPOINT, step, directivesById.size());
Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
- out.writeInt(entriesById.size());
- for (CacheDirective entry: entriesById.values()) {
- out.writeLong(entry.getEntryId());
- Text.writeString(out, entry.getPath());
- out.writeShort(entry.getReplication());
- Text.writeString(out, entry.getPool().getPoolName());
+ out.writeInt(directivesById.size());
+ for (CacheDirective directive : directivesById.values()) {
+ out.writeLong(directive.getId());
+ Text.writeString(out, directive.getPath());
+ out.writeShort(directive.getReplication());
+ Text.writeString(out, directive.getPool().getPoolName());
counter.increment();
}
prog.endStep(Phase.SAVING_CHECKPOINT, step);
@@ -819,38 +812,41 @@ public final class CacheManager {
}
/**
- * Load cache entries from the fsimage
+ * Load cache directives from the fsimage
*/
- private void loadEntries(DataInput in) throws IOException {
+ private void loadDirectives(DataInput in) throws IOException {
StartupProgress prog = NameNode.getStartupProgress();
Step step = new Step(StepType.CACHE_ENTRIES);
prog.beginStep(Phase.LOADING_FSIMAGE, step);
- int numberOfEntries = in.readInt();
- prog.setTotal(Phase.LOADING_FSIMAGE, step, numberOfEntries);
+ int numDirectives = in.readInt();
+ prog.setTotal(Phase.LOADING_FSIMAGE, step, numDirectives);
Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step);
- for (int i = 0; i < numberOfEntries; i++) {
- long entryId = in.readLong();
+ for (int i = 0; i < numDirectives; i++) {
+ long directiveId = in.readLong();
String path = Text.readString(in);
short replication = in.readShort();
String poolName = Text.readString(in);
// Get pool reference by looking it up in the map
CachePool pool = cachePools.get(poolName);
if (pool == null) {
- throw new IOException("Entry refers to pool " + poolName +
+ throw new IOException("Directive refers to pool " + poolName +
", which does not exist.");
}
- CacheDirective entry =
- new CacheDirective(entryId, path, replication, pool);
- if (entriesById.put(entry.getEntryId(), entry) != null) {
- throw new IOException("An entry with ID " + entry.getEntryId() +
+ CacheDirective directive =
+ new CacheDirective(directiveId, path, replication);
+ boolean addedDirective = pool.getDirectiveList().add(directive);
+ assert addedDirective;
+ if (directivesById.put(directive.getId(), directive) != null) {
+ throw new IOException("A directive with ID " + directive.getId() +
" already exists");
}
- List<CacheDirective> entries = entriesByPath.get(entry.getPath());
- if (entries == null) {
- entries = new LinkedList<CacheDirective>();
- entriesByPath.put(entry.getPath(), entries);
+ List<CacheDirective> directives =
+ directivesByPath.get(directive.getPath());
+ if (directives == null) {
+ directives = new LinkedList<CacheDirective>();
+ directivesByPath.put(directive.getPath(), directives);
}
- entries.add(entry);
+ directives.add(directive);
counter.increment();
}
prog.endStep(Phase.LOADING_FSIMAGE, step);
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java?rev=1546143&r1=1546142&r2=1546143&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java Wed Nov 27 17:55:52 2013
@@ -26,9 +26,13 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.CacheDirective;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.hdfs.protocol.CachePoolStats;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.IntrusiveCollection;
import com.google.common.base.Preconditions;
@@ -69,6 +73,22 @@ public final class CachePool {
private int weight;
+ public final static class DirectiveList
+ extends IntrusiveCollection<CacheDirective> {
+ private CachePool cachePool;
+
+ private DirectiveList(CachePool cachePool) {
+ this.cachePool = cachePool;
+ }
+
+ public CachePool getCachePool() {
+ return cachePool;
+ }
+ }
+
+ @Nonnull
+ private final DirectiveList directiveList = new DirectiveList(this);
+
/**
* Create a new cache pool based on a CachePoolInfo object and the defaults.
* We will fill in information that was not supplied according to the
@@ -171,7 +191,7 @@ public final class CachePool {
* @return
* Cache pool information.
*/
- private CachePoolInfo getInfo(boolean fullInfo) {
+ CachePoolInfo getInfo(boolean fullInfo) {
CachePoolInfo info = new CachePoolInfo(poolName);
if (!fullInfo) {
return info;
@@ -183,15 +203,28 @@ public final class CachePool {
}
/**
+ * Get statistics about this CachePool.
+ *
+ * @return Cache pool statistics.
+ */
+ private CachePoolStats getStats() {
+ return new CachePoolStats.Builder().
+ setBytesNeeded(0).
+ setBytesCached(0).
+ setFilesAffected(0).
+ build();
+ }
+
+ /**
* Returns a CachePoolEntry describing this CachePool based on the permissions
* of the calling user. Unprivileged users will see only minimal descriptive
* information about the pool.
*
* @param pc Permission checker to be used to validate the user's permissions,
* or null
- * @return CachePoolInfo describing this CachePool
+ * @return CachePoolEntry describing this CachePool
*/
- public CachePoolInfo getInfo(FSPermissionChecker pc) {
+ public CachePoolEntry getEntry(FSPermissionChecker pc) {
boolean hasPermission = true;
if (pc != null) {
try {
@@ -200,7 +233,8 @@ public final class CachePool {
hasPermission = false;
}
}
- return getInfo(hasPermission);
+ return new CachePoolEntry(getInfo(hasPermission),
+ hasPermission ? getStats() : new CachePoolStats.Builder().build());
}
public String toString() {
@@ -212,4 +246,8 @@ public final class CachePool {
append(", weight:").append(weight).
append(" }").toString();
}
+
+ public DirectiveList getDirectiveList() {
+ return directiveList;
+ }
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1546143&r1=1546142&r2=1546143&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Nov 27 17:55:52 2013
@@ -164,6 +164,7 @@ import org.apache.hadoop.hdfs.protocol.H
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
@@ -6430,6 +6431,16 @@ public class FSNamesystem implements Nam
}
@Override // NameNodeMXBean
+ public long getCacheCapacity() {
+ return datanodeStatistics.getCacheCapacity();
+ }
+
+ @Override // NameNodeMXBean
+ public long getCacheUsed() {
+ return datanodeStatistics.getCacheUsed();
+ }
+
+ @Override // NameNodeMXBean
public long getTotalBlocks() {
return getBlocksTotal();
}
@@ -7285,11 +7296,11 @@ public class FSNamesystem implements Nam
getEditLog().logSync();
}
- public BatchedListEntries<CachePoolInfo> listCachePools(String prevKey)
+ public BatchedListEntries<CachePoolEntry> listCachePools(String prevKey)
throws IOException {
final FSPermissionChecker pc =
isPermissionEnabled ? getPermissionChecker() : null;
- BatchedListEntries<CachePoolInfo> results;
+ BatchedListEntries<CachePoolEntry> results;
checkOperation(OperationCategory.READ);
boolean success = false;
readLock();
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeMXBean.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeMXBean.java?rev=1546143&r1=1546142&r2=1546143&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeMXBean.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeMXBean.java Wed Nov 27 17:55:52 2013
@@ -101,6 +101,16 @@ public interface NameNodeMXBean {
* @return the percentage of the remaining space on the cluster
*/
public float getPercentRemaining();
+
+ /**
+ * Returns the amount of cache used by the datanode (in bytes).
+ */
+ public long getCacheUsed();
+
+ /**
+ * Returns the total cache capacity of the datanode (in bytes).
+ */
+ public long getCacheCapacity();
/**
* Get the total space used by the block pools of this namenode
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1546143&r1=1546142&r2=1546143&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Wed Nov 27 17:55:52 2013
@@ -64,6 +64,7 @@ import org.apache.hadoop.hdfs.protocol.B
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -1298,26 +1299,26 @@ class NameNodeRpcServer implements Namen
}
private class ServerSideCachePoolIterator
- extends BatchedRemoteIterator<String, CachePoolInfo> {
+ extends BatchedRemoteIterator<String, CachePoolEntry> {
public ServerSideCachePoolIterator(String prevKey) {
super(prevKey);
}
@Override
- public BatchedEntries<CachePoolInfo> makeRequest(String prevKey)
+ public BatchedEntries<CachePoolEntry> makeRequest(String prevKey)
throws IOException {
return namesystem.listCachePools(prevKey);
}
@Override
- public String elementToPrevKey(CachePoolInfo element) {
- return element.getPoolName();
+ public String elementToPrevKey(CachePoolEntry entry) {
+ return entry.getInfo().getPoolName();
}
}
@Override
- public RemoteIterator<CachePoolInfo> listCachePools(String prevKey)
+ public RemoteIterator<CachePoolEntry> listCachePools(String prevKey)
throws IOException {
return new ServerSideCachePoolIterator(prevKey);
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java?rev=1546143&r1=1546142&r2=1546143&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java Wed Nov 27 17:55:52 2013
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.permission.F
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.server.namenode.CachePool;
@@ -755,9 +756,10 @@ public class CacheAdmin extends Configur
build();
int numResults = 0;
try {
- RemoteIterator<CachePoolInfo> iter = dfs.listCachePools();
+ RemoteIterator<CachePoolEntry> iter = dfs.listCachePools();
while (iter.hasNext()) {
- CachePoolInfo info = iter.next();
+ CachePoolEntry entry = iter.next();
+ CachePoolInfo info = entry.getInfo();
String[] row = new String[5];
if (name == null || info.getPoolName().equals(name)) {
row[0] = info.getPoolName();
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto?rev=1546143&r1=1546142&r2=1546143&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto Wed Nov 27 17:55:52 2013
@@ -421,6 +421,12 @@ message CachePoolInfoProto {
optional int32 weight = 5;
}
+message CachePoolStatsProto {
+ required int64 bytesNeeded = 1;
+ required int64 bytesCached = 2;
+ required int64 filesAffected = 3;
+}
+
message AddCachePoolRequestProto {
required CachePoolInfoProto info = 1;
}
@@ -447,12 +453,13 @@ message ListCachePoolsRequestProto {
}
message ListCachePoolsResponseProto {
- repeated ListCachePoolsResponseElementProto elements = 1;
+ repeated CachePoolEntryProto entries = 1;
required bool hasMore = 2;
}
-message ListCachePoolsResponseElementProto {
+message CachePoolEntryProto {
required CachePoolInfoProto info = 1;
+ required CachePoolStatsProto stats = 2;
}
message GetFileLinkInfoRequestProto {
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1546143&r1=1546142&r2=1546143&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Wed Nov 27 17:55:52 2013
@@ -505,6 +505,11 @@ public class SimulatedFSDataset implemen
return 0l;
}
+ @Override // FSDatasetMBean
+ public long getNumBlocksCached() {
+ return 0l;
+ }
+
@Override
public long getNumBlocksFailedToCache() {
return 0l;
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java?rev=1546143&r1=1546142&r2=1546143&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java Wed Nov 27 17:55:52 2013
@@ -49,8 +49,8 @@ import org.apache.hadoop.hdfs.protocol.B
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache.PageRounder;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -72,6 +72,8 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
import com.google.common.base.Supplier;
@@ -95,6 +97,7 @@ public class TestFsDatasetCache {
static {
EditLogFileOutputStream.setShouldSkipFsyncForTesting(false);
+ LogManager.getLogger(FsDatasetCache.class).setLevel(Level.DEBUG);
}
@Before
@@ -201,17 +204,21 @@ public class TestFsDatasetCache {
/**
* Blocks until the cache usage and cached block count hit the expected values.
*/
- private long verifyExpectedCacheUsage(final long expected) throws Exception {
+ private long verifyExpectedCacheUsage(final long expectedCacheUsed,
+ final long expectedBlocks) throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
private int tries = 0;
@Override
public Boolean get() {
- long curDnCacheUsed = fsd.getCacheUsed();
- if (curDnCacheUsed != expected) {
+ long curCacheUsed = fsd.getCacheUsed();
+ long curBlocks = fsd.getNumBlocksCached();
+ if ((curCacheUsed != expectedCacheUsed) ||
+ (curBlocks != expectedBlocks)) {
if (tries++ > 10) {
- LOG.info("verifyExpectedCacheUsage: expected " +
- expected + ", got " + curDnCacheUsed + "; " +
+ LOG.info("verifyExpectedCacheUsage: have " +
+ curCacheUsed + "/" + expectedCacheUsed + " bytes cached; " +
+ curBlocks + "/" + expectedBlocks + " blocks cached. " +
"memlock limit = " +
NativeIO.POSIX.getCacheManipulator().getMemlockLimit() +
". Waiting...");
@@ -221,14 +228,15 @@ public class TestFsDatasetCache {
return true;
}
}, 100, 60000);
- return expected;
+ return expectedCacheUsed;
}
private void testCacheAndUncacheBlock() throws Exception {
LOG.info("beginning testCacheAndUncacheBlock");
final int NUM_BLOCKS = 5;
- verifyExpectedCacheUsage(0);
+ verifyExpectedCacheUsage(0, 0);
+ assertEquals(0, fsd.getNumBlocksCached());
// Write a test file
final Path testFile = new Path("/testCacheBlock");
@@ -255,7 +263,7 @@ public class TestFsDatasetCache {
// Cache each block in succession, checking each time
for (int i=0; i<NUM_BLOCKS; i++) {
setHeartbeatResponse(cacheBlock(locs[i]));
- current = verifyExpectedCacheUsage(current + blockSizes[i]);
+ current = verifyExpectedCacheUsage(current + blockSizes[i], i + 1);
dnMetrics = getMetrics(dn.getMetrics().name());
long cmds = MetricsAsserts.getLongCounter("BlocksCached", dnMetrics);
assertTrue("Expected more cache requests from the NN ("
@@ -267,7 +275,8 @@ public class TestFsDatasetCache {
// Uncache each block in succession, again checking each time
for (int i=0; i<NUM_BLOCKS; i++) {
setHeartbeatResponse(uncacheBlock(locs[i]));
- current = verifyExpectedCacheUsage(current - blockSizes[i]);
+ current = verifyExpectedCacheUsage(current - blockSizes[i],
+ NUM_BLOCKS - 1 - i);
dnMetrics = getMetrics(dn.getMetrics().name());
long cmds = MetricsAsserts.getLongCounter("BlocksUncached", dnMetrics);
assertTrue("Expected more uncache requests from the NN",
@@ -334,10 +343,11 @@ public class TestFsDatasetCache {
// Cache the first n-1 files
long total = 0;
- verifyExpectedCacheUsage(0);
+ verifyExpectedCacheUsage(0, 0);
for (int i=0; i<numFiles-1; i++) {
setHeartbeatResponse(cacheBlocks(fileLocs[i]));
- total = verifyExpectedCacheUsage(rounder.round(total + fileSizes[i]));
+ total = verifyExpectedCacheUsage(
+ rounder.round(total + fileSizes[i]), 4 * (i + 1));
}
// nth file should hit a capacity exception
@@ -363,7 +373,7 @@ public class TestFsDatasetCache {
for (int i=0; i<numFiles-1; i++) {
setHeartbeatResponse(uncacheBlocks(fileLocs[i]));
total -= rounder.round(fileSizes[i]);
- verifyExpectedCacheUsage(total);
+ verifyExpectedCacheUsage(total, 4 * (numFiles - 2 - i));
}
LOG.info("finishing testFilesExceedMaxLockedMemory");
}
@@ -373,7 +383,7 @@ public class TestFsDatasetCache {
LOG.info("beginning testUncachingBlocksBeforeCachingFinishes");
final int NUM_BLOCKS = 5;
- verifyExpectedCacheUsage(0);
+ verifyExpectedCacheUsage(0, 0);
// Write a test file
final Path testFile = new Path("/testCacheBlock");
@@ -409,7 +419,7 @@ public class TestFsDatasetCache {
// should increase, even though caching doesn't complete on any of them.
for (int i=0; i<NUM_BLOCKS; i++) {
setHeartbeatResponse(cacheBlock(locs[i]));
- current = verifyExpectedCacheUsage(current + blockSizes[i]);
+ current = verifyExpectedCacheUsage(current + blockSizes[i], i + 1);
}
setHeartbeatResponse(new DatanodeCommand[] {
@@ -417,7 +427,7 @@ public class TestFsDatasetCache {
});
// wait until all caching jobs are finished cancelling.
- current = verifyExpectedCacheUsage(0);
+ current = verifyExpectedCacheUsage(0, 0);
LOG.info("finishing testUncachingBlocksBeforeCachingFinishes");
}