Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2014/10/18 01:30:53 UTC
[06/34] git commit: HDFS-6925. DataNode should attempt to place replicas on transient storage first if lazyPersist flag is received. (Arpit Agarwal)
HDFS-6925. DataNode should attempt to place replicas on transient storage first if lazyPersist flag is received. (Arpit Agarwal)
Conflicts:
hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/34d0088b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/34d0088b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/34d0088b
Branch: refs/heads/branch-2.6
Commit: 34d0088bf9d793de888c8dd969a047dbae77cfb5
Parents: 8c3c0ec
Author: arp <ar...@apache.org>
Authored: Wed Aug 27 15:35:47 2014 -0700
Committer: Jitendra Pandey <Ji...@Jitendra-Pandeys-MacBook-Pro-4.local>
Committed: Fri Oct 17 16:00:49 2014 -0700
----------------------------------------------------------------------
.../hdfs/server/datanode/BlockReceiver.java | 5 +-
.../hdfs/server/datanode/DataXceiver.java | 6 +-
.../hdfs/server/datanode/DirectoryScanner.java | 2 +-
.../hadoop/hdfs/server/datanode/Replica.java | 5 ++
.../hdfs/server/datanode/ReplicaInfo.java | 30 ++--------
.../server/datanode/ReplicaUnderRecovery.java | 3 +-
.../AvailableSpaceVolumeChoosingPolicy.java | 2 +-
.../server/datanode/fsdataset/FsDatasetSpi.java | 4 +-
.../server/datanode/fsdataset/FsVolumeSpi.java | 5 +-
.../RoundRobinVolumeChoosingPolicy.java | 13 ++++-
.../datanode/fsdataset/impl/FsDatasetImpl.java | 49 ++++++++++++----
.../fsdataset/impl/FsTransientVolumeImpl.java | 60 ++++++++++++++++++++
.../datanode/fsdataset/impl/FsVolumeImpl.java | 12 +++-
.../fsdataset/impl/FsVolumeImplAllocator.java | 44 ++++++++++++++
.../datanode/fsdataset/impl/FsVolumeList.java | 20 ++++++-
.../hdfs/TestWriteBlockGetsBlockLengthHint.java | 5 +-
.../server/datanode/BlockReportTestBase.java | 2 +-
.../server/datanode/SimulatedFSDataset.java | 10 +++-
.../hdfs/server/datanode/TestBlockRecovery.java | 4 +-
.../server/datanode/TestDirectoryScanner.java | 6 +-
.../server/datanode/TestSimulatedFSDataset.java | 2 +-
.../fsdataset/impl/FsDatasetTestUtil.java | 2 +-
.../fsdataset/impl/TestWriteToReplica.java | 12 ++--
23 files changed, 231 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
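A minimal, self-contained sketch of the placement behavior this patch introduces (Volume and next() are hypothetical stand-ins, not Hadoop classes; the real logic is in FsDatasetImpl#createRbw and FsVolumeList#getNextTransientVolume below): when the client sets the lazyPersist flag, the DataNode first tries RAM-backed (transient) volumes and falls back to persistent storage only if none have space.

import java.io.IOException;
import java.util.List;

// Hypothetical stand-in for FsVolumeImpl.
interface Volume {
  boolean isTransientStorage();
  long getAvailable();
}

class LazyPersistPlacementSketch {
  // Mirrors the while(true) fallback added to FsDatasetImpl#createRbw:
  // try transient volumes first, then retry on persistent storage.
  static Volume choose(List<Volume> volumes, long blockSize,
      boolean allowLazyPersist) throws IOException {
    while (true) {
      try {
        return next(volumes, blockSize, allowLazyPersist);
      } catch (IOException outOfSpace) {
        if (!allowLazyPersist) {
          throw outOfSpace;
        }
        allowLazyPersist = false; // fall back to persistent volumes
      }
    }
  }

  // Stand-in for FsVolumeList#getNextTransientVolume / #getNextVolume.
  private static Volume next(List<Volume> volumes, long blockSize,
      boolean transientOnly) throws IOException {
    for (Volume v : volumes) {
      if ((!transientOnly || v.isTransientStorage())
          && v.getAvailable() > blockSize) {
        return v;
      }
    }
    throw new IOException("No more available volumes"); // cf. DiskOutOfSpaceException
  }
}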
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34d0088b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index bfb2233..4d1cc6c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -139,7 +139,8 @@ class BlockReceiver implements Closeable {
final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
final String clientname, final DatanodeInfo srcDataNode,
final DataNode datanode, DataChecksum requestedChecksum,
- CachingStrategy cachingStrategy) throws IOException {
+ CachingStrategy cachingStrategy,
+ final boolean allowLazyPersist) throws IOException {
try{
this.block = block;
this.in = in;
@@ -180,7 +181,7 @@ class BlockReceiver implements Closeable {
} else {
switch (stage) {
case PIPELINE_SETUP_CREATE:
- replicaInfo = datanode.data.createRbw(storageType, block);
+ replicaInfo = datanode.data.createRbw(storageType, block, allowLazyPersist);
datanode.notifyNamenodeReceivingBlock(
block, replicaInfo.getStorageUuid());
break;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34d0088b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 3b8304e..67eb941 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -607,8 +607,8 @@ class DataXceiver extends Receiver implements Runnable {
peer.getLocalAddressString(),
stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
clientname, srcDataNode, datanode, requestedChecksum,
- cachingStrategy);
-
+ cachingStrategy, allowLazyPersist);
+
storageUuid = blockReceiver.getStorageUuid();
} else {
storageUuid = datanode.data.recoverClose(
@@ -1048,7 +1048,7 @@ class DataXceiver extends Receiver implements Runnable {
proxyReply, proxySock.getRemoteSocketAddress().toString(),
proxySock.getLocalSocketAddress().toString(),
null, 0, 0, 0, "", null, datanode, remoteChecksum,
- CachingStrategy.newDropBehind());
+ CachingStrategy.newDropBehind(), false);
// receive a block
blockReceiver.receiveBlock(null, null, replyOut, null,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34d0088b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index a47f2ef..c313b04 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -399,7 +399,7 @@ public class DirectoryScanner implements Runnable {
/**
* Reconcile differences between disk and in-memory blocks
*/
- void reconcile() {
+ void reconcile() throws IOException {
scan();
for (Entry<String, LinkedList<ScanInfo>> entry : diffs.entrySet()) {
String bpid = entry.getKey();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34d0088b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/Replica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/Replica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/Replica.java
index a480bb1..b6e5ba9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/Replica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/Replica.java
@@ -59,4 +59,9 @@ public interface Replica {
* Return the storageUuid of the volume that stores this replica.
*/
public String getStorageUuid();
+
+ /**
+ * Return true if the target volume is backed by RAM.
+ */
+ public boolean isOnTransientStorage();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34d0088b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
index 49ac605..940d3eb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
@@ -62,17 +62,6 @@ abstract public class ReplicaInfo extends Block implements Replica {
private static final Map<String, File> internedBaseDirs = new HashMap<String, File>();
/**
- * Constructor for a zero length replica
- * @param blockId block id
- * @param genStamp replica generation stamp
- * @param vol volume where replica is located
- * @param dir directory path where block and meta files are located
- */
- ReplicaInfo(long blockId, long genStamp, FsVolumeSpi vol, File dir) {
- this( blockId, 0L, genStamp, vol, dir);
- }
-
- /**
* Constructor
* @param block a block
* @param vol volume where replica is located
@@ -296,20 +285,6 @@ abstract public class ReplicaInfo extends Block implements Replica {
return true;
}
- /**
- * Set this replica's generation stamp to be a newer one
- * @param newGS new generation stamp
- * @throws IOException is the new generation stamp is not greater than the current one
- */
- void setNewerGenerationStamp(long newGS) throws IOException {
- long curGS = getGenerationStamp();
- if (newGS <= curGS) {
- throw new IOException("New generation stamp (" + newGS
- + ") must be greater than current one (" + curGS + ")");
- }
- setGenerationStamp(newGS);
- }
-
@Override //Object
public String toString() {
return getClass().getSimpleName()
@@ -321,4 +296,9 @@ abstract public class ReplicaInfo extends Block implements Replica {
+ "\n getVolume() = " + getVolume()
+ "\n getBlockFile() = " + getBlockFile();
}
+
+ @Override
+ public boolean isOnTransientStorage() {
+ return volume.isTransientStorage();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34d0088b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
index 35f7c93..2cd8a01 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
@@ -37,8 +37,7 @@ public class ReplicaUnderRecovery extends ReplicaInfo {
// that the replica will be bumped to after recovery
public ReplicaUnderRecovery(ReplicaInfo replica, long recoveryId) {
- super(replica.getBlockId(), replica.getNumBytes(), replica.getGenerationStamp(),
- replica.getVolume(), replica.getDir());
+ super(replica, replica.getVolume(), replica.getDir());
if ( replica.getState() != ReplicaState.FINALIZED &&
replica.getState() != ReplicaState.RBW &&
replica.getState() != ReplicaState.RWR ) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34d0088b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java
index ec19ec5..235cd7b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java
@@ -99,7 +99,7 @@ public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
@Override
public synchronized V chooseVolume(List<V> volumes,
- final long replicaSize) throws IOException {
+ long replicaSize) throws IOException {
if (volumes.size() < 1) {
throw new DiskOutOfSpaceException("No more available volumes");
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34d0088b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 4c03151..4a5580c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -122,7 +122,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
* as corrupted.
*/
public void checkAndUpdate(String bpid, long blockId, File diskFile,
- File diskMetaFile, FsVolumeSpi vol);
+ File diskMetaFile, FsVolumeSpi vol) throws IOException;
/**
* @param b - the block
@@ -197,7 +197,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
* @throws IOException if an error occurs
*/
public ReplicaInPipelineInterface createRbw(StorageType storageType,
- ExtendedBlock b) throws IOException;
+ ExtendedBlock b, boolean allowLazyPersist) throws IOException;
/**
* Recovers a RBW replica and returns the meta info of the replica
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34d0088b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
index cba23c3..4f45922 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
@@ -56,4 +56,7 @@ public interface FsVolumeSpi {
* Release disk space previously reserved for RBW block.
*/
public void releaseReservedSpace(long bytesToRelease);
-}
\ No newline at end of file
+
+ /** Returns true if the volume is NOT backed by persistent storage. */
+ public boolean isTransientStorage();
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34d0088b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java
index 7f4bdae..55a3560 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RoundRobinVolumeChoosingPolicy.java
@@ -20,6 +20,9 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset;
import java.io.IOException;
import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
/**
@@ -27,12 +30,14 @@ import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
*/
public class RoundRobinVolumeChoosingPolicy<V extends FsVolumeSpi>
implements VolumeChoosingPolicy<V> {
+ public static final Log LOG = LogFactory.getLog(RoundRobinVolumeChoosingPolicy.class);
private int curVolume = 0;
@Override
- public synchronized V chooseVolume(final List<V> volumes, final long blockSize
- ) throws IOException {
+ public synchronized V chooseVolume(final List<V> volumes, long blockSize)
+ throws IOException {
+
if(volumes.size() < 1) {
throw new DiskOutOfSpaceException("No more available volumes");
}
@@ -50,7 +55,9 @@ public class RoundRobinVolumeChoosingPolicy<V extends FsVolumeSpi>
final V volume = volumes.get(curVolume);
curVolume = (curVolume + 1) % volumes.size();
long availableVolumeSize = volume.getAvailable();
- if (availableVolumeSize > blockSize) { return volume; }
+ if (availableVolumeSize > blockSize) {
+ return volume;
+ }
if (availableVolumeSize > maxAvailable) {
maxAvailable = availableVolumeSize;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34d0088b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 57706f8..701207d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -45,6 +45,9 @@ import javax.management.ObjectName;
import javax.management.StandardMBean;
import com.google.common.collect.Lists;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.TreeMultimap;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -278,7 +281,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// If IOException raises from FsVolumeImpl() or getVolumeMap(), there is
// nothing needed to be rolled back to make various data structures, e.g.,
// storageMap and asyncDiskService, consistent.
- FsVolumeImpl fsVolume = new FsVolumeImpl(
+ FsVolumeImpl fsVolume = FsVolumeImplAllocator.createVolume(
this, sd.getStorageUuid(), dir, this.conf, storageType);
ReplicaMap tempVolumeMap = new ReplicaMap(this);
fsVolume.getVolumeMap(tempVolumeMap);
@@ -550,16 +553,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
* Get File name for a given block.
*/
private File getBlockFile(ExtendedBlock b) throws IOException {
- return getBlockFile(b.getBlockPoolId(), b.getLocalBlock());
+ return getBlockFile(b.getBlockPoolId(), b.getBlockId());
}
/**
* Get File name for a given block.
*/
- File getBlockFile(String bpid, Block b) throws IOException {
- File f = validateBlockFile(bpid, b);
+ File getBlockFile(String bpid, long blockId) throws IOException {
+ File f = validateBlockFile(bpid, blockId);
if(f == null) {
- throw new IOException("Block " + b + " is not valid.");
+ throw new IOException("BlockId " + blockId + " is not valid.");
}
return f;
}
@@ -949,8 +952,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi
public synchronized ReplicaInPipeline createRbw(StorageType storageType,
- ExtendedBlock b) throws IOException {
- ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
+ ExtendedBlock b, boolean allowLazyPersist) throws IOException {
+ ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getBlockId());
if (replicaInfo != null) {
throw new ReplicaAlreadyExistsException("Block " + b +
@@ -958,8 +961,25 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
" and thus cannot be created.");
}
// create a new block
- FsVolumeImpl v = volumes.getNextVolume(storageType, b.getNumBytes());
- // create a rbw file to hold block in the designated volume
+ FsVolumeImpl v;
+ while (true) {
+ try {
+ if (allowLazyPersist) {
+ // First try to place the block on a transient volume.
+ v = volumes.getNextTransientVolume(b.getNumBytes());
+ } else {
+ v = volumes.getNextVolume(storageType, b.getNumBytes());
+ }
+ } catch (DiskOutOfSpaceException de) {
+ if (allowLazyPersist) {
+ allowLazyPersist = false;
+ continue;
+ }
+ throw de;
+ }
+ break;
+ }
+ // create an rbw file to hold block in the designated volume
File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes());
@@ -1321,11 +1341,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
/**
* Find the file corresponding to the block and return it if it exists.
*/
- File validateBlockFile(String bpid, Block b) {
+ File validateBlockFile(String bpid, long blockId) {
//Should we check for metadata file too?
final File f;
synchronized(this) {
- f = getFile(bpid, b.getBlockId());
+ f = getFile(bpid, blockId);
}
if(f != null ) {
@@ -1337,7 +1357,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
if (LOG.isDebugEnabled()) {
- LOG.debug("b=" + b + ", f=" + f);
+ LOG.debug("blockId=" + blockId + ", f=" + f);
}
return null;
}
@@ -1497,6 +1517,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
": volume was not an instance of FsVolumeImpl.");
return;
}
+ if (volume.isTransientStorage()) {
+ LOG.warn("Caching not supported on block with id " + blockId +
+ " since the volume is backed by RAM.");
+ return;
+ }
success = true;
} finally {
if (!success) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34d0088b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsTransientVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsTransientVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsTransientVolumeImpl.java
new file mode 100644
index 0000000..dafa74f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsTransientVolumeImpl.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.StorageType;
+
+/**
+ * Volume for storing replicas in memory. These can be deleted at any time
+ * to make space for new replicas and there is no persistence guarantee.
+ *
+ * The backing store for these replicas is expected to be RAM_DISK.
+ * The backing store may be disk when testing.
+ *
+ * It uses the {@link FsDatasetImpl} object for synchronization.
+ */
+@InterfaceAudience.Private
+@VisibleForTesting
+public class FsTransientVolumeImpl extends FsVolumeImpl {
+
+
+ FsTransientVolumeImpl(FsDatasetImpl dataset, String storageID, File currentDir,
+ Configuration conf, StorageType storageType)
+ throws IOException {
+ super(dataset, storageID, currentDir, conf, storageType);
+ }
+
+ @Override
+ protected ThreadPoolExecutor initializeCacheExecutor(File parent) {
+ // Can't 'cache' replicas already in RAM.
+ return null;
+ }
+
+ @Override
+ public boolean isTransientStorage() {
+ return true;
+ }
+}
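Because initializeCacheExecutor() returns null for RAM-backed volumes, FsVolumeImpl.shutdown() gains the null guard shown in the next hunk. A minimal sketch of that interplay (VolumeSketch is a hypothetical stand-in):

import java.util.concurrent.ThreadPoolExecutor;

abstract class VolumeSketch {
  // FsVolumeImpl initializes a real executor; FsTransientVolumeImpl returns
  // null because replicas already in RAM cannot be "cached" again.
  protected ThreadPoolExecutor cacheExecutor;

  protected abstract ThreadPoolExecutor initializeCacheExecutor();

  void shutdown() {
    if (cacheExecutor != null) { // null for transient (RAM-backed) volumes
      cacheExecutor.shutdown();
    }
  }
}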
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34d0088b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 40c6840..4c0b5f8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -77,7 +77,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
* dfs.datanode.fsdatasetcache.max.threads.per.volume) to limit resource
* contention.
*/
- private final ThreadPoolExecutor cacheExecutor;
+ protected ThreadPoolExecutor cacheExecutor;
FsVolumeImpl(FsDatasetImpl dataset, String storageID, File currentDir,
Configuration conf, StorageType storageType) throws IOException {
@@ -202,6 +202,11 @@ public class FsVolumeImpl implements FsVolumeSpi {
}
@Override
+ public boolean isTransientStorage() {
+ return false;
+ }
+
+ @Override
public String getPath(String bpid) throws IOException {
return getBlockPoolSlice(bpid).getDirectory().getAbsolutePath();
}
@@ -324,7 +329,9 @@ public class FsVolumeImpl implements FsVolumeSpi {
}
void shutdown() {
- cacheExecutor.shutdown();
+ if (cacheExecutor != null) {
+ cacheExecutor.shutdown();
+ }
Set<Entry<String, BlockPoolSlice>> set = bpSlices.entrySet();
for (Entry<String, BlockPoolSlice> entry : set) {
entry.getValue().shutdown();
@@ -417,6 +424,5 @@ public class FsVolumeImpl implements FsVolumeSpi {
DatanodeStorage toDatanodeStorage() {
return new DatanodeStorage(storageID, DatanodeStorage.State.NORMAL, storageType);
}
-
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34d0088b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplAllocator.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplAllocator.java
new file mode 100644
index 0000000..14d3aaf
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplAllocator.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.StorageType;
+
+/**
+ * Generate volumes based on the storageType.
+ */
+@InterfaceAudience.Private
+class FsVolumeImplAllocator {
+ static FsVolumeImpl createVolume(FsDatasetImpl fsDataset, String storageUuid,
+ File dir, Configuration conf, StorageType storageType)
+ throws IOException {
+ switch(storageType) {
+ case RAM_DISK:
+ return new FsTransientVolumeImpl(
+ fsDataset, storageUuid, dir, conf, storageType);
+ default:
+ return new FsVolumeImpl(
+ fsDataset, storageUuid, dir, conf, storageType);
+ }
+ }
+}
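A minimal illustration of the dispatch above (StorageType, Volume, and VolumeFactoryDemo here are simplified stand-ins, not the Hadoop classes): RAM_DISK maps to a transient volume, everything else to a persistent one.

// Simplified stand-ins; the real factory returns FsTransientVolumeImpl for
// RAM_DISK and FsVolumeImpl otherwise.
enum StorageType { DISK, SSD, RAM_DISK }

interface Volume { boolean isTransientStorage(); }

class VolumeFactoryDemo {
  static Volume create(StorageType type) {
    switch (type) {
      case RAM_DISK:
        return () -> true;   // cf. FsTransientVolumeImpl
      default:
        return () -> false;  // cf. FsVolumeImpl
    }
  }

  public static void main(String[] args) {
    System.out.println(create(StorageType.RAM_DISK).isTransientStorage()); // true
    System.out.println(create(StorageType.DISK).isTransientStorage());     // false
  }
}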
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34d0088b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index 90739c3..9fbc349 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -68,7 +68,25 @@ class FsVolumeList {
}
return blockChooser.chooseVolume(list, blockSize);
}
-
+
+ /**
+ * Get next volume. Synchronized to ensure {@link #curVolume} is updated
+ * by a single thread and next volume is chosen with no concurrent
+ * update to {@link #volumes}.
+ * @param blockSize free space needed on the volume
+ * @return next volume to store the block in.
+ */
+ synchronized FsVolumeImpl getNextTransientVolume(
+ long blockSize) throws IOException {
+ final List<FsVolumeImpl> list = new ArrayList<FsVolumeImpl>(volumes.size());
+ for(FsVolumeImpl v : volumes) {
+ if (v.isTransientStorage()) {
+ list.add(v);
+ }
+ }
+ return blockChooser.chooseVolume(list, blockSize);
+ }
+
long getDfsUsed() throws IOException {
long dfsUsed = 0L;
for (FsVolumeImpl v : volumes) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34d0088b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
index 29ac3f2..b7fdccf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java
@@ -98,9 +98,10 @@ public class TestWriteBlockGetsBlockLengthHint {
*/
@Override
public synchronized ReplicaInPipelineInterface createRbw(
- StorageType storageType, ExtendedBlock b) throws IOException {
+ StorageType storageType, ExtendedBlock b, boolean allowLazyPersist)
+ throws IOException {
assertThat(b.getLocalBlock().getNumBytes(), is(EXPECTED_BLOCK_LENGTH));
- return super.createRbw(storageType, b);
+ return super.createRbw(storageType, b, allowLazyPersist);
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34d0088b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
index 9ea6c51..e9557da 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
@@ -364,7 +364,7 @@ public abstract class BlockReportTestBase {
// Create a bogus new block which will not be present on the namenode.
ExtendedBlock b = new ExtendedBlock(
poolId, rand.nextLong(), 1024L, rand.nextLong());
- dn.getFSDataset().createRbw(StorageType.DEFAULT, b);
+ dn.getFSDataset().createRbw(StorageType.DEFAULT, b, false);
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34d0088b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 83d93f0..eab599d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -300,6 +300,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
public ChunkChecksum getLastChecksumAndDataLen() {
return new ChunkChecksum(oStream.getLength(), null);
}
+
+ @Override
+ public boolean isOnTransientStorage() {
+ return false;
+ }
}
/**
@@ -747,7 +752,8 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override // FsDatasetSpi
public synchronized ReplicaInPipelineInterface createRbw(
- StorageType storageType, ExtendedBlock b) throws IOException {
+ StorageType storageType, ExtendedBlock b,
+ boolean allowLazyPersist) throws IOException {
return createTemporary(storageType, b);
}
@@ -1083,7 +1089,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override
public void checkAndUpdate(String bpid, long blockId, File diskFile,
- File diskMetaFile, FsVolumeSpi vol) {
+ File diskMetaFile, FsVolumeSpi vol) throws IOException {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34d0088b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index f627c42..4dfc773 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -529,7 +529,7 @@ public class TestBlockRecovery {
if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName());
}
- dn.data.createRbw(StorageType.DEFAULT, block);
+ dn.data.createRbw(StorageType.DEFAULT, block, false);
try {
dn.syncBlock(rBlock, initBlockRecords(dn));
fail("Sync should fail");
@@ -553,7 +553,7 @@ public class TestBlockRecovery {
LOG.debug("Running " + GenericTestUtils.getMethodName());
}
ReplicaInPipelineInterface replicaInfo = dn.data.createRbw(
- StorageType.DEFAULT, block);
+ StorageType.DEFAULT, block, false);
ReplicaOutputStreams streams = null;
try {
streams = replicaInfo.createStreams(true,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34d0088b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
index bc50eaa..78b9e2b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
@@ -215,7 +215,7 @@ public class TestDirectoryScanner {
}
private void scan(long totalBlocks, int diffsize, long missingMetaFile, long missingBlockFile,
- long missingMemoryBlocks, long mismatchBlocks) {
+ long missingMemoryBlocks, long mismatchBlocks) throws IOException {
scanner.reconcile();
assertTrue(scanner.diffs.containsKey(bpid));
@@ -431,6 +431,10 @@ public class TestDirectoryScanner {
@Override
public void releaseReservedSpace(long bytesToRelease) {
}

+ @Override
+ public boolean isTransientStorage() {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34d0088b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
index bd6c3de..099a0cd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
@@ -67,7 +67,7 @@ public class TestSimulatedFSDataset {
// we pass expected len as zero, - fsdataset should use the sizeof actual
// data written
ReplicaInPipelineInterface bInfo = fsdataset.createRbw(
- StorageType.DEFAULT, b);
+ StorageType.DEFAULT, b, false);
ReplicaOutputStreams out = bInfo.createStreams(true,
DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512));
try {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34d0088b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
index 6bd36ed..d6d7dd7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
@@ -34,7 +34,7 @@ public class FsDatasetTestUtil {
public static File getBlockFile(FsDatasetSpi<?> fsd, String bpid, Block b
) throws IOException {
- return ((FsDatasetImpl)fsd).getBlockFile(bpid, b);
+ return ((FsDatasetImpl)fsd).getBlockFile(bpid, b.getBlockId());
}
public static File getMetaFile(FsDatasetSpi<?> fsd, String bpid, Block b)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34d0088b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
index a870aa9..60c6d03 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
@@ -358,7 +358,7 @@ public class TestWriteToReplica {
}
try {
- dataSet.createRbw(StorageType.DEFAULT, blocks[FINALIZED]);
+ dataSet.createRbw(StorageType.DEFAULT, blocks[FINALIZED], false);
Assert.fail("Should not have created a replica that's already " +
"finalized " + blocks[FINALIZED]);
} catch (ReplicaAlreadyExistsException e) {
@@ -376,7 +376,7 @@ public class TestWriteToReplica {
}
try {
- dataSet.createRbw(StorageType.DEFAULT, blocks[TEMPORARY]);
+ dataSet.createRbw(StorageType.DEFAULT, blocks[TEMPORARY], false);
Assert.fail("Should not have created a replica that had created as " +
"temporary " + blocks[TEMPORARY]);
} catch (ReplicaAlreadyExistsException e) {
@@ -386,7 +386,7 @@ public class TestWriteToReplica {
0L, blocks[RBW].getNumBytes()); // expect to be successful
try {
- dataSet.createRbw(StorageType.DEFAULT, blocks[RBW]);
+ dataSet.createRbw(StorageType.DEFAULT, blocks[RBW], false);
Assert.fail("Should not have created a replica that had created as RBW " +
blocks[RBW]);
} catch (ReplicaAlreadyExistsException e) {
@@ -402,7 +402,7 @@ public class TestWriteToReplica {
}
try {
- dataSet.createRbw(StorageType.DEFAULT, blocks[RWR]);
+ dataSet.createRbw(StorageType.DEFAULT, blocks[RWR], false);
Assert.fail("Should not have created a replica that was waiting to be " +
"recovered " + blocks[RWR]);
} catch (ReplicaAlreadyExistsException e) {
@@ -418,7 +418,7 @@ public class TestWriteToReplica {
}
try {
- dataSet.createRbw(StorageType.DEFAULT, blocks[RUR]);
+ dataSet.createRbw(StorageType.DEFAULT, blocks[RUR], false);
Assert.fail("Should not have created a replica that was under recovery " +
blocks[RUR]);
} catch (ReplicaAlreadyExistsException e) {
@@ -435,7 +435,7 @@ public class TestWriteToReplica {
e.getMessage().contains(ReplicaNotFoundException.NON_EXISTENT_REPLICA));
}
- dataSet.createRbw(StorageType.DEFAULT, blocks[NON_EXISTENT]);
+ dataSet.createRbw(StorageType.DEFAULT, blocks[NON_EXISTENT], false);
}
private void testWriteToTemporary(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throws IOException {