You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/09/23 05:40:55 UTC
[21/50] [abbrv] hadoop git commit: HDFS-6955. DN should reserve disk
space for a full block when creating tmp files (Contributed by Kanaka Kumar
Avvaru)
HDFS-6955. DN should reserve disk space for a full block when creating tmp files (Contributed by Kanaka Kumar Avvaru)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/92c1af16
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/92c1af16
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/92c1af16
Branch: refs/heads/HDFS-7285
Commit: 92c1af1646b1d91a2ab7821e4f7d450e3b6e10bb
Parents: a7201d6
Author: Vinayakumar B <vi...@apache.org>
Authored: Fri Sep 18 16:37:10 2015 +0530
Committer: Vinayakumar B <vi...@apache.org>
Committed: Fri Sep 18 16:37:10 2015 +0530
----------------------------------------------------------------------
.../hdfs/server/datanode/BlockReceiver.java | 5 +-
.../server/datanode/fsdataset/FsVolumeSpi.java | 8 +-
.../datanode/fsdataset/impl/FsDatasetImpl.java | 13 +-
.../datanode/fsdataset/impl/FsVolumeImpl.java | 72 ++-
.../server/datanode/SimulatedFSDataset.java | 2 +-
.../server/datanode/TestDirectoryScanner.java | 2 +-
.../datanode/extdataset/ExternalVolumeImpl.java | 2 +-
.../fsdataset/impl/TestRbwSpaceReservation.java | 452 ---------------
.../fsdataset/impl/TestSpaceReservation.java | 576 +++++++++++++++++++
9 files changed, 637 insertions(+), 495 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92c1af16/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index bc5396f..957b2c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -117,7 +117,7 @@ class BlockReceiver implements Closeable {
/** the block to receive */
private final ExtendedBlock block;
/** the replica to write */
- private final ReplicaInPipelineInterface replicaInfo;
+ private ReplicaInPipelineInterface replicaInfo;
/** pipeline stage */
private final BlockConstructionStage stage;
private final boolean isTransfer;
@@ -259,6 +259,9 @@ class BlockReceiver implements Closeable {
} catch (ReplicaNotFoundException bne) {
throw bne;
} catch(IOException ioe) {
+ if (replicaInfo != null) {
+ replicaInfo.releaseAllBytesReserved();
+ }
IOUtils.closeStream(this);
cleanupBlock();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92c1af16/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
index ee01924..9e16121 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
@@ -62,13 +62,13 @@ public interface FsVolumeSpi {
boolean isTransientStorage();
/**
- * Reserve disk space for an RBW block so a writer does not run out of
- * space before the block is full.
+ * Reserve disk space for a block (RBW or Re-replicating)
+ * so a writer does not run out of space before the block is full.
*/
- void reserveSpaceForRbw(long bytesToReserve);
+ void reserveSpaceForReplica(long bytesToReserve);
/**
- * Release disk space previously reserved for RBW block.
+ * Release disk space previously reserved for a block opened for write.
*/
void releaseReservedSpace(long bytesToRelease);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92c1af16/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 8722d35..32eb724 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1157,7 +1157,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// Replace finalized replica by a RBW replica in replicas map
volumeMap.add(bpid, newReplicaInfo);
- v.reserveSpaceForRbw(estimateBlockLen - replicaInfo.getNumBytes());
+ v.reserveSpaceForReplica(estimateBlockLen - replicaInfo.getNumBytes());
return newReplicaInfo;
}
@@ -1487,7 +1487,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
ReplicaInPipeline newReplicaInfo =
new ReplicaInPipeline(b.getBlockId(), b.getGenerationStamp(), v,
- f.getParentFile(), 0);
+ f.getParentFile(), b.getLocalBlock().getNumBytes());
volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
return new ReplicaHandler(newReplicaInfo, ref);
} else {
@@ -1604,7 +1604,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY) {
// remove from volumeMap
volumeMap.remove(b.getBlockPoolId(), b.getLocalBlock());
-
+
// delete the on-disk temp file
if (delBlockFromDisk(replicaInfo.getBlockFile(),
replicaInfo.getMetaFile(), b.getLocalBlock())) {
@@ -2555,14 +2555,15 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
final long usedSpace; // size of space used by HDFS
final long freeSpace; // size of free space excluding reserved space
final long reservedSpace; // size of space reserved for non-HDFS
- final long reservedSpaceForRBW; // size of space reserved RBW
+ final long reservedSpaceForReplicas; // size of space reserved for RBW or
+ // re-replication
VolumeInfo(FsVolumeImpl v, long usedSpace, long freeSpace) {
this.directory = v.toString();
this.usedSpace = usedSpace;
this.freeSpace = freeSpace;
this.reservedSpace = v.getReserved();
- this.reservedSpaceForRBW = v.getReservedForRbw();
+ this.reservedSpaceForReplicas = v.getReservedForReplicas();
}
}
@@ -2596,7 +2597,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
innerInfo.put("usedSpace", v.usedSpace);
innerInfo.put("freeSpace", v.freeSpace);
innerInfo.put("reservedSpace", v.reservedSpace);
- innerInfo.put("reservedSpaceForRBW", v.reservedSpaceForRBW);
+ innerInfo.put("reservedSpaceForReplicas", v.reservedSpaceForReplicas);
info.put(v.directory, innerInfo);
}
return info;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92c1af16/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index e90f5d2..8fd52c3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -22,8 +22,8 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
-import java.nio.channels.ClosedChannelException;
import java.io.OutputStreamWriter;
+import java.nio.channels.ClosedChannelException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
@@ -40,9 +40,6 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DF;
@@ -54,21 +51,24 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
-import org.apache.hadoop.util.CloseableReferenceCount;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.CloseableReferenceCount;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.util.Time;
import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
/**
* The underlying volume used to store replica.
*
@@ -90,8 +90,9 @@ public class FsVolumeImpl implements FsVolumeSpi {
private final long reserved;
private CloseableReferenceCount reference = new CloseableReferenceCount();
- // Disk space reserved for open blocks.
- private AtomicLong reservedForRbw;
+ // Disk space reserved for blocks (RBW or Re-replicating) open for write.
+ private AtomicLong reservedForReplicas;
+ private long recentReserved = 0;
// Capacity configured. This is useful when we want to
// limit the visible capacity for tests. If negative, then we just
@@ -113,8 +114,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
this.reserved = conf.getLong(
DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY,
DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT);
- this.reservedForRbw = new AtomicLong(0L);
- this.currentDir = currentDir;
+ this.reservedForReplicas = new AtomicLong(0L);
+ this.currentDir = currentDir;
File parent = currentDir.getParentFile();
this.usage = new DF(parent, conf);
this.storageType = storageType;
@@ -353,8 +354,9 @@ public class FsVolumeImpl implements FsVolumeSpi {
*/
@Override
public long getAvailable() throws IOException {
- long remaining = getCapacity() - getDfsUsed() - reservedForRbw.get();
- long available = usage.getAvailable() - reserved - reservedForRbw.get();
+ long remaining = getCapacity() - getDfsUsed() - reservedForReplicas.get();
+ long available = usage.getAvailable() - reserved
+ - reservedForReplicas.get();
if (remaining > available) {
remaining = available;
}
@@ -362,10 +364,15 @@ public class FsVolumeImpl implements FsVolumeSpi {
}
@VisibleForTesting
- public long getReservedForRbw() {
- return reservedForRbw.get();
+ public long getReservedForReplicas() {
+ return reservedForReplicas.get();
}
-
+
+ @VisibleForTesting
+ long getRecentReserved() {
+ return recentReserved;
+ }
+
long getReserved(){
return reserved;
}
@@ -412,13 +419,20 @@ public class FsVolumeImpl implements FsVolumeSpi {
*/
File createTmpFile(String bpid, Block b) throws IOException {
checkReference();
- return getBlockPoolSlice(bpid).createTmpFile(b);
+ reserveSpaceForReplica(b.getNumBytes());
+ try {
+ return getBlockPoolSlice(bpid).createTmpFile(b);
+ } catch (IOException exception) {
+ releaseReservedSpace(b.getNumBytes());
+ throw exception;
+ }
}
@Override
- public void reserveSpaceForRbw(long bytesToReserve) {
+ public void reserveSpaceForReplica(long bytesToReserve) {
if (bytesToReserve != 0) {
- reservedForRbw.addAndGet(bytesToReserve);
+ reservedForReplicas.addAndGet(bytesToReserve);
+ recentReserved = bytesToReserve;
}
}
@@ -428,14 +442,15 @@ public class FsVolumeImpl implements FsVolumeSpi {
long oldReservation, newReservation;
do {
- oldReservation = reservedForRbw.get();
+ oldReservation = reservedForReplicas.get();
newReservation = oldReservation - bytesToRelease;
if (newReservation < 0) {
- // Failsafe, this should never occur in practice, but if it does we don't
- // want to start advertising more space than we have available.
+ // Failsafe, this should never occur in practice, but if it does we
+ // don't want to start advertising more space than we have available.
newReservation = 0;
}
- } while (!reservedForRbw.compareAndSet(oldReservation, newReservation));
+ } while (!reservedForReplicas.compareAndSet(oldReservation,
+ newReservation));
}
}
@@ -779,7 +794,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
*/
File createRbwFile(String bpid, Block b) throws IOException {
checkReference();
- reserveSpaceForRbw(b.getNumBytes());
+ reserveSpaceForReplica(b.getNumBytes());
try {
return getBlockPoolSlice(bpid).createRbwFile(b);
} catch (IOException exception) {
@@ -790,16 +805,15 @@ public class FsVolumeImpl implements FsVolumeSpi {
/**
*
- * @param bytesReservedForRbw Space that was reserved during
+ * @param bytesReserved Space that was reserved during
* block creation. Now that the block is being finalized we
* can free up this space.
* @return
* @throws IOException
*/
- File addFinalizedBlock(String bpid, Block b,
- File f, long bytesReservedForRbw)
+ File addFinalizedBlock(String bpid, Block b, File f, long bytesReserved)
throws IOException {
- releaseReservedSpace(bytesReservedForRbw);
+ releaseReservedSpace(bytesReserved);
return getBlockPoolSlice(bpid).addFinalizedBlock(b, f);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92c1af16/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 5d1b31a..acbd8a8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -492,7 +492,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
@Override
- public void reserveSpaceForRbw(long bytesToReserve) {
+ public void reserveSpaceForReplica(long bytesToReserve) {
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92c1af16/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
index 9b942b7..baf50d8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
@@ -612,7 +612,7 @@ public class TestDirectoryScanner {
}
@Override
- public void reserveSpaceForRbw(long bytesToReserve) {
+ public void reserveSpaceForReplica(long bytesToReserve) {
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92c1af16/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
index 3242ff7..985a259 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
@@ -74,7 +74,7 @@ public class ExternalVolumeImpl implements FsVolumeSpi {
}
@Override
- public void reserveSpaceForRbw(long bytesToReserve) {
+ public void reserveSpaceForReplica(long bytesToReserve) {
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92c1af16/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java
deleted file mode 100644
index a647d96..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java
+++ /dev/null
@@ -1,452 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
-
-import com.google.common.base.Supplier;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.hadoop.conf.Configuration;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.*;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.util.Daemon;
-import org.apache.log4j.Level;
-import org.junit.After;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.mockito.Mockito;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.lang.management.ManagementFactory;
-import java.lang.reflect.Field;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.TimeoutException;
-
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
-/**
- * Ensure that the DN reserves disk space equivalent to a full block for
- * replica being written (RBW).
- */
-public class TestRbwSpaceReservation {
- static final Log LOG = LogFactory.getLog(TestRbwSpaceReservation.class);
-
- private static final int DU_REFRESH_INTERVAL_MSEC = 500;
- private static final int STORAGES_PER_DATANODE = 1;
- private static final int BLOCK_SIZE = 1024 * 1024;
- private static final int SMALL_BLOCK_SIZE = 1024;
-
- protected MiniDFSCluster cluster;
- private Configuration conf;
- private DistributedFileSystem fs = null;
- private DFSClient client = null;
- FsVolumeReference singletonVolumeRef = null;
- FsVolumeImpl singletonVolume = null;
-
- private static Random rand = new Random();
-
- private void initConfig(int blockSize) {
- conf = new HdfsConfiguration();
-
- // Refresh disk usage information frequently.
- conf.setInt(FS_DU_INTERVAL_KEY, DU_REFRESH_INTERVAL_MSEC);
- conf.setLong(DFS_BLOCK_SIZE_KEY, blockSize);
-
- // Disable the scanner
- conf.setInt(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
- }
-
- static {
- ((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
- ((Log4JLogger) DataNode.LOG).getLogger().setLevel(Level.ALL);
- }
-
- /**
- *
- * @param blockSize
- * @param perVolumeCapacity limit the capacity of each volume to the given
- * value. If negative, then don't limit.
- * @throws IOException
- */
- private void startCluster(int blockSize, int numDatanodes, long perVolumeCapacity) throws IOException {
- initConfig(blockSize);
-
- cluster = new MiniDFSCluster
- .Builder(conf)
- .storagesPerDatanode(STORAGES_PER_DATANODE)
- .numDataNodes(numDatanodes)
- .build();
- fs = cluster.getFileSystem();
- client = fs.getClient();
- cluster.waitActive();
-
- if (perVolumeCapacity >= 0) {
- try (FsDatasetSpi.FsVolumeReferences volumes =
- cluster.getDataNodes().get(0).getFSDataset().getFsVolumeReferences()) {
- singletonVolumeRef = volumes.get(0).obtainReference();
- }
- singletonVolume = ((FsVolumeImpl) singletonVolumeRef.getVolume());
- singletonVolume.setCapacityForTesting(perVolumeCapacity);
- }
- }
-
- @After
- public void shutdownCluster() throws IOException {
- if (singletonVolumeRef != null) {
- singletonVolumeRef.close();
- singletonVolumeRef = null;
- }
-
- if (client != null) {
- client.close();
- client = null;
- }
-
- if (fs != null) {
- fs.close();
- fs = null;
- }
-
- if (cluster != null) {
- cluster.shutdown();
- cluster = null;
- }
- }
-
- private void createFileAndTestSpaceReservation(
- final String fileNamePrefix, final int fileBlockSize)
- throws IOException, InterruptedException {
- // Enough for 1 block + meta files + some delta.
- final long configuredCapacity = fileBlockSize * 2 - 1;
- startCluster(BLOCK_SIZE, 1, configuredCapacity);
- FSDataOutputStream out = null;
- Path path = new Path("/" + fileNamePrefix + ".dat");
-
- try {
- out = fs.create(path, false, 4096, (short) 1, fileBlockSize);
-
- byte[] buffer = new byte[rand.nextInt(fileBlockSize / 4)];
- out.write(buffer);
- out.hsync();
- int bytesWritten = buffer.length;
-
- // Check that space was reserved for a full block minus the bytesWritten.
- assertThat(singletonVolume.getReservedForRbw(),
- is((long) fileBlockSize - bytesWritten));
- out.close();
- out = null;
-
- // Check that the reserved space has been released since we closed the
- // file.
- assertThat(singletonVolume.getReservedForRbw(), is(0L));
-
- // Reopen the file for appends and write 1 more byte.
- out = fs.append(path);
- out.write(buffer);
- out.hsync();
- bytesWritten += buffer.length;
-
- // Check that space was again reserved for a full block minus the
- // bytesWritten so far.
- assertThat(singletonVolume.getReservedForRbw(),
- is((long) fileBlockSize - bytesWritten));
-
- // Write once again and again verify the available space. This ensures
- // that the reserved space is progressively adjusted to account for bytes
- // written to disk.
- out.write(buffer);
- out.hsync();
- bytesWritten += buffer.length;
- assertThat(singletonVolume.getReservedForRbw(),
- is((long) fileBlockSize - bytesWritten));
- } finally {
- if (out != null) {
- out.close();
- }
- }
- }
-
- @Test (timeout=300000)
- public void testWithDefaultBlockSize()
- throws IOException, InterruptedException {
- createFileAndTestSpaceReservation(GenericTestUtils.getMethodName(), BLOCK_SIZE);
- }
-
- @Test (timeout=300000)
- public void testWithNonDefaultBlockSize()
- throws IOException, InterruptedException {
- // Same test as previous one, but with a non-default block size.
- createFileAndTestSpaceReservation(GenericTestUtils.getMethodName(), BLOCK_SIZE * 2);
- }
-
- @Rule
- public ExpectedException thrown = ExpectedException.none();
-
- @Test (timeout=300000)
- public void testWithLimitedSpace() throws IOException {
- // Cluster with just enough space for a full block + meta.
- startCluster(BLOCK_SIZE, 1, 2 * BLOCK_SIZE - 1);
- final String methodName = GenericTestUtils.getMethodName();
- Path file1 = new Path("/" + methodName + ".01.dat");
- Path file2 = new Path("/" + methodName + ".02.dat");
-
- // Create two files.
- FSDataOutputStream os1 = null, os2 = null;
-
- try {
- os1 = fs.create(file1);
- os2 = fs.create(file2);
-
- // Write one byte to the first file.
- byte[] data = new byte[1];
- os1.write(data);
- os1.hsync();
-
- // Try to write one byte to the second file.
- // The block allocation must fail.
- thrown.expect(RemoteException.class);
- os2.write(data);
- os2.hsync();
- } finally {
- if (os1 != null) {
- os1.close();
- }
-
- // os2.close() will fail as no block was allocated.
- }
- }
-
- /**
- * Ensure that reserved space is released when the client goes away
- * unexpectedly.
- *
- * The verification is done for each replica in the write pipeline.
- *
- * @throws IOException
- */
- @Test(timeout=300000)
- public void testSpaceReleasedOnUnexpectedEof()
- throws IOException, InterruptedException, TimeoutException {
- final short replication = 3;
- startCluster(BLOCK_SIZE, replication, -1);
-
- final String methodName = GenericTestUtils.getMethodName();
- final Path file = new Path("/" + methodName + ".01.dat");
-
- // Write 1 byte to the file and kill the writer.
- FSDataOutputStream os = fs.create(file, replication);
- os.write(new byte[1]);
- os.hsync();
- DFSTestUtil.abortStream((DFSOutputStream) os.getWrappedStream());
-
- // Ensure all space reserved for the replica was released on each
- // DataNode.
- for (DataNode dn : cluster.getDataNodes()) {
- try (FsDatasetSpi.FsVolumeReferences volumes =
- dn.getFSDataset().getFsVolumeReferences()) {
- final FsVolumeImpl volume = (FsVolumeImpl) volumes.get(0);
- GenericTestUtils.waitFor(new Supplier<Boolean>() {
- @Override
- public Boolean get() {
- return (volume.getReservedForRbw() == 0);
- }
- }, 500, Integer.MAX_VALUE); // Wait until the test times out.
- }
- }
- }
-
- @SuppressWarnings("unchecked")
- @Test(timeout = 30000)
- public void testRBWFileCreationError() throws Exception {
-
- final short replication = 1;
- startCluster(BLOCK_SIZE, replication, -1);
-
- final FsVolumeImpl fsVolumeImpl = (FsVolumeImpl) cluster.getDataNodes()
- .get(0).getFSDataset().getFsVolumeReferences().get(0);
- final String methodName = GenericTestUtils.getMethodName();
- final Path file = new Path("/" + methodName + ".01.dat");
-
- // Mock BlockPoolSlice so that RBW file creation gives IOExcception
- BlockPoolSlice blockPoolSlice = Mockito.mock(BlockPoolSlice.class);
- Mockito.when(blockPoolSlice.createRbwFile((Block) Mockito.any()))
- .thenThrow(new IOException("Synthetic IO Exception Throgh MOCK"));
-
- Field field = FsVolumeImpl.class.getDeclaredField("bpSlices");
- field.setAccessible(true);
- Map<String, BlockPoolSlice> bpSlices = (Map<String, BlockPoolSlice>) field
- .get(fsVolumeImpl);
- bpSlices.put(fsVolumeImpl.getBlockPoolList()[0], blockPoolSlice);
-
- try {
- // Write 1 byte to the file
- FSDataOutputStream os = fs.create(file, replication);
- os.write(new byte[1]);
- os.hsync();
- os.close();
- fail("Expecting IOException file creation failure");
- } catch (IOException e) {
- // Exception can be ignored (expected)
- }
-
- // Ensure RBW space reserved is released
- assertTrue("Expected ZERO but got " + fsVolumeImpl.getReservedForRbw(),
- fsVolumeImpl.getReservedForRbw() == 0);
- }
-
- @Test(timeout = 30000)
- public void testRBWInJMXBean() throws Exception {
-
- final short replication = 1;
- startCluster(BLOCK_SIZE, replication, -1);
-
- final String methodName = GenericTestUtils.getMethodName();
- final Path file = new Path("/" + methodName + ".01.dat");
-
- try (FSDataOutputStream os = fs.create(file, replication)) {
- // Write 1 byte to the file
- os.write(new byte[1]);
- os.hsync();
-
- final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- final ObjectName mxbeanName = new ObjectName(
- "Hadoop:service=DataNode,name=DataNodeInfo");
- final String volumeInfo = (String) mbs.getAttribute(mxbeanName,
- "VolumeInfo");
-
- assertTrue(volumeInfo.contains("reservedSpaceForRBW"));
- }
- }
-
- /**
- * Stress test to ensure we are not leaking reserved space.
- * @throws IOException
- * @throws InterruptedException
- */
- @Test (timeout=600000)
- public void stressTest() throws IOException, InterruptedException {
- final int numWriters = 5;
- startCluster(SMALL_BLOCK_SIZE, 1, SMALL_BLOCK_SIZE * numWriters * 10);
- Writer[] writers = new Writer[numWriters];
-
- // Start a few writers and let them run for a while.
- for (int i = 0; i < numWriters; ++i) {
- writers[i] = new Writer(client, SMALL_BLOCK_SIZE);
- writers[i].start();
- }
-
- Thread.sleep(60000);
-
- // Stop the writers.
- for (Writer w : writers) {
- w.stopWriter();
- }
- int filesCreated = 0;
- int numFailures = 0;
- for (Writer w : writers) {
- w.join();
- filesCreated += w.getFilesCreated();
- numFailures += w.getNumFailures();
- }
-
- LOG.info("Stress test created " + filesCreated +
- " files and hit " + numFailures + " failures");
-
- // Check no space was leaked.
- assertThat(singletonVolume.getReservedForRbw(), is(0L));
- }
-
- private static class Writer extends Daemon {
- private volatile boolean keepRunning;
- private final DFSClient localClient;
- private int filesCreated = 0;
- private int numFailures = 0;
- byte[] data;
-
- Writer(DFSClient client, int blockSize) throws IOException {
- localClient = client;
- keepRunning = true;
- filesCreated = 0;
- numFailures = 0;
-
- // At least some of the files should span a block boundary.
- data = new byte[blockSize * 2];
- }
-
- @Override
- public void run() {
- /**
- * Create a file, write up to 3 blocks of data and close the file.
- * Do this in a loop until we are told to stop.
- */
- while (keepRunning) {
- OutputStream os = null;
- try {
- String filename = "/file-" + rand.nextLong();
- os = localClient.create(filename, false);
- os.write(data, 0, rand.nextInt(data.length));
- IOUtils.closeQuietly(os);
- os = null;
- localClient.delete(filename, false);
- Thread.sleep(50); // Sleep for a bit to avoid killing the system.
- ++filesCreated;
- } catch (IOException ioe) {
- // Just ignore the exception and keep going.
- ++numFailures;
- } catch (InterruptedException ie) {
- return;
- } finally {
- if (os != null) {
- IOUtils.closeQuietly(os);
- }
- }
- }
- }
-
- public void stopWriter() {
- keepRunning = false;
- }
-
- public int getFilesCreated() {
- return filesCreated;
- }
-
- public int getNumFailures() {
- return numFailures;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/92c1af16/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestSpaceReservation.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestSpaceReservation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestSpaceReservation.java
new file mode 100644
index 0000000..c494288
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestSpaceReservation.java
@@ -0,0 +1,576 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import com.google.common.base.Supplier;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.*;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Daemon;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.management.ManagementFactory;
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeoutException;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+/**
+ * Ensure that the DN reserves disk space equivalent to a full block for a
+ * replica being written (RBW) and for a replica being copied from another DN.
+ */
+public class TestSpaceReservation {
+ static final Log LOG = LogFactory.getLog(TestSpaceReservation.class);
+
+ private static final int DU_REFRESH_INTERVAL_MSEC = 500;
+ private static final int STORAGES_PER_DATANODE = 1;
+ private static final int BLOCK_SIZE = 1024 * 1024;
+ private static final int SMALL_BLOCK_SIZE = 1024;
+
+ protected MiniDFSCluster cluster;
+ private Configuration conf;
+ private DistributedFileSystem fs = null;
+ private DFSClient client = null;
+ FsVolumeReference singletonVolumeRef = null;
+ FsVolumeImpl singletonVolume = null;
+
+ private static Random rand = new Random();
+
+ private void initConfig(int blockSize) {
+ conf = new HdfsConfiguration();
+
+ // Refresh disk usage information frequently.
+ conf.setInt(FS_DU_INTERVAL_KEY, DU_REFRESH_INTERVAL_MSEC);
+ conf.setLong(DFS_BLOCK_SIZE_KEY, blockSize);
+
+ // Disable the scanner
+ conf.setInt(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
+ }
+
+ static {
+ ((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger) DataNode.LOG).getLogger().setLevel(Level.ALL);
+ }
+
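+ /**
+ * Start a MiniDFSCluster with {@code numDatanodes} DataNodes.
+ * @param blockSize default block size in bytes (DFS_BLOCK_SIZE_KEY)
+ * @param perVolumeCapacity capacity in bytes to set on the first volume of
+ * the first DataNode. If negative, then don't limit.
+ * @throws IOException
+ */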
+ private void startCluster(int blockSize, int numDatanodes, long perVolumeCapacity) throws IOException {
+ initConfig(blockSize);
+
+ cluster = new MiniDFSCluster
+ .Builder(conf)
+ .storagesPerDatanode(STORAGES_PER_DATANODE)
+ .numDataNodes(numDatanodes)
+ .build();
+ fs = cluster.getFileSystem();
+ client = fs.getClient();
+ cluster.waitActive();
+
+ if (perVolumeCapacity >= 0) {
+ try (FsDatasetSpi.FsVolumeReferences volumes =
+ cluster.getDataNodes().get(0).getFSDataset().getFsVolumeReferences()) {
+ singletonVolumeRef = volumes.get(0).obtainReference();
+ }
+ singletonVolume = ((FsVolumeImpl) singletonVolumeRef.getVolume());
+ singletonVolume.setCapacityForTesting(perVolumeCapacity);
+ }
+ }
+
+ @After
+ public void shutdownCluster() throws IOException {
+ if (singletonVolumeRef != null) {
+ singletonVolumeRef.close();
+ singletonVolumeRef = null;
+ }
+
+ if (client != null) {
+ client.close();
+ client = null;
+ }
+
+ if (fs != null) {
+ fs.close();
+ fs = null;
+ }
+
+ if (cluster != null) {
+ cluster.shutdown();
+ cluster = null;
+ }
+ }
+
+ private void createFileAndTestSpaceReservation(
+ final String fileNamePrefix, final int fileBlockSize)
+ throws IOException, InterruptedException {
+ // Enough for 1 block + meta files + some delta.
+ final long configuredCapacity = fileBlockSize * 2 - 1;
+ startCluster(BLOCK_SIZE, 1, configuredCapacity);
+ FSDataOutputStream out = null;
+ Path path = new Path("/" + fileNamePrefix + ".dat");
+
+ try {
+ out = fs.create(path, false, 4096, (short) 1, fileBlockSize);
+
+ byte[] buffer = new byte[rand.nextInt(fileBlockSize / 4)];
+ out.write(buffer);
+ out.hsync();
+ int bytesWritten = buffer.length;
+
+ // Check that space was reserved for a full block minus the bytesWritten.
+ assertThat(singletonVolume.getReservedForReplicas(),
+ is((long) fileBlockSize - bytesWritten));
+ out.close();
+ out = null;
+
+ // Check that the reserved space has been released since we closed the
+ // file.
+ assertThat(singletonVolume.getReservedForReplicas(), is(0L));
+
+ // Reopen the file for appends and write the same buffer again.
+ out = fs.append(path);
+ out.write(buffer);
+ out.hsync();
+ bytesWritten += buffer.length;
+
+ // Check that space was again reserved for a full block minus the
+ // bytesWritten so far.
+ assertThat(singletonVolume.getReservedForReplicas(),
+ is((long) fileBlockSize - bytesWritten));
+
+ // Write once again and again verify the available space. This ensures
+ // that the reserved space is progressively adjusted to account for bytes
+ // written to disk.
+ out.write(buffer);
+ out.hsync();
+ bytesWritten += buffer.length;
+ assertThat(singletonVolume.getReservedForReplicas(),
+ is((long) fileBlockSize - bytesWritten));
+ } finally {
+ if (out != null) {
+ out.close();
+ }
+ }
+ }
+
+ @Test (timeout=300000)
+ public void testWithDefaultBlockSize()
+ throws IOException, InterruptedException {
+ createFileAndTestSpaceReservation(GenericTestUtils.getMethodName(), BLOCK_SIZE);
+ }
+
+ @Test (timeout=300000)
+ public void testWithNonDefaultBlockSize()
+ throws IOException, InterruptedException {
+ // Same test as previous one, but with a non-default block size.
+ createFileAndTestSpaceReservation(GenericTestUtils.getMethodName(), BLOCK_SIZE * 2);
+ }
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ @Test (timeout=300000)
+ public void testWithLimitedSpace() throws IOException {
+ // Cluster with just enough space for a full block + meta.
+ startCluster(BLOCK_SIZE, 1, 2 * BLOCK_SIZE - 1);
+ final String methodName = GenericTestUtils.getMethodName();
+ Path file1 = new Path("/" + methodName + ".01.dat");
+ Path file2 = new Path("/" + methodName + ".02.dat");
+
+ // Create two files.
+ FSDataOutputStream os1 = null, os2 = null;
+
+ try {
+ os1 = fs.create(file1);
+ os2 = fs.create(file2);
+
+ // Write one byte to the first file.
+ byte[] data = new byte[1];
+ os1.write(data);
+ os1.hsync();
+
+ // Try to write one byte to the second file.
+ // The block allocation must fail.
+ thrown.expect(RemoteException.class);
+ os2.write(data);
+ os2.hsync();
+ } finally {
+ if (os1 != null) {
+ os1.close();
+ }
+
+ // os2.close() will fail as no block was allocated.
+ }
+ }
+
+ /**
+ * Ensure that reserved space is released when the client goes away
+ * unexpectedly.
+ *
+ * The verification is done for each replica in the write pipeline.
+ *
+ * @throws IOException
+ */
+ @Test(timeout=300000)
+ public void testSpaceReleasedOnUnexpectedEof()
+ throws IOException, InterruptedException, TimeoutException {
+ final short replication = 3;
+ startCluster(BLOCK_SIZE, replication, -1);
+
+ final String methodName = GenericTestUtils.getMethodName();
+ final Path file = new Path("/" + methodName + ".01.dat");
+
+ // Write 1 byte to the file and kill the writer.
+ FSDataOutputStream os = fs.create(file, replication);
+ os.write(new byte[1]);
+ os.hsync();
+ DFSTestUtil.abortStream((DFSOutputStream) os.getWrappedStream());
+
+ // Ensure all space reserved for the replica was released on each
+ // DataNode.
+ for (DataNode dn : cluster.getDataNodes()) {
+ try (FsDatasetSpi.FsVolumeReferences volumes =
+ dn.getFSDataset().getFsVolumeReferences()) {
+ final FsVolumeImpl volume = (FsVolumeImpl) volumes.get(0);
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return (volume.getReservedForReplicas() == 0);
+ }
+ }, 500, Integer.MAX_VALUE); // Wait until the test times out.
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 30000)
+ public void testRBWFileCreationError() throws Exception {
+
+ final short replication = 1;
+ startCluster(BLOCK_SIZE, replication, -1);
+
+ final FsVolumeImpl fsVolumeImpl = (FsVolumeImpl) cluster.getDataNodes()
+ .get(0).getFSDataset().getFsVolumeReferences().get(0);
+ final String methodName = GenericTestUtils.getMethodName();
+ final Path file = new Path("/" + methodName + ".01.dat");
+
+ // Mock BlockPoolSlice so that RBW file creation throws an IOException
+ BlockPoolSlice blockPoolSlice = Mockito.mock(BlockPoolSlice.class);
+ Mockito.when(blockPoolSlice.createRbwFile((Block) Mockito.any()))
+ .thenThrow(new IOException("Synthetic IO Exception Throgh MOCK"));
+
+ Field field = FsVolumeImpl.class.getDeclaredField("bpSlices");
+ field.setAccessible(true);
+ Map<String, BlockPoolSlice> bpSlices = (Map<String, BlockPoolSlice>) field
+ .get(fsVolumeImpl);
+ bpSlices.put(fsVolumeImpl.getBlockPoolList()[0], blockPoolSlice);
+
+ try {
+ // Write 1 byte to the file
+ FSDataOutputStream os = fs.create(file, replication);
+ os.write(new byte[1]);
+ os.hsync();
+ os.close();
+ fail("Expecting IOException file creation failure");
+ } catch (IOException e) {
+ // Exception can be ignored (expected)
+ }
+
+ // Ensure RBW space reserved is released
+ assertTrue(
+ "Expected ZERO but got " + fsVolumeImpl.getReservedForReplicas(),
+ fsVolumeImpl.getReservedForReplicas() == 0);
+
+ // Reserve some bytes to verify that the space is not released twice
+ fsVolumeImpl.reserveSpaceForReplica(1000);
+ try {
+ // Write 1 byte to the file
+ FSDataOutputStream os = fs.create(new Path("/" + methodName + ".02.dat"),
+ replication);
+ os.write(new byte[1]);
+ os.hsync();
+ os.close();
+ fail("Expecting IOException file creation failure");
+ } catch (IOException e) {
+ // Exception can be ignored (expected)
+ }
+
+ // Ensure RBW space reserved is released only once
+ assertTrue(fsVolumeImpl.getReservedForReplicas() == 1000);
+ }
+
+ @Test(timeout = 30000)
+ public void testReservedSpaceInJMXBean() throws Exception {
+
+ final short replication = 1;
+ startCluster(BLOCK_SIZE, replication, -1);
+
+ final String methodName = GenericTestUtils.getMethodName();
+ final Path file = new Path("/" + methodName + ".01.dat");
+
+ try (FSDataOutputStream os = fs.create(file, replication)) {
+ // Write 1 byte to the file
+ os.write(new byte[1]);
+ os.hsync();
+
+ final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ final ObjectName mxbeanName = new ObjectName(
+ "Hadoop:service=DataNode,name=DataNodeInfo");
+ final String volumeInfo = (String) mbs.getAttribute(mxbeanName,
+ "VolumeInfo");
+
+ // verify reserved space for Replicas in JMX bean volume info
+ assertTrue(volumeInfo.contains("reservedSpaceForReplicas"));
+ }
+ }
+
+ @Test(timeout = 300000)
+ public void testTmpSpaceReserve() throws Exception {
+
+ final short replication = 2;
+ startCluster(BLOCK_SIZE, replication, -1);
+ final int byteCount1 = 100;
+ final int byteCount2 = 200;
+
+ final String methodName = GenericTestUtils.getMethodName();
+
+ // Test positive scenario
+ {
+ final Path file = new Path("/" + methodName + ".01.dat");
+
+ try (FSDataOutputStream os = fs.create(file, (short) 1)) {
+ // Write test data to the file
+ os.write(new byte[byteCount1]);
+ os.hsync();
+ }
+
+ BlockLocation[] blockLocations = fs.getFileBlockLocations(file, 0, 10);
+ String firstReplicaNode = blockLocations[0].getNames()[0];
+
+ int newReplicaDNIndex = 0;
+ if (firstReplicaNode.equals(cluster.getDataNodes().get(0)
+ .getDisplayName())) {
+ newReplicaDNIndex = 1;
+ }
+
+ FsVolumeImpl fsVolumeImpl = (FsVolumeImpl) cluster.getDataNodes()
+ .get(newReplicaDNIndex).getFSDataset().getFsVolumeReferences().get(0);
+
+ performReReplication(file, true);
+
+ assertEquals("Wrong reserve space for Tmp ", byteCount1,
+ fsVolumeImpl.getRecentReserved());
+
+ assertEquals("Reserved Tmp space is not released", 0,
+ fsVolumeImpl.getReservedForReplicas());
+ }
+
+ // Test when file creation fails
+ {
+ final Path file = new Path("/" + methodName + ".01.dat");
+
+ try (FSDataOutputStream os = fs.create(file, (short) 1)) {
+ // Write test data to the file
+ os.write(new byte[byteCount2]);
+ os.hsync();
+ }
+
+ BlockLocation[] blockLocations = fs.getFileBlockLocations(file, 0, 10);
+ String firstReplicaNode = blockLocations[0].getNames()[0];
+
+ int newReplicaDNIndex = 0;
+ if (firstReplicaNode.equals(cluster.getDataNodes().get(0)
+ .getDisplayName())) {
+ newReplicaDNIndex = 1;
+ }
+
+ BlockPoolSlice blockPoolSlice = Mockito.mock(BlockPoolSlice.class);
+ Mockito.when(blockPoolSlice.createTmpFile((Block) Mockito.any()))
+ .thenThrow(new IOException("Synthetic IO Exception Throgh MOCK"));
+
+ final FsVolumeImpl fsVolumeImpl = (FsVolumeImpl) cluster.getDataNodes()
+ .get(newReplicaDNIndex).getFSDataset().getFsVolumeReferences().get(0);
+
+ // Reserve some bytes to verify that the space is not released twice
+ fsVolumeImpl.reserveSpaceForReplica(1000);
+
+ Field field = FsVolumeImpl.class.getDeclaredField("bpSlices");
+ field.setAccessible(true);
+ @SuppressWarnings("unchecked")
+ Map<String, BlockPoolSlice> bpSlices = (Map<String, BlockPoolSlice>) field
+ .get(fsVolumeImpl);
+ bpSlices.put(fsVolumeImpl.getBlockPoolList()[0], blockPoolSlice);
+
+ performReReplication(file, false);
+
+ assertEquals("Wrong reserve space for Tmp ", byteCount2,
+ fsVolumeImpl.getRecentReserved());
+
+ assertEquals("Tmp space is not released OR released twice", 1000,
+ fsVolumeImpl.getReservedForReplicas());
+ }
+ }
+
+ private void performReReplication(Path filePath, boolean waitForSuccess)
+ throws Exception {
+ fs.setReplication(filePath, (short) 2);
+
+ Thread.sleep(4000);
+ BlockLocation[] blockLocations = fs.getFileBlockLocations(filePath, 0, 10);
+
+ if (waitForSuccess) {
+ // Wait for the re-replication to finish
+ while (blockLocations[0].getNames().length < 2) {
+ Thread.sleep(2000);
+ blockLocations = fs.getFileBlockLocations(filePath, 0, 10);
+ }
+ }
+ }
+
+ /**
+ * Stress test to ensure we are not leaking reserved space.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Test (timeout=600000)
+ public void stressTest() throws IOException, InterruptedException {
+ final int numWriters = 5;
+ startCluster(SMALL_BLOCK_SIZE, 1, SMALL_BLOCK_SIZE * numWriters * 10);
+ Writer[] writers = new Writer[numWriters];
+
+ // Start a few writers and let them run for a while.
+ for (int i = 0; i < numWriters; ++i) {
+ writers[i] = new Writer(client, SMALL_BLOCK_SIZE);
+ writers[i].start();
+ }
+
+ Thread.sleep(60000);
+
+ // Stop the writers.
+ for (Writer w : writers) {
+ w.stopWriter();
+ }
+ int filesCreated = 0;
+ int numFailures = 0;
+ for (Writer w : writers) {
+ w.join();
+ filesCreated += w.getFilesCreated();
+ numFailures += w.getNumFailures();
+ }
+
+ LOG.info("Stress test created " + filesCreated +
+ " files and hit " + numFailures + " failures");
+
+ // Check no space was leaked.
+ assertThat(singletonVolume.getReservedForReplicas(), is(0L));
+ }
+
+ private static class Writer extends Daemon {
+ private volatile boolean keepRunning;
+ private final DFSClient localClient;
+ private int filesCreated = 0;
+ private int numFailures = 0;
+ byte[] data;
+
+ Writer(DFSClient client, int blockSize) throws IOException {
+ localClient = client;
+ keepRunning = true;
+ filesCreated = 0;
+ numFailures = 0;
+
+ // At least some of the files should span a block boundary.
+ data = new byte[blockSize * 2];
+ }
+
+ @Override
+ public void run() {
+ /**
+ * Create a file, write up to 2 blocks of data, then close and delete it.
+ * Do this in a loop until we are told to stop.
+ */
+ while (keepRunning) {
+ OutputStream os = null;
+ try {
+ String filename = "/file-" + rand.nextLong();
+ os = localClient.create(filename, false);
+ os.write(data, 0, rand.nextInt(data.length));
+ IOUtils.closeQuietly(os);
+ os = null;
+ localClient.delete(filename, false);
+ Thread.sleep(50); // Sleep for a bit to avoid killing the system.
+ ++filesCreated;
+ } catch (IOException ioe) {
+ // Just ignore the exception and keep going.
+ ++numFailures;
+ } catch (InterruptedException ie) {
+ return;
+ } finally {
+ if (os != null) {
+ IOUtils.closeQuietly(os);
+ }
+ }
+ }
+ }
+
+ public void stopWriter() {
+ keepRunning = false;
+ }
+
+ public int getFilesCreated() {
+ return filesCreated;
+ }
+
+ public int getNumFailures() {
+ return numFailures;
+ }
+ }
+}