Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2013/11/09 02:34:23 UTC
svn commit: r1540239 - in /hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/ src/main/java/org/apache/hadoop/hdfs/client/ src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/main/java/org/apache/hadoop/hdf...
Author: szetszwo
Date: Sat Nov 9 01:34:22 2013
New Revision: 1540239
URL: http://svn.apache.org/r1540239
Log:
merge of r1535792 through r1540238 from trunk.
Modified:
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/ (props changed)
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/ (props changed)
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileUnderConstructionWithSnapshot.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileWithSnapshot.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotBlocksMap.java
hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java
Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1539897-1540238
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1540239&r1=1540238&r2=1540239&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Sat Nov 9 01:34:22 2013
@@ -359,6 +359,8 @@ Trunk (Unreleased)
HDFS-5468. CacheAdmin help command does not recognize commands (Stephen
Chu via Colin Patrick McCabe)
+ HDFS-5394. Fix race conditions in DN caching and uncaching (cmccabe)
+
Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES
@@ -455,6 +457,9 @@ Release 2.3.0 - UNRELEASED
HDFS-5436. Move HsFtpFileSystem and HFtpFileSystem into org.apache.hdfs.web
(Haohui Mai via Arpit Agarwal)
+ HDFS-5371. Let client retry the same NN when
+ "dfs.client.test.drop.namenode.response.number" is enabled. (jing9)
+
OPTIMIZATIONS
HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn)
@@ -506,6 +511,11 @@ Release 2.3.0 - UNRELEASED
HDFS-5443. Delete 0-sized block when deleting an under-construction file that
is included in snapshot. (jing9)
+ HDFS-5476. Snapshot: clean the blocks/files/directories under a renamed
+ file/directory while deletion. (jing9)
+
+ HDFS-5325. Remove WebHdfsFileSystem#ConnRunner. (Haohui Mai via jing9)
+
Release 2.2.1 - UNRELEASED
INCOMPATIBLE CHANGES
Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1539897-1540238
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java?rev=1540239&r1=1540238&r2=1540239&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java Sat Nov 9 01:34:22 2013
@@ -22,6 +22,7 @@ import java.io.FileInputStream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.io.nativeio.NativeIO;
import java.io.IOException;
import java.lang.ref.WeakReference;
@@ -147,20 +148,9 @@ public class ClientMmap {
/**
* Unmap the memory region.
- *
- * There isn't any portable way to unmap a memory region in Java.
- * So we use the sun.nio method here.
- * Note that unmapping a memory region could cause crashes if code
- * continues to reference the unmapped code. However, if we don't
- * manually unmap the memory, we are dependent on the finalizer to
- * do it, and we have no idea when the finalizer will run.
*/
void unmap() {
assert(refCount.get() == 0);
- if (map instanceof sun.nio.ch.DirectBuffer) {
- final sun.misc.Cleaner cleaner =
- ((sun.nio.ch.DirectBuffer) map).cleaner();
- cleaner.clean();
- }
+ NativeIO.POSIX.munmap(map);
}
}
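
Note on the change above: ClientMmap.unmap() now delegates to NativeIO.POSIX.munmap instead of reaching into the JDK-internal sun.nio.ch.DirectBuffer/sun.misc.Cleaner classes. The public JDK API offers no supported way to release a mapping once FileChannel.map has created it, which is why an explicit JNI unmap hook is needed at all. A minimal, self-contained illustration of that gap (the file path is hypothetical and used only for this sketch):

import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;

public class MapOnly {
    public static void main(String[] args) throws Exception {
        try (RandomAccessFile raf = new RandomAccessFile("/tmp/example.dat", "r");
             FileChannel ch = raf.getChannel()) {
            MappedByteBuffer buf = ch.map(MapMode.READ_ONLY, 0, ch.size());
            // There is no public MappedByteBuffer.unmap(); the mapping lives
            // until GC, which is why ClientMmap now calls NativeIO.POSIX.munmap(buf).
            System.out.println("mapped " + buf.capacity() + " bytes");
        }
    }
}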
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1540239&r1=1540238&r2=1540239&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Sat Nov 9 01:34:22 2013
@@ -47,7 +47,6 @@ import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.*;
import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
import org.apache.hadoop.util.Daemon;
-import org.apache.hadoop.util.IntrusiveCollection;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java?rev=1540239&r1=1540238&r2=1540239&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java Sat Nov 9 01:34:22 2013
@@ -18,24 +18,35 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.Map.Entry;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.io.nativeio.NativeIO;
/**
* Manages caching for an FsDatasetImpl by using the mmap(2) and mlock(2)
@@ -45,178 +56,411 @@ import org.apache.hadoop.hdfs.protocol.B
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class FsDatasetCache {
+ /**
+ * Keys which identify MappableBlocks.
+ */
+ private static final class Key {
+ /**
+ * Block id.
+ */
+ final long id;
+
+ /**
+ * Block pool id.
+ */
+ final String bpid;
+
+ Key(long id, String bpid) {
+ this.id = id;
+ this.bpid = bpid;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null) {
+ return false;
+ }
+ if (!(o.getClass() == getClass())) {
+ return false;
+ }
+ Key other = (Key)o;
+ return ((other.id == this.id) && (other.bpid.equals(this.bpid)));
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder().append(id).append(bpid).hashCode();
+ }
+ };
+
+ /**
+ * MappableBlocks that we know about.
+ */
+ private static final class Value {
+ final State state;
+ final MappableBlock mappableBlock;
+
+ Value(MappableBlock mappableBlock, State state) {
+ this.mappableBlock = mappableBlock;
+ this.state = state;
+ }
+ }
+
+ private enum State {
+ /**
+ * The MappableBlock is in the process of being cached.
+ */
+ CACHING,
+
+ /**
+ * The MappableBlock was in the process of being cached, but it was
+ * cancelled. Only the FsDatasetCache#WorkerTask can remove cancelled
+ * MappableBlock objects.
+ */
+ CACHING_CANCELLED,
+
+ /**
+ * The MappableBlock is in the cache.
+ */
+ CACHED,
+
+ /**
+ * The MappableBlock is in the process of uncaching.
+ */
+ UNCACHING;
+
+ /**
+ * Whether we should advertise this block as cached to the NameNode and
+ * clients.
+ */
+ public boolean shouldAdvertise() {
+ return (this == CACHED);
+ }
+ }
private static final Log LOG = LogFactory.getLog(FsDatasetCache.class);
/**
- * Map of cached blocks
+ * Stores MappableBlock objects and the states they're in.
*/
- private final ConcurrentMap<Long, MappableBlock> cachedBlocks;
+ private final HashMap<Key, Value> mappableBlockMap = new HashMap<Key, Value>();
private final FsDatasetImpl dataset;
+
+ private final ThreadPoolExecutor uncachingExecutor;
+
/**
- * Number of cached bytes
+ * The approximate amount of cache space in use.
+ *
+ * This number is an overestimate, counting bytes that will be used only
+ * if pending caching operations succeed. It does not take into account
+ * pending uncaching operations.
+ *
+ * This overestimate is more useful to the NameNode than an underestimate,
+ * since we don't want the NameNode to assign us more replicas than
+ * we can cache, because of the current batch of operations.
*/
- private AtomicLong usedBytes;
+ private final UsedBytesCount usedBytesCount;
+
+ public static class PageRounder {
+ private final long osPageSize = NativeIO.getOperatingSystemPageSize();
+
+ /**
+ * Round up a number to the operating system page size.
+ */
+ public long round(long count) {
+ long newCount =
+ (count + (osPageSize - 1)) / osPageSize;
+ return newCount * osPageSize;
+ }
+ }
+
+ private class UsedBytesCount {
+ private final AtomicLong usedBytes = new AtomicLong(0);
+
+ private PageRounder rounder = new PageRounder();
+
+ /**
+ * Try to reserve more bytes.
+ *
+ * @param count The number of bytes to add. We will round this
+ * up to the page size.
+ *
+ * @return The new number of usedBytes if we succeeded;
+ * -1 if we failed.
+ */
+ long reserve(long count) {
+ count = rounder.round(count);
+ while (true) {
+ long cur = usedBytes.get();
+ long next = cur + count;
+ if (next > maxBytes) {
+ return -1;
+ }
+ if (usedBytes.compareAndSet(cur, next)) {
+ return next;
+ }
+ }
+ }
+
+ /**
+ * Release some bytes that we're using.
+ *
+ * @param count The number of bytes to release. We will round this
+ * up to the page size.
+ *
+ * @return The new number of usedBytes.
+ */
+ long release(long count) {
+ count = rounder.round(count);
+ return usedBytes.addAndGet(-count);
+ }
+
+ long get() {
+ return usedBytes.get();
+ }
+ }
+
/**
- * Total cache capacity in bytes
+ * The total cache capacity in bytes.
*/
private final long maxBytes;
public FsDatasetCache(FsDatasetImpl dataset) {
this.dataset = dataset;
- this.cachedBlocks = new ConcurrentHashMap<Long, MappableBlock>();
- this.usedBytes = new AtomicLong(0);
this.maxBytes = dataset.datanode.getDnConf().getMaxLockedMemory();
- }
-
- /**
- * @return if the block is cached
- */
- boolean isCached(String bpid, long blockId) {
- MappableBlock mapBlock = cachedBlocks.get(blockId);
- if (mapBlock != null) {
- return mapBlock.getBlockPoolId().equals(bpid);
- }
- return false;
+ ThreadFactory workerFactory = new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("FsDatasetCache-%d-" + dataset.toString())
+ .build();
+ this.usedBytesCount = new UsedBytesCount();
+ this.uncachingExecutor = new ThreadPoolExecutor(
+ 0, 1,
+ 60, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ workerFactory);
+ this.uncachingExecutor.allowCoreThreadTimeOut(true);
}
/**
* @return List of cached blocks suitable for translation into a
* {@link BlockListAsLongs} for a cache report.
*/
- List<Long> getCachedBlocks(String bpid) {
+ synchronized List<Long> getCachedBlocks(String bpid) {
List<Long> blocks = new ArrayList<Long>();
- // ConcurrentHashMap iteration doesn't see latest updates, which is okay
- Iterator<MappableBlock> it = cachedBlocks.values().iterator();
- while (it.hasNext()) {
- MappableBlock mapBlock = it.next();
- if (mapBlock.getBlockPoolId().equals(bpid)) {
- blocks.add(mapBlock.getBlock().getBlockId());
+ for (Iterator<Entry<Key, Value>> iter =
+ mappableBlockMap.entrySet().iterator(); iter.hasNext(); ) {
+ Entry<Key, Value> entry = iter.next();
+ if (entry.getKey().bpid.equals(bpid)) {
+ if (entry.getValue().state.shouldAdvertise()) {
+ blocks.add(entry.getKey().id);
+ }
}
}
return blocks;
}
/**
- * Asynchronously attempts to cache a block. This is subject to the
- * configured maximum locked memory limit.
- *
- * @param block block to cache
- * @param volume volume of the block
- * @param blockIn stream of the block's data file
- * @param metaIn stream of the block's meta file
- */
- void cacheBlock(String bpid, Block block, FsVolumeImpl volume,
- FileInputStream blockIn, FileInputStream metaIn) {
- if (isCached(bpid, block.getBlockId())) {
- return;
- }
- MappableBlock mapBlock = null;
- try {
- mapBlock = new MappableBlock(bpid, block, volume, blockIn, metaIn);
- } catch (IOException e) {
- LOG.warn("Failed to cache replica " + block + ": Could not instantiate"
- + " MappableBlock", e);
- IOUtils.closeQuietly(blockIn);
- IOUtils.closeQuietly(metaIn);
- return;
- }
- // Check if there's sufficient cache capacity
- boolean success = false;
- long bytes = mapBlock.getNumBytes();
- long used = usedBytes.get();
- while (used+bytes < maxBytes) {
- if (usedBytes.compareAndSet(used, used+bytes)) {
- success = true;
- break;
+ * Attempt to begin caching a block.
+ */
+ synchronized void cacheBlock(long blockId, String bpid,
+ String blockFileName, long length, long genstamp,
+ Executor volumeExecutor) {
+ Key key = new Key(blockId, bpid);
+ Value prevValue = mappableBlockMap.get(key);
+ if (prevValue != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Block with id " + blockId + ", pool " + bpid +
+ " already exists in the FsDatasetCache with state " +
+ prevValue.state);
}
- used = usedBytes.get();
- }
- if (!success) {
- LOG.warn(String.format(
- "Failed to cache replica %s: %s exceeded (%d + %d > %d)",
- mapBlock.getBlock().toString(),
- DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
- used, bytes, maxBytes));
- mapBlock.close();
return;
}
- // Submit it to the worker pool to be cached
- volume.getExecutor().execute(new WorkerTask(mapBlock));
+ mappableBlockMap.put(key, new Value(null, State.CACHING));
+ volumeExecutor.execute(
+ new CachingTask(key, blockFileName, length, genstamp));
}
- /**
- * Uncaches a block if it is cached.
- * @param blockId id to uncache
- */
- void uncacheBlock(String bpid, long blockId) {
- MappableBlock mapBlock = cachedBlocks.get(blockId);
- if (mapBlock != null &&
- mapBlock.getBlockPoolId().equals(bpid) &&
- mapBlock.getBlock().getBlockId() == blockId) {
- mapBlock.close();
- cachedBlocks.remove(blockId);
- long bytes = mapBlock.getNumBytes();
- long used = usedBytes.get();
- while (!usedBytes.compareAndSet(used, used - bytes)) {
- used = usedBytes.get();
- }
- LOG.info("Successfully uncached block " + blockId);
- } else {
- LOG.info("Could not uncache block " + blockId + ": unknown block.");
+ synchronized void uncacheBlock(String bpid, long blockId) {
+ Key key = new Key(blockId, bpid);
+ Value prevValue = mappableBlockMap.get(key);
+
+ if (prevValue == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Block with id " + blockId + ", pool " + bpid + " " +
+ "does not need to be uncached, because it is not currently " +
+ "in the mappableBlockMap.");
+ }
+ return;
+ }
+ switch (prevValue.state) {
+ case CACHING:
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cancelling caching for block with id " + blockId +
+ ", pool " + bpid + ".");
+ }
+ mappableBlockMap.put(key,
+ new Value(prevValue.mappableBlock, State.CACHING_CANCELLED));
+ break;
+ case CACHED:
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Block with id " + blockId + ", pool " + bpid + " " +
+ "has been scheduled for uncaching.");
+ }
+ mappableBlockMap.put(key,
+ new Value(prevValue.mappableBlock, State.UNCACHING));
+ uncachingExecutor.execute(new UncachingTask(key));
+ break;
+ default:
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Block with id " + blockId + ", pool " + bpid + " " +
+ "does not need to be uncached, because it is " +
+ "in state " + prevValue.state + ".");
+ }
+ break;
}
}
/**
* Background worker that mmaps, mlocks, and checksums a block
*/
- private class WorkerTask implements Runnable {
-
- private MappableBlock block;
- WorkerTask(MappableBlock block) {
- this.block = block;
+ private class CachingTask implements Runnable {
+ private final Key key;
+ private final String blockFileName;
+ private final long length;
+ private final long genstamp;
+
+ CachingTask(Key key, String blockFileName, long length, long genstamp) {
+ this.key = key;
+ this.blockFileName = blockFileName;
+ this.length = length;
+ this.genstamp = genstamp;
}
@Override
public void run() {
boolean success = false;
+ FileInputStream blockIn = null, metaIn = null;
+ MappableBlock mappableBlock = null;
+ ExtendedBlock extBlk =
+ new ExtendedBlock(key.bpid, key.id, length, genstamp);
+ long newUsedBytes = usedBytesCount.reserve(length);
+ if (newUsedBytes < 0) {
+ LOG.warn("Failed to cache block id " + key.id + ", pool " + key.bpid +
+ ": could not reserve " + length + " more bytes in the " +
+ "cache: " + DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY +
+ " of " + maxBytes + " exceeded.");
+ return;
+ }
try {
- block.map();
- block.lock();
- block.verifyChecksum();
+ try {
+ blockIn = (FileInputStream)dataset.getBlockInputStream(extBlk, 0);
+ metaIn = (FileInputStream)dataset.getMetaDataInputStream(extBlk)
+ .getWrappedStream();
+ } catch (ClassCastException e) {
+ LOG.warn("Failed to cache block with id " + key.id + ", pool " +
+ key.bpid + ": Underlying blocks are not backed by files.", e);
+ return;
+ } catch (FileNotFoundException e) {
+ LOG.info("Failed to cache block with id " + key.id + ", pool " +
+ key.bpid + ": failed to find backing files.");
+ return;
+ } catch (IOException e) {
+ LOG.warn("Failed to cache block with id " + key.id + ", pool " +
+ key.bpid + ": failed to open file", e);
+ return;
+ }
+ try {
+ mappableBlock = MappableBlock.
+ load(length, blockIn, metaIn, blockFileName);
+ } catch (ChecksumException e) {
+ // Exception message is bogus since this wasn't caused by a file read
+ LOG.warn("Failed to cache block " + key.id + " in " + key.bpid + ": " +
+ "checksum verification failed.");
+ return;
+ } catch (IOException e) {
+ LOG.warn("Failed to cache block " + key.id + " in " + key.bpid, e);
+ return;
+ }
+ synchronized (FsDatasetCache.this) {
+ Value value = mappableBlockMap.get(key);
+ Preconditions.checkNotNull(value);
+ Preconditions.checkState(value.state == State.CACHING ||
+ value.state == State.CACHING_CANCELLED);
+ if (value.state == State.CACHING_CANCELLED) {
+ mappableBlockMap.remove(key);
+ LOG.warn("Caching of block " + key.id + " in " + key.bpid +
+ " was cancelled.");
+ return;
+ }
+ mappableBlockMap.put(key, new Value(mappableBlock, State.CACHED));
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Successfully cached block " + key.id + " in " + key.bpid +
+ ". We are now caching " + newUsedBytes + " bytes in total.");
+ }
success = true;
- } catch (ChecksumException e) {
- // Exception message is bogus since this wasn't caused by a file read
- LOG.warn("Failed to cache block " + block.getBlock() + ": Checksum "
- + "verification failed.");
- } catch (IOException e) {
- LOG.warn("Failed to cache block " + block.getBlock() + ": IOException",
- e);
- }
- // If we failed or the block became uncacheable in the meantime,
- // clean up and return the reserved cache allocation
- if (!success ||
- !dataset.validToCache(block.getBlockPoolId(),
- block.getBlock().getBlockId())) {
- block.close();
- long used = usedBytes.get();
- while (!usedBytes.compareAndSet(used, used-block.getNumBytes())) {
- used = usedBytes.get();
- }
- } else {
- LOG.info("Successfully cached block " + block.getBlock());
- cachedBlocks.put(block.getBlock().getBlockId(), block);
+ } finally {
+ if (!success) {
+ newUsedBytes = usedBytesCount.release(length);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Caching of block " + key.id + " in " +
+ key.bpid + " was aborted. We are now caching only " +
+ newUsedBytes + " + bytes in total.");
+ }
+ IOUtils.closeQuietly(blockIn);
+ IOUtils.closeQuietly(metaIn);
+ if (mappableBlock != null) {
+ mappableBlock.close();
+ }
+ }
+ }
+ }
+ }
+
+ private class UncachingTask implements Runnable {
+ private final Key key;
+
+ UncachingTask(Key key) {
+ this.key = key;
+ }
+
+ @Override
+ public void run() {
+ Value value;
+
+ synchronized (FsDatasetCache.this) {
+ value = mappableBlockMap.get(key);
+ }
+ Preconditions.checkNotNull(value);
+ Preconditions.checkArgument(value.state == State.UNCACHING);
+ // TODO: we will eventually need to do revocation here if any clients
+ // are reading via mmap with checksums enabled. See HDFS-5182.
+ IOUtils.closeQuietly(value.mappableBlock);
+ synchronized (FsDatasetCache.this) {
+ mappableBlockMap.remove(key);
+ }
+ long newUsedBytes =
+ usedBytesCount.release(value.mappableBlock.getLength());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Uncaching of block " + key.id + " in " + key.bpid +
+ " completed. usedBytes = " + newUsedBytes);
}
}
}
// Stats related methods for FsDatasetMBean
+ /**
+ * Get the approximate amount of cache space used.
+ */
public long getDnCacheUsed() {
- return usedBytes.get();
+ return usedBytesCount.get();
}
+ /**
+ * Get the maximum amount of bytes we can cache. This is a constant.
+ */
public long getDnCacheCapacity() {
return maxBytes;
}
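
Note on the change above: the rewritten FsDatasetCache tracks every block in a single map keyed by (block id, block pool id), moves each entry through the CACHING / CACHING_CANCELLED / CACHED / UNCACHING states under the class lock, and accounts for space with page-rounded reservations against dfs.datanode.max.locked.memory. A minimal standalone sketch of that reservation scheme, assuming a fixed 4096-byte page size (the real code asks NativeIO for the operating-system page size):

import java.util.concurrent.atomic.AtomicLong;

// Sketch of the page-rounded, compare-and-set reservation used by FsDatasetCache.
class ReservationSketch {
    private static final long PAGE_SIZE = 4096;   // assumed for illustration
    private final long maxBytes;
    private final AtomicLong usedBytes = new AtomicLong(0);

    ReservationSketch(long maxBytes) { this.maxBytes = maxBytes; }

    // Round a byte count up to the next page boundary.
    static long roundToPage(long count) {
        return ((count + PAGE_SIZE - 1) / PAGE_SIZE) * PAGE_SIZE;
    }

    // Returns the new usage on success, or -1 if the reservation would exceed maxBytes.
    long reserve(long count) {
        long rounded = roundToPage(count);
        while (true) {
            long cur = usedBytes.get();
            long next = cur + rounded;
            if (next > maxBytes) return -1;
            if (usedBytes.compareAndSet(cur, next)) return next;
        }
    }

    // Releases a previous reservation and returns the new usage.
    long release(long count) {
        return usedBytes.addAndGet(-roundToPage(count));
    }
}

Rounding up before the compare-and-set keeps the counter an overestimate, matching the comment in the patch: the NameNode should never be told there is more cacheable space than mlock can actually deliver.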
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1540239&r1=1540238&r2=1540239&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Sat Nov 9 01:34:22 2013
@@ -32,12 +32,12 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executor;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import javax.management.StandardMBean;
-import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -599,7 +599,7 @@ class FsDatasetImpl implements FsDataset
private synchronized ReplicaBeingWritten append(String bpid,
FinalizedReplica replicaInfo, long newGS, long estimateBlockLen)
throws IOException {
- // uncache the block
+ // If the block is cached, start uncaching it.
cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId());
// unlink the finalized replica
replicaInfo.unlinkBlock(1);
@@ -1244,10 +1244,11 @@ class FsDatasetImpl implements FsDataset
volumeMap.remove(bpid, invalidBlks[i]);
perVolumeReplicaMap.get(v.getStorageID()).remove(bpid, invalidBlks[i]);
}
-
- // Uncache the block synchronously
+ // If the block is cached, start uncaching it.
cacheManager.uncacheBlock(bpid, invalidBlks[i].getBlockId());
- // Delete the block asynchronously to make sure we can do it fast enough
+ // Delete the block asynchronously to make sure we can do it fast enough.
+ // It's ok to unlink the block file before the uncache operation
+ // finishes.
asyncDiskService.deleteAsync(v, f,
FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()),
new ExtendedBlock(bpid, invalidBlks[i]));
@@ -1257,66 +1258,47 @@ class FsDatasetImpl implements FsDataset
}
}
- synchronized boolean validToCache(String bpid, long blockId) {
- ReplicaInfo info = volumeMap.get(bpid, blockId);
- if (info == null) {
- LOG.warn("Failed to cache replica in block pool " + bpid +
- " with block id " + blockId + ": ReplicaInfo not found.");
- return false;
- }
- FsVolumeImpl volume = (FsVolumeImpl)info.getVolume();
- if (volume == null) {
- LOG.warn("Failed to cache block with id " + blockId +
- ": Volume not found.");
- return false;
- }
- if (info.getState() != ReplicaState.FINALIZED) {
- LOG.warn("Failed to block with id " + blockId +
- ": Replica is not finalized.");
- return false;
- }
- return true;
- }
-
/**
* Asynchronously attempts to cache a single block via {@link FsDatasetCache}.
*/
private void cacheBlock(String bpid, long blockId) {
- ReplicaInfo info;
FsVolumeImpl volume;
+ String blockFileName;
+ long length, genstamp;
+ Executor volumeExecutor;
+
synchronized (this) {
- if (!validToCache(bpid, blockId)) {
+ ReplicaInfo info = volumeMap.get(bpid, blockId);
+ if (info == null) {
+ LOG.warn("Failed to cache block with id " + blockId + ", pool " +
+ bpid + ": ReplicaInfo not found.");
return;
}
- info = volumeMap.get(bpid, blockId);
- volume = (FsVolumeImpl)info.getVolume();
- }
- // Try to open block and meta streams
- FileInputStream blockIn = null;
- FileInputStream metaIn = null;
- boolean success = false;
- ExtendedBlock extBlk =
- new ExtendedBlock(bpid, blockId,
- info.getBytesOnDisk(), info.getGenerationStamp());
- try {
- blockIn = (FileInputStream)getBlockInputStream(extBlk, 0);
- metaIn = (FileInputStream)getMetaDataInputStream(extBlk)
- .getWrappedStream();
- success = true;
- } catch (ClassCastException e) {
- LOG.warn("Failed to cache replica " + extBlk + ": Underlying blocks"
- + " are not backed by files.", e);
- } catch (IOException e) {
- LOG.warn("Failed to cache replica " + extBlk + ": IOException while"
- + " trying to open block or meta files.", e);
- }
- if (!success) {
- IOUtils.closeQuietly(blockIn);
- IOUtils.closeQuietly(metaIn);
- return;
+ if (info.getState() != ReplicaState.FINALIZED) {
+ LOG.warn("Failed to cache block with id " + blockId + ", pool " +
+ bpid + ": replica is not finalized; it is in state " +
+ info.getState());
+ return;
+ }
+ try {
+ volume = (FsVolumeImpl)info.getVolume();
+ if (volume == null) {
+ LOG.warn("Failed to cache block with id " + blockId + ", pool " +
+ bpid + ": volume not found.");
+ return;
+ }
+ } catch (ClassCastException e) {
+ LOG.warn("Failed to cache block with id " + blockId +
+ ": volume was not an instance of FsVolumeImpl.");
+ return;
+ }
+ blockFileName = info.getBlockFile().getAbsolutePath();
+ length = info.getVisibleLength();
+ genstamp = info.getGenerationStamp();
+ volumeExecutor = volume.getCacheExecutor();
}
- cacheManager.cacheBlock(bpid, extBlk.getLocalBlock(),
- volume, blockIn, metaIn);
+ cacheManager.cacheBlock(blockId, bpid,
+ blockFileName, length, genstamp, volumeExecutor);
}
@Override // FsDatasetSpi
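
Note on the change above: the reworked cacheBlock() no longer opens block and meta streams while holding the dataset lock; it only resolves the replica's file name, visible length, generation stamp, and volume executor under the lock, and leaves all file I/O to FsDatasetCache's CachingTask. A rough sketch of that shape, where Replica and submitCaching are hypothetical stand-ins for the volumeMap lookup and FsDatasetCache.cacheBlock():

import java.util.concurrent.Executor;

final class CacheSubmitSketch {
    private final Object lock = new Object();

    void cacheBlock(long blockId, Replica replica, Executor volumeExecutor) {
        final String blockFileName;
        final long length, genstamp;
        synchronized (lock) {
            // Validate and copy replica metadata while holding the lock...
            blockFileName = replica.fileName();
            length = replica.visibleLength();
            genstamp = replica.generationStamp();
        }
        // ...then open files, mmap, and checksum outside the lock.
        submitCaching(blockId, blockFileName, length, genstamp, volumeExecutor);
    }

    interface Replica {
        String fileName();
        long visibleLength();
        long generationStamp();
    }

    private void submitCaching(long id, String file, long len, long gs, Executor e) {
        // Stand-in for FsDatasetCache.cacheBlock(), which does the real work asynchronously.
    }
}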
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java?rev=1540239&r1=1540238&r2=1540239&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java Sat Nov 9 01:34:22 2013
@@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.File;
-import java.io.FileInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@@ -199,7 +198,7 @@ class FsVolumeImpl implements FsVolumeSp
return getBlockPoolSlice(bpid).addBlock(b, f);
}
- Executor getExecutor() {
+ Executor getCacheExecutor() {
return cacheExecutor;
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java?rev=1540239&r1=1540238&r2=1540239&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java Sat Nov 9 01:34:22 2013
@@ -28,184 +28,104 @@ import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
-import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ChecksumException;
-import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.DataChecksum;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
- * Low-level wrapper for a Block and its backing files that provides mmap,
- * mlock, and checksum verification operations.
- *
- * This could be a private class of FsDatasetCache, not meant for other users.
+ * Represents an HDFS block that is mmapped by the DataNode.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
-class MappableBlock implements Closeable {
-
- private final String bpid;
- private final Block block;
- private final FsVolumeImpl volume;
-
- private final FileInputStream blockIn;
- private final FileInputStream metaIn;
- private final FileChannel blockChannel;
- private final FileChannel metaChannel;
- private final long blockSize;
-
- private boolean isMapped;
- private boolean isLocked;
- private boolean isChecksummed;
-
- private MappedByteBuffer blockMapped = null;
-
- public MappableBlock(String bpid, Block blk, FsVolumeImpl volume,
- FileInputStream blockIn, FileInputStream metaIn) throws IOException {
- this.bpid = bpid;
- this.block = blk;
- this.volume = volume;
-
- this.blockIn = blockIn;
- this.metaIn = metaIn;
- this.blockChannel = blockIn.getChannel();
- this.metaChannel = metaIn.getChannel();
- this.blockSize = blockChannel.size();
-
- this.isMapped = false;
- this.isLocked = false;
- this.isChecksummed = false;
- }
-
- public String getBlockPoolId() {
- return bpid;
- }
-
- public Block getBlock() {
- return block;
- }
-
- public FsVolumeImpl getVolume() {
- return volume;
- }
-
- public boolean isMapped() {
- return isMapped;
- }
-
- public boolean isLocked() {
- return isLocked;
- }
-
- public boolean isChecksummed() {
- return isChecksummed;
- }
-
- /**
- * Returns the number of bytes on disk for the block file
- */
- public long getNumBytes() {
- return blockSize;
- }
-
- /**
- * Maps the block into memory. See mmap(2).
- */
- public void map() throws IOException {
- if (isMapped) {
- return;
+public class MappableBlock implements Closeable {
+ public static interface Mlocker {
+ void mlock(MappedByteBuffer mmap, long length) throws IOException;
+ }
+
+ private static class PosixMlocker implements Mlocker {
+ public void mlock(MappedByteBuffer mmap, long length)
+ throws IOException {
+ NativeIO.POSIX.mlock(mmap, length);
}
- blockMapped = blockChannel.map(MapMode.READ_ONLY, 0, blockSize);
- isMapped = true;
}
- /**
- * Unmaps the block from memory. See munmap(2).
- */
- public void unmap() {
- if (!isMapped) {
- return;
- }
- if (blockMapped instanceof sun.nio.ch.DirectBuffer) {
- sun.misc.Cleaner cleaner =
- ((sun.nio.ch.DirectBuffer)blockMapped).cleaner();
- cleaner.clean();
- }
- isMapped = false;
- isLocked = false;
- isChecksummed = false;
+ @VisibleForTesting
+ public static Mlocker mlocker = new PosixMlocker();
+
+ private MappedByteBuffer mmap;
+ private final long length;
+
+ MappableBlock(MappedByteBuffer mmap, long length) {
+ this.mmap = mmap;
+ this.length = length;
+ assert length > 0;
+ }
+
+ public long getLength() {
+ return length;
}
/**
- * Locks the block into memory. This prevents the block from being paged out.
- * See mlock(2).
- */
- public void lock() throws IOException {
- Preconditions.checkArgument(isMapped,
- "Block must be mapped before it can be locked!");
- if (isLocked) {
- return;
- }
- NativeIO.POSIX.mlock(blockMapped, blockSize);
- isLocked = true;
- }
-
- /**
- * Unlocks the block from memory, allowing it to be paged out. See munlock(2).
- */
- public void unlock() throws IOException {
- if (!isLocked || !isMapped) {
- return;
- }
- NativeIO.POSIX.munlock(blockMapped, blockSize);
- isLocked = false;
- isChecksummed = false;
- }
-
- /**
- * Reads bytes into a buffer until EOF or the buffer's limit is reached
- */
- private int fillBuffer(FileChannel channel, ByteBuffer buf)
- throws IOException {
- int bytesRead = channel.read(buf);
- if (bytesRead < 0) {
- //EOF
- return bytesRead;
- }
- while (buf.remaining() > 0) {
- int n = channel.read(buf);
- if (n < 0) {
- //EOF
- return bytesRead;
+ * Load the block.
+ *
+ * mmap and mlock the block, and then verify its checksum.
+ *
+ * @param length The current length of the block.
+ * @param blockIn The block input stream. Should be positioned at the
+ * start. The caller must close this.
+ * @param metaIn The meta file input stream. Should be positioned at
+ * the start. The caller must close this.
+ * @param blockFileName The block file name, for logging purposes.
+ *
+ * @return The Mappable block.
+ */
+ public static MappableBlock load(long length,
+ FileInputStream blockIn, FileInputStream metaIn,
+ String blockFileName) throws IOException {
+ MappableBlock mappableBlock = null;
+ MappedByteBuffer mmap = null;
+ try {
+ FileChannel blockChannel = blockIn.getChannel();
+ if (blockChannel == null) {
+ throw new IOException("Block InputStream has no FileChannel.");
+ }
+ mmap = blockChannel.map(MapMode.READ_ONLY, 0, length);
+ mlocker.mlock(mmap, length);
+ verifyChecksum(length, metaIn, blockChannel, blockFileName);
+ mappableBlock = new MappableBlock(mmap, length);
+ } finally {
+ if (mappableBlock == null) {
+ if (mmap != null) {
+ NativeIO.POSIX.munmap(mmap); // unmapping also unlocks
+ }
}
- bytesRead += n;
}
- return bytesRead;
+ return mappableBlock;
}
/**
* Verifies the block's checksum. This is an I/O intensive operation.
* @return if the block was successfully checksummed.
*/
- public void verifyChecksum() throws IOException, ChecksumException {
- Preconditions.checkArgument(isLocked && isMapped,
- "Block must be mapped and locked before checksum verification!");
- // skip if checksum has already been successfully verified
- if (isChecksummed) {
- return;
- }
+ private static void verifyChecksum(long length,
+ FileInputStream metaIn, FileChannel blockChannel, String blockFileName)
+ throws IOException, ChecksumException {
// Verify the checksum from the block's meta file
// Get the DataChecksum from the meta file header
- metaChannel.position(0);
BlockMetadataHeader header =
BlockMetadataHeader.readHeader(new DataInputStream(
new BufferedInputStream(metaIn, BlockMetadataHeader
.getHeaderSize())));
+ FileChannel metaChannel = metaIn.getChannel();
+ if (metaChannel == null) {
+ throw new IOException("Block InputStream meta file has no FileChannel.");
+ }
DataChecksum checksum = header.getChecksum();
final int bytesPerChecksum = checksum.getBytesPerChecksum();
final int checksumSize = checksum.getChecksumSize();
@@ -214,13 +134,13 @@ class MappableBlock implements Closeable
ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks*checksumSize);
// Verify the checksum
int bytesVerified = 0;
- while (bytesVerified < blockChannel.size()) {
+ while (bytesVerified < length) {
Preconditions.checkState(bytesVerified % bytesPerChecksum == 0,
"Unexpected partial chunk before EOF");
assert bytesVerified % bytesPerChecksum == 0;
int bytesRead = fillBuffer(blockChannel, blockBuf);
if (bytesRead == -1) {
- throw new IOException("Premature EOF");
+ throw new IOException("checksum verification failed: premature EOF");
}
blockBuf.flip();
// Number of read chunks, including partial chunk at end
@@ -228,22 +148,41 @@ class MappableBlock implements Closeable
checksumBuf.limit(chunks*checksumSize);
fillBuffer(metaChannel, checksumBuf);
checksumBuf.flip();
- checksum.verifyChunkedSums(blockBuf, checksumBuf, block.getBlockName(),
+ checksum.verifyChunkedSums(blockBuf, checksumBuf, blockFileName,
bytesVerified);
// Success
bytesVerified += bytesRead;
blockBuf.clear();
checksumBuf.clear();
}
- isChecksummed = true;
- // Can close the backing file since everything is safely in memory
- blockChannel.close();
+ }
+
+ /**
+ * Reads bytes into a buffer until EOF or the buffer's limit is reached
+ */
+ private static int fillBuffer(FileChannel channel, ByteBuffer buf)
+ throws IOException {
+ int bytesRead = channel.read(buf);
+ if (bytesRead < 0) {
+ //EOF
+ return bytesRead;
+ }
+ while (buf.remaining() > 0) {
+ int n = channel.read(buf);
+ if (n < 0) {
+ //EOF
+ return bytesRead;
+ }
+ bytesRead += n;
+ }
+ return bytesRead;
}
@Override
public void close() {
- unmap();
- IOUtils.closeQuietly(blockIn);
- IOUtils.closeQuietly(metaIn);
+ if (mmap != null) {
+ NativeIO.POSIX.munmap(mmap);
+ mmap = null;
+ }
}
}
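
Note on the change above: MappableBlock.load() follows a map, pin, verify, or unwind sequence: mmap the block file read-only, mlock it, verify the checksum against the meta file, and munmap on any failure so a partially loaded block never leaks locked memory. A simplified sketch of that cleanup pattern, using placeholder pin()/unmap() methods in place of NativeIO.POSIX.mlock/munmap:

import java.io.FileInputStream;
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;

final class MmapSketch {
    static MappedByteBuffer loadPinned(FileInputStream blockIn, long length)
            throws IOException {
        MappedByteBuffer mmap = null;
        boolean success = false;
        try {
            FileChannel channel = blockIn.getChannel();
            mmap = channel.map(MapMode.READ_ONLY, 0, length);
            pin(mmap, length);        // stands in for NativeIO.POSIX.mlock
            success = true;
            return mmap;
        } finally {
            if (!success && mmap != null) {
                unmap(mmap);          // stands in for NativeIO.POSIX.munmap; also unlocks
            }
        }
    }

    private static void pin(MappedByteBuffer m, long len) { /* placeholder */ }
    private static void unmap(MappedByteBuffer m) { /* placeholder */ }
}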
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java?rev=1540239&r1=1540238&r2=1540239&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java Sat Nov 9 01:34:22 2013
@@ -646,16 +646,14 @@ public abstract class INodeReference ext
FileWithSnapshot sfile = (FileWithSnapshot) referred;
// make sure we mark the file as deleted
sfile.deleteCurrentFile();
- if (snapshot != null) {
- try {
- // when calling cleanSubtree of the referred node, since we
- // compute quota usage updates before calling this destroy
- // function, we use true for countDiffChange
- referred.cleanSubtree(snapshot, prior, collectedBlocks,
- removedINodes, true);
- } catch (QuotaExceededException e) {
- LOG.error("should not exceed quota while snapshot deletion", e);
- }
+ try {
+ // when calling cleanSubtree of the referred node, since we
+ // compute quota usage updates before calling this destroy
+ // function, we use true for countDiffChange
+ referred.cleanSubtree(snapshot, prior, collectedBlocks,
+ removedINodes, true);
+ } catch (QuotaExceededException e) {
+ LOG.error("should not exceed quota while snapshot deletion", e);
}
} else if (referred instanceof INodeDirectoryWithSnapshot) {
// similarly, if referred is a directory, it must be an
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java?rev=1540239&r1=1540238&r2=1540239&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java Sat Nov 9 01:34:22 2013
@@ -716,14 +716,8 @@ public class INodeDirectoryWithSnapshot
if (priorDiff != null && priorDiff.getSnapshot().equals(prior)) {
List<INode> cList = priorDiff.diff.getList(ListType.CREATED);
List<INode> dList = priorDiff.diff.getList(ListType.DELETED);
- priorCreated = new HashMap<INode, INode>(cList.size());
- for (INode cNode : cList) {
- priorCreated.put(cNode, cNode);
- }
- priorDeleted = new HashMap<INode, INode>(dList.size());
- for (INode dNode : dList) {
- priorDeleted.put(dNode, dNode);
- }
+ priorCreated = cloneDiffList(cList);
+ priorDeleted = cloneDiffList(dList);
}
}
@@ -896,6 +890,17 @@ public class INodeDirectoryWithSnapshot
counts.add(Content.DIRECTORY, diffs.asList().size());
}
+ private static Map<INode, INode> cloneDiffList(List<INode> diffList) {
+ if (diffList == null || diffList.size() == 0) {
+ return null;
+ }
+ Map<INode, INode> map = new HashMap<INode, INode>(diffList.size());
+ for (INode node : diffList) {
+ map.put(node, node);
+ }
+ return map;
+ }
+
/**
* Destroy a subtree under a DstReference node.
*/
@@ -914,26 +919,28 @@ public class INodeDirectoryWithSnapshot
destroyDstSubtree(inode.asReference().getReferredINode(), snapshot,
prior, collectedBlocks, removedINodes);
}
- } else if (inode.isFile() && snapshot != null) {
+ } else if (inode.isFile()) {
inode.cleanSubtree(snapshot, prior, collectedBlocks, removedINodes, true);
} else if (inode.isDirectory()) {
Map<INode, INode> excludedNodes = null;
if (inode instanceof INodeDirectoryWithSnapshot) {
INodeDirectoryWithSnapshot sdir = (INodeDirectoryWithSnapshot) inode;
+
DirectoryDiffList diffList = sdir.getDiffs();
+ DirectoryDiff priorDiff = diffList.getDiff(prior);
+ if (priorDiff != null && priorDiff.getSnapshot().equals(prior)) {
+ List<INode> dList = priorDiff.diff.getList(ListType.DELETED);
+ excludedNodes = cloneDiffList(dList);
+ }
+
if (snapshot != null) {
diffList.deleteSnapshotDiff(snapshot, prior, sdir, collectedBlocks,
removedINodes, true);
}
- DirectoryDiff priorDiff = diffList.getDiff(prior);
+ priorDiff = diffList.getDiff(prior);
if (priorDiff != null && priorDiff.getSnapshot().equals(prior)) {
priorDiff.diff.destroyCreatedList(sdir, collectedBlocks,
removedINodes);
- List<INode> dList = priorDiff.diff.getList(ListType.DELETED);
- excludedNodes = new HashMap<INode, INode>(dList.size());
- for (INode dNode : dList) {
- excludedNodes.put(dNode, dNode);
- }
}
}
for (INode child : inode.asDirectory().getChildrenList(prior)) {
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileUnderConstructionWithSnapshot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileUnderConstructionWithSnapshot.java?rev=1540239&r1=1540238&r2=1540239&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileUnderConstructionWithSnapshot.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileUnderConstructionWithSnapshot.java Sat Nov 9 01:34:22 2013
@@ -109,8 +109,10 @@ public class INodeFileUnderConstructionW
final List<INode> removedINodes, final boolean countDiffChange)
throws QuotaExceededException {
if (snapshot == null) { // delete the current file
- recordModification(prior, null);
- isCurrentFileDeleted = true;
+ if (!isCurrentFileDeleted()) {
+ recordModification(prior, null);
+ deleteCurrentFile();
+ }
Util.collectBlocksAndClear(this, collectedBlocks, removedINodes);
return Quota.Counts.newInstance();
} else { // delete a snapshot
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileWithSnapshot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileWithSnapshot.java?rev=1540239&r1=1540238&r2=1540239&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileWithSnapshot.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileWithSnapshot.java Sat Nov 9 01:34:22 2013
@@ -96,8 +96,10 @@ public class INodeFileWithSnapshot exten
final List<INode> removedINodes, final boolean countDiffChange)
throws QuotaExceededException {
if (snapshot == null) { // delete the current file
- recordModification(prior, null);
- isCurrentFileDeleted = true;
+ if (!isCurrentFileDeleted()) {
+ recordModification(prior, null);
+ deleteCurrentFile();
+ }
Util.collectBlocksAndClear(this, collectedBlocks, removedINodes);
return Quota.Counts.newInstance();
} else { // delete a snapshot
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java?rev=1540239&r1=1540238&r2=1540239&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java Sat Nov 9 01:34:22 2013
@@ -683,19 +683,6 @@ public class WebHdfsFileSystem extends F
}
}
- @VisibleForTesting
- final class ConnRunner extends AbstractRunner {
- protected ConnRunner(final HttpOpParam.Op op, HttpURLConnection conn) {
- super(op, false);
- this.conn = conn;
- }
-
- @Override
- protected URL getUrl() {
- return null;
- }
- }
-
private FsPermission applyUMask(FsPermission permission) {
if (permission == null) {
permission = FsPermission.getDefault();
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java?rev=1540239&r1=1540238&r2=1540239&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java Sat Nov 9 01:34:22 2013
@@ -23,20 +23,14 @@ package org.apache.hadoop.hdfs.security;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
-import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.NetworkInterface;
-import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Enumeration;
-import java.util.Map;
-
-import javax.servlet.http.HttpServletResponse;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -47,23 +41,17 @@ import org.apache.hadoop.hdfs.DFSConfigK
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
-import org.apache.hadoop.hdfs.web.resources.DoAsParam;
-import org.apache.hadoop.hdfs.web.resources.ExceptionHandler;
-import org.apache.hadoop.hdfs.web.resources.GetOpParam;
-import org.apache.hadoop.hdfs.web.resources.PostOpParam;
-import org.apache.hadoop.hdfs.web.resources.PutOpParam;
import org.apache.hadoop.security.TestDoAsEffectiveUser;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.token.Token;
-import org.apache.log4j.Level;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
public class TestDelegationTokenForProxyUser {
private static MiniDFSCluster cluster;
@@ -155,56 +143,26 @@ public class TestDelegationTokenForProxy
}
}
- @Test(timeout=20000)
+ @Test(timeout=5000)
public void testWebHdfsDoAs() throws Exception {
WebHdfsTestUtil.LOG.info("START: testWebHdfsDoAs()");
- ((Log4JLogger)NamenodeWebHdfsMethods.LOG).getLogger().setLevel(Level.ALL);
- ((Log4JLogger)ExceptionHandler.LOG).getLogger().setLevel(Level.ALL);
WebHdfsTestUtil.LOG.info("ugi.getShortUserName()=" + ugi.getShortUserName());
final WebHdfsFileSystem webhdfs = WebHdfsTestUtil.getWebHdfsFileSystemAs(ugi, config);
final Path root = new Path("/");
cluster.getFileSystem().setPermission(root, new FsPermission((short)0777));
- {
- //test GETHOMEDIRECTORY with doAs
- final URL url = WebHdfsTestUtil.toUrl(webhdfs,
- GetOpParam.Op.GETHOMEDIRECTORY, root, new DoAsParam(PROXY_USER));
- final HttpURLConnection conn = (HttpURLConnection) url.openConnection();
- final Map<?, ?> m = WebHdfsTestUtil.connectAndGetJson(conn, HttpServletResponse.SC_OK);
- conn.disconnect();
-
- final Object responsePath = m.get(Path.class.getSimpleName());
- WebHdfsTestUtil.LOG.info("responsePath=" + responsePath);
- Assert.assertEquals("/user/" + PROXY_USER, responsePath);
- }
+ Whitebox.setInternalState(webhdfs, "ugi", proxyUgi);
{
- //test GETHOMEDIRECTORY with DOas
- final URL url = WebHdfsTestUtil.toUrl(webhdfs,
- GetOpParam.Op.GETHOMEDIRECTORY, root, new DoAsParam(PROXY_USER) {
- @Override
- public String getName() {
- return "DOas";
- }
- });
- final HttpURLConnection conn = (HttpURLConnection) url.openConnection();
- final Map<?, ?> m = WebHdfsTestUtil.connectAndGetJson(conn, HttpServletResponse.SC_OK);
- conn.disconnect();
-
- final Object responsePath = m.get(Path.class.getSimpleName());
+ Path responsePath = webhdfs.getHomeDirectory();
WebHdfsTestUtil.LOG.info("responsePath=" + responsePath);
- Assert.assertEquals("/user/" + PROXY_USER, responsePath);
+ Assert.assertEquals(webhdfs.getUri() + "/user/" + PROXY_USER, responsePath.toString());
}
final Path f = new Path("/testWebHdfsDoAs/a.txt");
{
- //test create file with doAs
- final PutOpParam.Op op = PutOpParam.Op.CREATE;
- final URL url = WebHdfsTestUtil.toUrl(webhdfs, op, f, new DoAsParam(PROXY_USER));
- HttpURLConnection conn = (HttpURLConnection) url.openConnection();
- conn = WebHdfsTestUtil.twoStepWrite(webhdfs, op, conn);
- final FSDataOutputStream out = WebHdfsTestUtil.write(webhdfs, op, conn, 4096);
+ FSDataOutputStream out = webhdfs.create(f);
out.write("Hello, webhdfs user!".getBytes());
out.close();
@@ -214,12 +172,7 @@ public class TestDelegationTokenForProxy
}
{
- //test append file with doAs
- final PostOpParam.Op op = PostOpParam.Op.APPEND;
- final URL url = WebHdfsTestUtil.toUrl(webhdfs, op, f, new DoAsParam(PROXY_USER));
- HttpURLConnection conn = (HttpURLConnection) url.openConnection();
- conn = WebHdfsTestUtil.twoStepWrite(webhdfs, op, conn);
- final FSDataOutputStream out = WebHdfsTestUtil.write(webhdfs, op, conn, 4096);
+ final FSDataOutputStream out = webhdfs.append(f);
out.write("\nHello again!".getBytes());
out.close();
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java?rev=1540239&r1=1540238&r2=1540239&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java Sat Nov 9 01:34:22 2013
@@ -26,8 +26,11 @@ import static org.mockito.Mockito.doRetu
import java.io.FileInputStream;
import java.io.IOException;
+import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.HdfsBlockLocation;
@@ -42,6 +45,8 @@ import org.apache.hadoop.hdfs.protocol.B
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache.PageRounder;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
@@ -52,12 +57,18 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Logger;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+
public class TestFsDatasetCache {
+ private static final Log LOG = LogFactory.getLog(TestFsDatasetCache.class);
// Most Linux installs allow a default of 64KB locked memory
private static final long CACHE_CAPACITY = 64 * 1024;
@@ -71,12 +82,14 @@ public class TestFsDatasetCache {
private static DataNode dn;
private static FsDatasetSpi<?> fsd;
private static DatanodeProtocolClientSideTranslatorPB spyNN;
+ private static PageRounder rounder = new PageRounder();
@Before
public void setUp() throws Exception {
assumeTrue(!Path.WINDOWS);
- assumeTrue(NativeIO.isAvailable());
+ assumeTrue(NativeIO.getMemlockLimit() >= CACHE_CAPACITY);
conf = new HdfsConfiguration();
+ conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY, true);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
CACHE_CAPACITY);
@@ -169,19 +182,34 @@ public class TestFsDatasetCache {
* Blocks until cache usage hits the expected new value.
*/
private long verifyExpectedCacheUsage(final long expected) throws Exception {
- long cacheUsed = fsd.getDnCacheUsed();
- while (cacheUsed != expected) {
- cacheUsed = fsd.getDnCacheUsed();
- Thread.sleep(100);
- }
- assertEquals("Unexpected amount of cache used", expected, cacheUsed);
- return cacheUsed;
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ private int tries = 0;
+
+ @Override
+ public Boolean get() {
+ long curDnCacheUsed = fsd.getDnCacheUsed();
+ if (curDnCacheUsed != expected) {
+ if (tries++ > 10) {
+ LOG.info("verifyExpectedCacheUsage: expected " +
+ expected + ", got " + curDnCacheUsed + "; " +
+ "memlock limit = " + NativeIO.getMemlockLimit() +
+ ". Waiting...");
+ }
+ return false;
+ }
+ return true;
+ }
+ }, 100, 60000);
+ return expected;
}
- @Test(timeout=60000)
+ @Test(timeout=600000)
public void testCacheAndUncacheBlock() throws Exception {
+ LOG.info("beginning testCacheAndUncacheBlock");
final int NUM_BLOCKS = 5;
+ verifyExpectedCacheUsage(0);
+
// Write a test file
final Path testFile = new Path("/testCacheBlock");
final long testFileLen = BLOCK_SIZE*NUM_BLOCKS;
@@ -211,15 +239,23 @@ public class TestFsDatasetCache {
setHeartbeatResponse(uncacheBlock(locs[i]));
current = verifyExpectedCacheUsage(current - blockSizes[i]);
}
+ LOG.info("finishing testCacheAndUncacheBlock");
}
- @Test(timeout=60000)
+ @Test(timeout=600000)
public void testFilesExceedMaxLockedMemory() throws Exception {
+ LOG.info("beginning testFilesExceedMaxLockedMemory");
+
+ // Skip this test if the OS page size is not the usual 4096 bytes,
+ // so we don't have to deal with page-rounding issues.
+ long osPageSize = NativeIO.getOperatingSystemPageSize();
+ assumeTrue(osPageSize == 4096);
+
// Create some test files that will exceed total cache capacity
- // Don't forget that meta files take up space too!
- final int numFiles = 4;
- final long fileSize = CACHE_CAPACITY / numFiles;
- final Path[] testFiles = new Path[4];
+ final int numFiles = 5;
+ final long fileSize = 15000;
+
+ final Path[] testFiles = new Path[numFiles];
final HdfsBlockLocation[][] fileLocs = new HdfsBlockLocation[numFiles][];
final long[] fileSizes = new long[numFiles];
for (int i=0; i<numFiles; i++) {
@@ -235,35 +271,87 @@ public class TestFsDatasetCache {
}
// Cache the first n-1 files
- long current = 0;
+ long total = 0;
+ verifyExpectedCacheUsage(0);
for (int i=0; i<numFiles-1; i++) {
setHeartbeatResponse(cacheBlocks(fileLocs[i]));
- current = verifyExpectedCacheUsage(current + fileSizes[i]);
+ total = verifyExpectedCacheUsage(rounder.round(total + fileSizes[i]));
}
- final long oldCurrent = current;
// nth file should hit a capacity exception
final LogVerificationAppender appender = new LogVerificationAppender();
final Logger logger = Logger.getRootLogger();
logger.addAppender(appender);
setHeartbeatResponse(cacheBlocks(fileLocs[numFiles-1]));
- int lines = 0;
- while (lines == 0) {
- Thread.sleep(100);
- lines = appender.countLinesWithMessage(
- DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY + " exceeded");
- }
- // Uncache the cached part of the nth file
- setHeartbeatResponse(uncacheBlocks(fileLocs[numFiles-1]));
- while (fsd.getDnCacheUsed() != oldCurrent) {
- Thread.sleep(100);
- }
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ int lines = appender.countLinesWithMessage(
+ "more bytes in the cache: " +
+ DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY);
+ return lines > 0;
+ }
+ }, 500, 30000);
// Uncache the n-1 files
for (int i=0; i<numFiles-1; i++) {
setHeartbeatResponse(uncacheBlocks(fileLocs[i]));
- current = verifyExpectedCacheUsage(current - fileSizes[i]);
+ total -= rounder.round(fileSizes[i]);
+ verifyExpectedCacheUsage(total);
+ }
+ LOG.info("finishing testFilesExceedMaxLockedMemory");
+ }
+
+ @Test(timeout=600000)
+ public void testUncachingBlocksBeforeCachingFinishes() throws Exception {
+ LOG.info("beginning testUncachingBlocksBeforeCachingFinishes");
+ final int NUM_BLOCKS = 5;
+
+ verifyExpectedCacheUsage(0);
+
+ // Write a test file
+ final Path testFile = new Path("/testCacheBlock");
+ final long testFileLen = BLOCK_SIZE*NUM_BLOCKS;
+ DFSTestUtil.createFile(fs, testFile, testFileLen, (short)1, 0xABBAl);
+
+ // Get the details of the written file
+ HdfsBlockLocation[] locs =
+ (HdfsBlockLocation[])fs.getFileBlockLocations(testFile, 0, testFileLen);
+ assertEquals("Unexpected number of blocks", NUM_BLOCKS, locs.length);
+ final long[] blockSizes = getBlockSizes(locs);
+
+ // Check initial state
+ final long cacheCapacity = fsd.getDnCacheCapacity();
+ long cacheUsed = fsd.getDnCacheUsed();
+ long current = 0;
+ assertEquals("Unexpected cache capacity", CACHE_CAPACITY, cacheCapacity);
+ assertEquals("Unexpected amount of cache used", current, cacheUsed);
+
+ MappableBlock.mlocker = new MappableBlock.Mlocker() {
+ @Override
+ public void mlock(MappedByteBuffer mmap, long length) throws IOException {
+ LOG.info("An mlock operation is starting.");
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+ Assert.fail();
+ }
+ }
+ };
+ // Start caching each block in succession. The usedBytes amount
+ // should increase, even though caching doesn't complete on any of them.
+ for (int i=0; i<NUM_BLOCKS; i++) {
+ setHeartbeatResponse(cacheBlock(locs[i]));
+ current = verifyExpectedCacheUsage(current + blockSizes[i]);
}
+
+ setHeartbeatResponse(new DatanodeCommand[] {
+ getResponse(locs, DatanodeProtocol.DNA_UNCACHE)
+ });
+
+ // Wait until all caching jobs have finished cancelling.
+ current = verifyExpectedCacheUsage(0);
+ LOG.info("finishing testUncachingBlocksBeforeCachingFinishes");
}
}
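A side note on the polling change above: the hand-rolled Thread.sleep() loops are replaced by GenericTestUtils.waitFor(), which retries a Supplier<Boolean> at a fixed interval until it returns true or a deadline passes. Below is a minimal, self-contained sketch of that pattern using the same 100 ms / 60 s values as verifyExpectedCacheUsage; the helper class and method names are made up for illustration.

  import java.util.concurrent.TimeoutException;

  import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
  import org.apache.hadoop.test.GenericTestUtils;

  import com.google.common.base.Supplier;

  public class CacheUsageWaiter {
    /**
     * Poll fsd.getDnCacheUsed() every 100 ms until it reaches the expected
     * value, failing with a TimeoutException after 60 seconds.
     */
    public static void waitForCacheUsed(final FsDatasetSpi<?> fsd, final long expected)
        throws TimeoutException, InterruptedException {
      GenericTestUtils.waitFor(new Supplier<Boolean>() {
        @Override
        public Boolean get() {
          return fsd.getDnCacheUsed() == expected;
        }
      }, 100, 60000);
    }
  }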
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java?rev=1540239&r1=1540238&r2=1540239&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java Sat Nov 9 01:34:22 2013
@@ -2243,4 +2243,50 @@ public class TestRenameWithSnapshots {
restartClusterAndCheckImage(true);
}
+
+ /**
+ * Make sure we clean the whole subtree under a DstReference node after
+ * deleting a snapshot.
+ * See HDFS-5476.
+ */
+ @Test
+ public void testCleanDstReference() throws Exception {
+ final Path test = new Path("/test");
+ final Path foo = new Path(test, "foo");
+ final Path bar = new Path(foo, "bar");
+ hdfs.mkdirs(bar);
+ SnapshotTestHelper.createSnapshot(hdfs, test, "s0");
+
+ // Create the file after s0 so that it is not included in s0
+ final Path fileInBar = new Path(bar, "file");
+ DFSTestUtil.createFile(hdfs, fileInBar, BLOCKSIZE, REPL, SEED);
+ // rename foo --> foo2
+ final Path foo2 = new Path(test, "foo2");
+ hdfs.rename(foo, foo2);
+ // create snapshot s1, note the file is included in s1
+ hdfs.createSnapshot(test, "s1");
+ // delete bar and foo2
+ hdfs.delete(new Path(foo2, "bar"), true);
+ hdfs.delete(foo2, true);
+
+ final Path sfileInBar = SnapshotTestHelper.getSnapshotPath(test, "s1",
+ "foo2/bar/file");
+ assertTrue(hdfs.exists(sfileInBar));
+
+ hdfs.deleteSnapshot(test, "s1");
+ assertFalse(hdfs.exists(sfileInBar));
+
+ restartClusterAndCheckImage(true);
+ // make sure the file under bar is deleted
+ final Path barInS0 = SnapshotTestHelper.getSnapshotPath(test, "s0",
+ "foo/bar");
+ INodeDirectoryWithSnapshot barNode = (INodeDirectoryWithSnapshot) fsdir
+ .getINode(barInS0.toString());
+ assertEquals(0, barNode.getChildrenList(null).size());
+ List<DirectoryDiff> diffList = barNode.getDiffs().asList();
+ assertEquals(1, diffList.size());
+ DirectoryDiff diff = diffList.get(0);
+ assertEquals(0, diff.getChildrenDiff().getList(ListType.DELETED).size());
+ assertEquals(0, diff.getChildrenDiff().getList(ListType.CREATED).size());
+ }
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotBlocksMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotBlocksMap.java?rev=1540239&r1=1540238&r2=1540239&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotBlocksMap.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotBlocksMap.java Sat Nov 9 01:34:22 2013
@@ -347,4 +347,49 @@ public class TestSnapshotBlocksMap {
assertEquals(1, blks.length);
assertEquals(BLOCKSIZE, blks[0].getNumBytes());
}
+
+ /**
+ * 1. Rename an under-construction file with a 0-sized block after taking a snapshot.
+ * 2. Delete the renamed directory.
+ * Make sure the 0-sized block is deleted.
+ * See HDFS-5476.
+ */
+ @Test
+ public void testDeletionWithZeroSizeBlock3() throws Exception {
+ final Path foo = new Path("/foo");
+ final Path subDir = new Path(foo, "sub");
+ final Path bar = new Path(subDir, "bar");
+ DFSTestUtil.createFile(hdfs, bar, BLOCKSIZE, REPLICATION, 0L);
+
+ hdfs.append(bar);
+
+ INodeFile barNode = fsdir.getINode4Write(bar.toString()).asFile();
+ BlockInfo[] blks = barNode.getBlocks();
+ assertEquals(1, blks.length);
+ ExtendedBlock previous = new ExtendedBlock(fsn.getBlockPoolId(), blks[0]);
+ cluster.getNameNodeRpc()
+ .addBlock(bar.toString(), hdfs.getClient().getClientName(), previous,
+ null, barNode.getId(), null);
+
+ SnapshotTestHelper.createSnapshot(hdfs, foo, "s1");
+
+ // rename bar
+ final Path bar2 = new Path(subDir, "bar2");
+ hdfs.rename(bar, bar2);
+
+ INodeFile bar2Node = fsdir.getINode4Write(bar2.toString()).asFile();
+ blks = bar2Node.getBlocks();
+ assertEquals(2, blks.length);
+ assertEquals(BLOCKSIZE, blks[0].getNumBytes());
+ assertEquals(0, blks[1].getNumBytes());
+
+ // delete subDir
+ hdfs.delete(subDir, true);
+
+ final Path sbar = SnapshotTestHelper.getSnapshotPath(foo, "s1", "sub/bar");
+ barNode = fsdir.getINode(sbar.toString()).asFile();
+ blks = barNode.getBlocks();
+ assertEquals(1, blks.length);
+ assertEquals(BLOCKSIZE, blks[0].getNumBytes());
+ }
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java?rev=1540239&r1=1540238&r2=1540239&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java Sat Nov 9 01:34:22 2013
@@ -78,11 +78,6 @@ public class WebHdfsTestUtil {
Assert.assertEquals(expectedResponseCode, conn.getResponseCode());
return WebHdfsFileSystem.jsonParse(conn, false);
}
-
- public static HttpURLConnection twoStepWrite(final WebHdfsFileSystem webhdfs,
- final HttpOpParam.Op op, HttpURLConnection conn) throws IOException {
- return webhdfs.new ConnRunner(op, conn).twoStepWrite();
- }
public static FSDataOutputStream write(final WebHdfsFileSystem webhdfs,
final HttpOpParam.Op op, final HttpURLConnection conn,