You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2015/04/08 20:39:20 UTC
[1/3] hadoop git commit: HDFS-8072. Reserved RBW space is not
released if client terminates while writing block. (Arpit Agarwal)
Repository: hadoop
Updated Branches:
refs/heads/branch-2 5f59e621b -> f0324738c
refs/heads/branch-2.7 ffa3f3a10 -> 12739b541
refs/heads/trunk ba9ee22ca -> 608c49984
HDFS-8072. Reserved RBW space is not released if client terminates while writing block. (Arpit Agarwal)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/608c4998
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/608c4998
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/608c4998
Branch: refs/heads/trunk
Commit: 608c4998419c18fd95019b28cc56b5bd5aa4cc01
Parents: ba9ee22
Author: Arpit Agarwal <ar...@apache.org>
Authored: Wed Apr 8 11:38:21 2015 -0700
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Wed Apr 8 11:38:21 2015 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../hdfs/server/datanode/BlockReceiver.java | 1 +
.../hdfs/server/datanode/ReplicaInPipeline.java | 6 ++
.../datanode/ReplicaInPipelineInterface.java | 5 ++
.../server/datanode/SimulatedFSDataset.java | 4 ++
.../extdataset/ExternalReplicaInPipeline.java | 4 ++
.../fsdataset/impl/TestRbwSpaceReservation.java | 67 +++++++++++++++++---
7 files changed, 81 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/608c4998/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 84e382a..91a16bc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1403,6 +1403,9 @@ Release 2.7.0 - UNRELEASED
HDFS-8038. PBImageDelimitedTextWriter#getEntry output HDFS path in
platform-specific format. (Xiaoyu Yao via cnauroth)
+ HDFS-8072. Reserved RBW space is not released if client terminates while
+ writing block. (Arpit Agarwal)
+
BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
HDFS-7720. Quota by Storage Type API, tools and ClientNameNode
http://git-wip-us.apache.org/repos/asf/hadoop/blob/608c4998/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 58cb8b1..c0be956 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -817,6 +817,7 @@ class BlockReceiver implements Closeable {
}
} catch (IOException ioe) {
+ replicaInfo.releaseAllBytesReserved();
if (datanode.isRestarting()) {
// Do not throw if shutting down for restart. Otherwise, it will cause
// premature termination of responder.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/608c4998/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
index 6a26640..cc55f85 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
@@ -148,6 +148,12 @@ public class ReplicaInPipeline extends ReplicaInfo
return bytesReserved;
}
+ @Override
+ public void releaseAllBytesReserved() { // ReplicaInPipelineInterface
+ getVolume().releaseReservedSpace(bytesReserved);
+ bytesReserved = 0;
+ }
+
@Override // ReplicaInPipelineInterface
public synchronized void setLastChecksumAndDataLen(long dataLength, byte[] lastChecksum) {
this.bytesOnDisk = dataLength;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/608c4998/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
index 7f08b81..0263d0f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
@@ -45,6 +45,11 @@ public interface ReplicaInPipelineInterface extends Replica {
void setBytesAcked(long bytesAcked);
/**
+ * Release any disk space reserved for this replica.
+ */
+ public void releaseAllBytesReserved();
+
+ /**
* store the checksum for the last chunk along with the data length
* @param dataLength number of bytes on disk
* @param lastChecksum - checksum bytes for the last chunk
http://git-wip-us.apache.org/repos/asf/hadoop/blob/608c4998/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 160a86c..a358e22 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -290,6 +290,10 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
@Override
+ public void releaseAllBytesReserved() {
+ }
+
+ @Override
synchronized public long getBytesOnDisk() {
if (finalized) {
return theBlock.getNumBytes();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/608c4998/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java
index c3c0197..ad44500 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java
@@ -41,6 +41,10 @@ public class ExternalReplicaInPipeline implements ReplicaInPipelineInterface {
}
@Override
+ public void releaseAllBytesReserved() {
+ }
+
+ @Override
public void setLastChecksumAndDataLen(long dataLength, byte[] lastChecksum) {
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/608c4998/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java
index 487f3ab..ebf2f3b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+import com.google.common.base.Supplier;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -45,6 +46,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.TimeoutException;
/**
* Ensure that the DN reserves disk space equivalent to a full block for
@@ -53,7 +55,6 @@ import java.util.Random;
public class TestRbwSpaceReservation {
static final Log LOG = LogFactory.getLog(TestRbwSpaceReservation.class);
- private static final short REPL_FACTOR = 1;
private static final int DU_REFRESH_INTERVAL_MSEC = 500;
private static final int STORAGES_PER_DATANODE = 1;
private static final int BLOCK_SIZE = 1024 * 1024;
@@ -83,25 +84,38 @@ public class TestRbwSpaceReservation {
((Log4JLogger) DataNode.LOG).getLogger().setLevel(Level.ALL);
}
- private void startCluster(int blockSize, long perVolumeCapacity) throws IOException {
+ /**
+ *
+ * @param blockSize
+ * @param perVolumeCapacity limit the capacity of each volume to the given
+ * value. If negative, then don't limit.
+ * @throws IOException
+ */
+ private void startCluster(int blockSize, int numDatanodes, long perVolumeCapacity) throws IOException {
initConfig(blockSize);
cluster = new MiniDFSCluster
.Builder(conf)
.storagesPerDatanode(STORAGES_PER_DATANODE)
- .numDataNodes(REPL_FACTOR)
+ .numDataNodes(numDatanodes)
.build();
fs = cluster.getFileSystem();
client = fs.getClient();
cluster.waitActive();
if (perVolumeCapacity >= 0) {
+ for (DataNode dn : cluster.getDataNodes()) {
+ for (FsVolumeSpi volume : dn.getFSDataset().getVolumes()) {
+ ((FsVolumeImpl) volume).setCapacityForTesting(perVolumeCapacity);
+ }
+ }
+ }
+
+ if (numDatanodes == 1) {
List<? extends FsVolumeSpi> volumes =
cluster.getDataNodes().get(0).getFSDataset().getVolumes();
-
assertThat(volumes.size(), is(1));
singletonVolume = ((FsVolumeImpl) volumes.get(0));
- singletonVolume.setCapacityForTesting(perVolumeCapacity);
}
}
@@ -128,7 +142,7 @@ public class TestRbwSpaceReservation {
throws IOException, InterruptedException {
// Enough for 1 block + meta files + some delta.
final long configuredCapacity = fileBlockSize * 2 - 1;
- startCluster(BLOCK_SIZE, configuredCapacity);
+ startCluster(BLOCK_SIZE, 1, configuredCapacity);
FSDataOutputStream out = null;
Path path = new Path("/" + fileNamePrefix + ".dat");
@@ -195,7 +209,7 @@ public class TestRbwSpaceReservation {
@Test (timeout=300000)
public void testWithLimitedSpace() throws IOException {
// Cluster with just enough space for a full block + meta.
- startCluster(BLOCK_SIZE, 2 * BLOCK_SIZE - 1);
+ startCluster(BLOCK_SIZE, 1, 2 * BLOCK_SIZE - 1);
final String methodName = GenericTestUtils.getMethodName();
Path file1 = new Path("/" + methodName + ".01.dat");
Path file2 = new Path("/" + methodName + ".02.dat");
@@ -208,7 +222,6 @@ public class TestRbwSpaceReservation {
os2 = fs.create(file2);
// Write one byte to the first file.
- LOG.info("arpit: writing first file");
byte[] data = new byte[1];
os1.write(data);
os1.hsync();
@@ -228,6 +241,42 @@ public class TestRbwSpaceReservation {
}
/**
+ * Ensure that reserved space is released when the client goes away
+ * unexpectedly.
+ *
+ * The verification is done for each replica in the write pipeline.
+ *
+ * @throws IOException
+ */
+ @Test(timeout=300000)
+ public void testSpaceReleasedOnUnexpectedEof()
+ throws IOException, InterruptedException, TimeoutException {
+ final short replication = 3;
+ startCluster(BLOCK_SIZE, replication, -1);
+
+ final String methodName = GenericTestUtils.getMethodName();
+ final Path file = new Path("/" + methodName + ".01.dat");
+
+ // Write 1 byte to the file and kill the writer.
+ FSDataOutputStream os = fs.create(file, replication);
+ os.write(new byte[1]);
+ os.hsync();
+ DFSTestUtil.abortStream((DFSOutputStream) os.getWrappedStream());
+
+ // Ensure all space reserved for the replica was released on each
+ // DataNode.
+ for (DataNode dn : cluster.getDataNodes()) {
+ final FsVolumeImpl volume = (FsVolumeImpl) dn.getFSDataset().getVolumes().get(0);
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return (volume.getReservedForRbw() == 0);
+ }
+ }, 500, Integer.MAX_VALUE); // Wait until the test times out.
+ }
+ }
+
+ /**
* Stress test to ensure we are not leaking reserved space.
* @throws IOException
* @throws InterruptedException
@@ -235,7 +284,7 @@ public class TestRbwSpaceReservation {
@Test (timeout=600000)
public void stressTest() throws IOException, InterruptedException {
final int numWriters = 5;
- startCluster(SMALL_BLOCK_SIZE, SMALL_BLOCK_SIZE * numWriters * 10);
+ startCluster(SMALL_BLOCK_SIZE, 1, SMALL_BLOCK_SIZE * numWriters * 10);
Writer[] writers = new Writer[numWriters];
// Start a few writers and let them run for a while.
[2/3] hadoop git commit: HDFS-8072. Reserved RBW space is not
released if client terminates while writing block. (Arpit Agarwal)
Posted by ar...@apache.org.
HDFS-8072. Reserved RBW space is not released if client terminates while writing block. (Arpit Agarwal)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f0324738
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f0324738
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f0324738
Branch: refs/heads/branch-2
Commit: f0324738c9db4f45d2b1ec5cfb46c5f2b7669571
Parents: 5f59e62
Author: Arpit Agarwal <ar...@apache.org>
Authored: Wed Apr 8 11:38:21 2015 -0700
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Wed Apr 8 11:38:30 2015 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../hdfs/server/datanode/BlockReceiver.java | 1 +
.../hdfs/server/datanode/ReplicaInPipeline.java | 6 ++
.../datanode/ReplicaInPipelineInterface.java | 5 ++
.../server/datanode/SimulatedFSDataset.java | 4 ++
.../extdataset/ExternalReplicaInPipeline.java | 4 ++
.../fsdataset/impl/TestRbwSpaceReservation.java | 67 +++++++++++++++++---
7 files changed, 81 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0324738/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 8290bcc..e767f45 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1102,6 +1102,9 @@ Release 2.7.0 - UNRELEASED
HDFS-8038. PBImageDelimitedTextWriter#getEntry output HDFS path in
platform-specific format. (Xiaoyu Yao via cnauroth)
+ HDFS-8072. Reserved RBW space is not released if client terminates while
+ writing block. (Arpit Agarwal)
+
BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
HDFS-7720. Quota by Storage Type API, tools and ClientNameNode
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0324738/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 58cb8b1..c0be956 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -817,6 +817,7 @@ class BlockReceiver implements Closeable {
}
} catch (IOException ioe) {
+ replicaInfo.releaseAllBytesReserved();
if (datanode.isRestarting()) {
// Do not throw if shutting down for restart. Otherwise, it will cause
// premature termination of responder.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0324738/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
index 6a26640..cc55f85 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
@@ -148,6 +148,12 @@ public class ReplicaInPipeline extends ReplicaInfo
return bytesReserved;
}
+ @Override
+ public void releaseAllBytesReserved() { // ReplicaInPipelineInterface
+ getVolume().releaseReservedSpace(bytesReserved);
+ bytesReserved = 0;
+ }
+
@Override // ReplicaInPipelineInterface
public synchronized void setLastChecksumAndDataLen(long dataLength, byte[] lastChecksum) {
this.bytesOnDisk = dataLength;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0324738/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
index 7f08b81..0263d0f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
@@ -45,6 +45,11 @@ public interface ReplicaInPipelineInterface extends Replica {
void setBytesAcked(long bytesAcked);
/**
+ * Release any disk space reserved for this replica.
+ */
+ public void releaseAllBytesReserved();
+
+ /**
* store the checksum for the last chunk along with the data length
* @param dataLength number of bytes on disk
* @param lastChecksum - checksum bytes for the last chunk
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0324738/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 160a86c..a358e22 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -290,6 +290,10 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
@Override
+ public void releaseAllBytesReserved() {
+ }
+
+ @Override
synchronized public long getBytesOnDisk() {
if (finalized) {
return theBlock.getNumBytes();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0324738/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java
index c3c0197..ad44500 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java
@@ -41,6 +41,10 @@ public class ExternalReplicaInPipeline implements ReplicaInPipelineInterface {
}
@Override
+ public void releaseAllBytesReserved() {
+ }
+
+ @Override
public void setLastChecksumAndDataLen(long dataLength, byte[] lastChecksum) {
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0324738/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java
index 487f3ab..ebf2f3b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+import com.google.common.base.Supplier;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -45,6 +46,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.TimeoutException;
/**
* Ensure that the DN reserves disk space equivalent to a full block for
@@ -53,7 +55,6 @@ import java.util.Random;
public class TestRbwSpaceReservation {
static final Log LOG = LogFactory.getLog(TestRbwSpaceReservation.class);
- private static final short REPL_FACTOR = 1;
private static final int DU_REFRESH_INTERVAL_MSEC = 500;
private static final int STORAGES_PER_DATANODE = 1;
private static final int BLOCK_SIZE = 1024 * 1024;
@@ -83,25 +84,38 @@ public class TestRbwSpaceReservation {
((Log4JLogger) DataNode.LOG).getLogger().setLevel(Level.ALL);
}
- private void startCluster(int blockSize, long perVolumeCapacity) throws IOException {
+ /**
+ *
+ * @param blockSize
+ * @param perVolumeCapacity limit the capacity of each volume to the given
+ * value. If negative, then don't limit.
+ * @throws IOException
+ */
+ private void startCluster(int blockSize, int numDatanodes, long perVolumeCapacity) throws IOException {
initConfig(blockSize);
cluster = new MiniDFSCluster
.Builder(conf)
.storagesPerDatanode(STORAGES_PER_DATANODE)
- .numDataNodes(REPL_FACTOR)
+ .numDataNodes(numDatanodes)
.build();
fs = cluster.getFileSystem();
client = fs.getClient();
cluster.waitActive();
if (perVolumeCapacity >= 0) {
+ for (DataNode dn : cluster.getDataNodes()) {
+ for (FsVolumeSpi volume : dn.getFSDataset().getVolumes()) {
+ ((FsVolumeImpl) volume).setCapacityForTesting(perVolumeCapacity);
+ }
+ }
+ }
+
+ if (numDatanodes == 1) {
List<? extends FsVolumeSpi> volumes =
cluster.getDataNodes().get(0).getFSDataset().getVolumes();
-
assertThat(volumes.size(), is(1));
singletonVolume = ((FsVolumeImpl) volumes.get(0));
- singletonVolume.setCapacityForTesting(perVolumeCapacity);
}
}
@@ -128,7 +142,7 @@ public class TestRbwSpaceReservation {
throws IOException, InterruptedException {
// Enough for 1 block + meta files + some delta.
final long configuredCapacity = fileBlockSize * 2 - 1;
- startCluster(BLOCK_SIZE, configuredCapacity);
+ startCluster(BLOCK_SIZE, 1, configuredCapacity);
FSDataOutputStream out = null;
Path path = new Path("/" + fileNamePrefix + ".dat");
@@ -195,7 +209,7 @@ public class TestRbwSpaceReservation {
@Test (timeout=300000)
public void testWithLimitedSpace() throws IOException {
// Cluster with just enough space for a full block + meta.
- startCluster(BLOCK_SIZE, 2 * BLOCK_SIZE - 1);
+ startCluster(BLOCK_SIZE, 1, 2 * BLOCK_SIZE - 1);
final String methodName = GenericTestUtils.getMethodName();
Path file1 = new Path("/" + methodName + ".01.dat");
Path file2 = new Path("/" + methodName + ".02.dat");
@@ -208,7 +222,6 @@ public class TestRbwSpaceReservation {
os2 = fs.create(file2);
// Write one byte to the first file.
- LOG.info("arpit: writing first file");
byte[] data = new byte[1];
os1.write(data);
os1.hsync();
@@ -228,6 +241,42 @@ public class TestRbwSpaceReservation {
}
/**
+ * Ensure that reserved space is released when the client goes away
+ * unexpectedly.
+ *
+ * The verification is done for each replica in the write pipeline.
+ *
+ * @throws IOException
+ */
+ @Test(timeout=300000)
+ public void testSpaceReleasedOnUnexpectedEof()
+ throws IOException, InterruptedException, TimeoutException {
+ final short replication = 3;
+ startCluster(BLOCK_SIZE, replication, -1);
+
+ final String methodName = GenericTestUtils.getMethodName();
+ final Path file = new Path("/" + methodName + ".01.dat");
+
+ // Write 1 byte to the file and kill the writer.
+ FSDataOutputStream os = fs.create(file, replication);
+ os.write(new byte[1]);
+ os.hsync();
+ DFSTestUtil.abortStream((DFSOutputStream) os.getWrappedStream());
+
+ // Ensure all space reserved for the replica was released on each
+ // DataNode.
+ for (DataNode dn : cluster.getDataNodes()) {
+ final FsVolumeImpl volume = (FsVolumeImpl) dn.getFSDataset().getVolumes().get(0);
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return (volume.getReservedForRbw() == 0);
+ }
+ }, 500, Integer.MAX_VALUE); // Wait until the test times out.
+ }
+ }
+
+ /**
* Stress test to ensure we are not leaking reserved space.
* @throws IOException
* @throws InterruptedException
@@ -235,7 +284,7 @@ public class TestRbwSpaceReservation {
@Test (timeout=600000)
public void stressTest() throws IOException, InterruptedException {
final int numWriters = 5;
- startCluster(SMALL_BLOCK_SIZE, SMALL_BLOCK_SIZE * numWriters * 10);
+ startCluster(SMALL_BLOCK_SIZE, 1, SMALL_BLOCK_SIZE * numWriters * 10);
Writer[] writers = new Writer[numWriters];
// Start a few writers and let them run for a while.
[3/3] hadoop git commit: HDFS-8072. Reserved RBW space is not
released if client terminates while writing block. (Arpit Agarwal)
Posted by ar...@apache.org.
HDFS-8072. Reserved RBW space is not released if client terminates while writing block. (Arpit Agarwal)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/12739b54
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/12739b54
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/12739b54
Branch: refs/heads/branch-2.7
Commit: 12739b541bf9cbe39b352c7651eef91557209b4e
Parents: ffa3f3a
Author: Arpit Agarwal <ar...@apache.org>
Authored: Wed Apr 8 11:38:21 2015 -0700
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Wed Apr 8 11:38:42 2015 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../hdfs/server/datanode/BlockReceiver.java | 1 +
.../hdfs/server/datanode/ReplicaInPipeline.java | 6 ++
.../datanode/ReplicaInPipelineInterface.java | 5 ++
.../server/datanode/SimulatedFSDataset.java | 4 ++
.../extdataset/ExternalReplicaInPipeline.java | 4 ++
.../fsdataset/impl/TestRbwSpaceReservation.java | 67 +++++++++++++++++---
7 files changed, 81 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/12739b54/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index c36ea82..3fe17cd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -974,6 +974,9 @@ Release 2.7.0 - UNRELEASED
HDFS-8038. PBImageDelimitedTextWriter#getEntry output HDFS path in
platform-specific format. (Xiaoyu Yao via cnauroth)
+ HDFS-8072. Reserved RBW space is not released if client terminates while
+ writing block. (Arpit Agarwal)
+
BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
HDFS-7720. Quota by Storage Type API, tools and ClientNameNode
http://git-wip-us.apache.org/repos/asf/hadoop/blob/12739b54/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 58cb8b1..c0be956 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -817,6 +817,7 @@ class BlockReceiver implements Closeable {
}
} catch (IOException ioe) {
+ replicaInfo.releaseAllBytesReserved();
if (datanode.isRestarting()) {
// Do not throw if shutting down for restart. Otherwise, it will cause
// premature termination of responder.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/12739b54/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
index 6a26640..cc55f85 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
@@ -148,6 +148,12 @@ public class ReplicaInPipeline extends ReplicaInfo
return bytesReserved;
}
+ @Override
+ public void releaseAllBytesReserved() { // ReplicaInPipelineInterface
+ getVolume().releaseReservedSpace(bytesReserved);
+ bytesReserved = 0;
+ }
+
@Override // ReplicaInPipelineInterface
public synchronized void setLastChecksumAndDataLen(long dataLength, byte[] lastChecksum) {
this.bytesOnDisk = dataLength;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/12739b54/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
index 7f08b81..0263d0f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
@@ -45,6 +45,11 @@ public interface ReplicaInPipelineInterface extends Replica {
void setBytesAcked(long bytesAcked);
/**
+ * Release any disk space reserved for this replica.
+ */
+ public void releaseAllBytesReserved();
+
+ /**
* store the checksum for the last chunk along with the data length
* @param dataLength number of bytes on disk
* @param lastChecksum - checksum bytes for the last chunk
http://git-wip-us.apache.org/repos/asf/hadoop/blob/12739b54/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 5c7b4ac..bebf371 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -290,6 +290,10 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
@Override
+ public void releaseAllBytesReserved() {
+ }
+
+ @Override
synchronized public long getBytesOnDisk() {
if (finalized) {
return theBlock.getNumBytes();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/12739b54/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java
index c3c0197..ad44500 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java
@@ -41,6 +41,10 @@ public class ExternalReplicaInPipeline implements ReplicaInPipelineInterface {
}
@Override
+ public void releaseAllBytesReserved() {
+ }
+
+ @Override
public void setLastChecksumAndDataLen(long dataLength, byte[] lastChecksum) {
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/12739b54/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java
index 487f3ab..ebf2f3b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestRbwSpaceReservation.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+import com.google.common.base.Supplier;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -45,6 +46,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.TimeoutException;
/**
* Ensure that the DN reserves disk space equivalent to a full block for
@@ -53,7 +55,6 @@ import java.util.Random;
public class TestRbwSpaceReservation {
static final Log LOG = LogFactory.getLog(TestRbwSpaceReservation.class);
- private static final short REPL_FACTOR = 1;
private static final int DU_REFRESH_INTERVAL_MSEC = 500;
private static final int STORAGES_PER_DATANODE = 1;
private static final int BLOCK_SIZE = 1024 * 1024;
@@ -83,25 +84,38 @@ public class TestRbwSpaceReservation {
((Log4JLogger) DataNode.LOG).getLogger().setLevel(Level.ALL);
}
- private void startCluster(int blockSize, long perVolumeCapacity) throws IOException {
+ /**
+ *
+ * @param blockSize
+ * @param perVolumeCapacity limit the capacity of each volume to the given
+ * value. If negative, then don't limit.
+ * @throws IOException
+ */
+ private void startCluster(int blockSize, int numDatanodes, long perVolumeCapacity) throws IOException {
initConfig(blockSize);
cluster = new MiniDFSCluster
.Builder(conf)
.storagesPerDatanode(STORAGES_PER_DATANODE)
- .numDataNodes(REPL_FACTOR)
+ .numDataNodes(numDatanodes)
.build();
fs = cluster.getFileSystem();
client = fs.getClient();
cluster.waitActive();
if (perVolumeCapacity >= 0) {
+ for (DataNode dn : cluster.getDataNodes()) {
+ for (FsVolumeSpi volume : dn.getFSDataset().getVolumes()) {
+ ((FsVolumeImpl) volume).setCapacityForTesting(perVolumeCapacity);
+ }
+ }
+ }
+
+ if (numDatanodes == 1) {
List<? extends FsVolumeSpi> volumes =
cluster.getDataNodes().get(0).getFSDataset().getVolumes();
-
assertThat(volumes.size(), is(1));
singletonVolume = ((FsVolumeImpl) volumes.get(0));
- singletonVolume.setCapacityForTesting(perVolumeCapacity);
}
}
@@ -128,7 +142,7 @@ public class TestRbwSpaceReservation {
throws IOException, InterruptedException {
// Enough for 1 block + meta files + some delta.
final long configuredCapacity = fileBlockSize * 2 - 1;
- startCluster(BLOCK_SIZE, configuredCapacity);
+ startCluster(BLOCK_SIZE, 1, configuredCapacity);
FSDataOutputStream out = null;
Path path = new Path("/" + fileNamePrefix + ".dat");
@@ -195,7 +209,7 @@ public class TestRbwSpaceReservation {
@Test (timeout=300000)
public void testWithLimitedSpace() throws IOException {
// Cluster with just enough space for a full block + meta.
- startCluster(BLOCK_SIZE, 2 * BLOCK_SIZE - 1);
+ startCluster(BLOCK_SIZE, 1, 2 * BLOCK_SIZE - 1);
final String methodName = GenericTestUtils.getMethodName();
Path file1 = new Path("/" + methodName + ".01.dat");
Path file2 = new Path("/" + methodName + ".02.dat");
@@ -208,7 +222,6 @@ public class TestRbwSpaceReservation {
os2 = fs.create(file2);
// Write one byte to the first file.
- LOG.info("arpit: writing first file");
byte[] data = new byte[1];
os1.write(data);
os1.hsync();
@@ -228,6 +241,42 @@ public class TestRbwSpaceReservation {
}
/**
+ * Ensure that reserved space is released when the client goes away
+ * unexpectedly.
+ *
+ * The verification is done for each replica in the write pipeline.
+ *
+ * @throws IOException
+ */
+ @Test(timeout=300000)
+ public void testSpaceReleasedOnUnexpectedEof()
+ throws IOException, InterruptedException, TimeoutException {
+ final short replication = 3;
+ startCluster(BLOCK_SIZE, replication, -1);
+
+ final String methodName = GenericTestUtils.getMethodName();
+ final Path file = new Path("/" + methodName + ".01.dat");
+
+ // Write 1 byte to the file and kill the writer.
+ FSDataOutputStream os = fs.create(file, replication);
+ os.write(new byte[1]);
+ os.hsync();
+ DFSTestUtil.abortStream((DFSOutputStream) os.getWrappedStream());
+
+ // Ensure all space reserved for the replica was released on each
+ // DataNode.
+ for (DataNode dn : cluster.getDataNodes()) {
+ final FsVolumeImpl volume = (FsVolumeImpl) dn.getFSDataset().getVolumes().get(0);
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return (volume.getReservedForRbw() == 0);
+ }
+ }, 500, Integer.MAX_VALUE); // Wait until the test times out.
+ }
+ }
+
+ /**
* Stress test to ensure we are not leaking reserved space.
* @throws IOException
* @throws InterruptedException
@@ -235,7 +284,7 @@ public class TestRbwSpaceReservation {
@Test (timeout=600000)
public void stressTest() throws IOException, InterruptedException {
final int numWriters = 5;
- startCluster(SMALL_BLOCK_SIZE, SMALL_BLOCK_SIZE * numWriters * 10);
+ startCluster(SMALL_BLOCK_SIZE, 1, SMALL_BLOCK_SIZE * numWriters * 10);
Writer[] writers = new Writer[numWriters];
// Start a few writers and let them run for a while.