Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2015/12/22 18:37:19 UTC
hadoop git commit: HDFS-9589. Block files which have been hardlinked
should be duplicated before the DataNode appends to them (cmccabe)
Repository: hadoop
Updated Branches:
refs/heads/trunk 70d6f2012 -> bb540ba85
HDFS-9589. Block files which have been hardlinked should be duplicated before the DataNode appends to them (cmccabe)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bb540ba8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bb540ba8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bb540ba8
Branch: refs/heads/trunk
Commit: bb540ba85aa37d9fe31e640665158afe8a936230
Parents: 70d6f20
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Mon Dec 21 14:13:36 2015 -0800
Committer: Colin Patrick Mccabe <cm...@cloudera.com>
Committed: Tue Dec 22 09:36:50 2015 -0800
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../hdfs/server/datanode/ReplicaInfo.java | 79 ++++++++++++++++++++
.../datanode/fsdataset/impl/FsDatasetImpl.java | 5 ++
.../org/apache/hadoop/hdfs/TestFileAppend.java | 66 ++++++++++++++++
.../fsdataset/impl/FsDatasetTestUtil.java | 6 ++
5 files changed, 159 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb540ba8/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 33ce57f..2b980ec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -2541,6 +2541,9 @@ Release 2.8.0 - UNRELEASED
HDFS-9347. Invariant assumption in TestQuorumJournalManager.shutdown()
is wrong. (Wei-Chiu Chuang via zhz)
+ HDFS-9589. Block files which have been hardlinked should be duplicated
+ before the DataNode appends to them (cmccabe)
+
Release 2.7.3 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb540ba8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
index d19e656..3ef6390 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
@@ -18,12 +18,18 @@
package org.apache.hadoop.hdfs.server.datanode;
import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.HardLink;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.LightWeightResizableGSet;
import com.google.common.annotations.VisibleForTesting;
@@ -210,6 +216,79 @@ abstract public class ReplicaInfo extends Block
return 0;
}
+ /**
+ * Copy specified file into a temporary file. Then rename the
+ * temporary file to the original name. This will cause any
+ * hardlinks to the original file to be removed. The temporary
+ * files are created in the same directory. The temporary files will
+ * be recovered (especially on Windows) on datanode restart.
+ */
+ private void breakHardlinks(File file, Block b) throws IOException {
+ File tmpFile = DatanodeUtil.createTmpFile(b, DatanodeUtil.getUnlinkTmpFile(file));
+ try {
+ FileInputStream in = new FileInputStream(file);
+ try {
+ FileOutputStream out = new FileOutputStream(tmpFile);
+ try {
+ IOUtils.copyBytes(in, out, 16 * 1024);
+ } finally {
+ out.close();
+ }
+ } finally {
+ in.close();
+ }
+ if (file.length() != tmpFile.length()) {
+ throw new IOException("Copy of file " + file + " size " + file.length()+
+ " into file " + tmpFile +
+ " resulted in a size of " + tmpFile.length());
+ }
+ FileUtil.replaceFile(tmpFile, file);
+ } catch (IOException e) {
+ boolean done = tmpFile.delete();
+ if (!done) {
+ DataNode.LOG.info("detachFile failed to delete temporary file " +
+ tmpFile);
+ }
+ throw e;
+ }
+ }
+
+ /**
+ * This function "breaks hardlinks" to the current replica file.
+ *
+ * When doing a DataNode upgrade, we create a bunch of hardlinks to each block
+ * file. This cleverly ensures that both the old and the new storage
+ * directories can contain the same block file, without using additional space
+ * for the data.
+ *
+ * However, when we want to append to the replica file, we need to "break" the
+ * hardlink to ensure that the old snapshot continues to contain the old data
+ * length. If we failed to do that, we could roll back to the previous/
+ * directory during a downgrade, and find that the block contents were longer
+ * than they were at the time of upgrade.
+ *
+ * @return true once any hardlinks that existed have been broken.
+ * @throws IOException
+ */
+ public boolean breakHardLinksIfNeeded() throws IOException {
+ File file = getBlockFile();
+ if (file == null || getVolume() == null) {
+ throw new IOException("detachBlock:Block not found. " + this);
+ }
+ File meta = getMetaFile();
+
+ int linkCount = HardLink.getLinkCount(file);
+ if (linkCount > 1) {
+ DataNode.LOG.info("Breaking hardlink for " + linkCount + "x-linked " +
+ "block " + this);
+ breakHardlinks(file, this);
+ }
+ if (HardLink.getLinkCount(meta) > 1) {
+ breakHardlinks(meta, this);
+ }
+ return true;
+ }
+
@Override //Object
public String toString() {
return getClass().getSimpleName()
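For context, breakHardlinks() above applies a copy-then-rename pattern: the block file's bytes are copied to a temporary file in the same directory, and the copy is then renamed over the original name, so that name stops sharing an inode with the previous/ snapshot. Below is a minimal standalone sketch of the same pattern using plain java.nio.file; the class and method names are hypothetical and this is not the DataNode code path.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class BreakHardlinkSketch {
  // Copy 'file' to a temp file in the same directory, then rename the copy
  // over the original name. Any other hardlinks keep pointing at the old
  // inode; 'file' now names a fresh, single-link copy that is safe to modify.
  static void breakHardlink(Path file) throws IOException {
    Path tmp = Files.createTempFile(file.getParent(),
        file.getFileName().toString(), ".copy");
    try {
      Files.copy(file, tmp, StandardCopyOption.REPLACE_EXISTING);
      if (Files.size(tmp) != Files.size(file)) {
        throw new IOException("Copy of " + file + " was truncated");
      }
      // Rename within the same directory, so the move never crosses filesystems.
      Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING);
    } catch (IOException e) {
      Files.deleteIfExists(tmp);
      throw e;
    }
  }
}

The production code differs in the details (it uses the DatanodeUtil temp-file naming so leftover temporaries are handled on DataNode restart, and FileUtil.replaceFile() for Windows-safe replacement), but the inode-splitting idea is the same.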
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb540ba8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 76e65cc..5d987fe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1128,6 +1128,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
throws IOException {
// If the block is cached, start uncaching it.
cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId());
+
+ // If there are any hardlinks to the block, break them. This ensures we are
+ // not appending to a file that is part of a previous/ directory.
+ replicaInfo.breakHardLinksIfNeeded();
// construct a RBW replica with the new GS
File blkfile = replicaInfo.getBlockFile();
@@ -2493,6 +2497,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
+ ", rur=" + rur);
}
if (rur.getNumBytes() > newlength) {
+ rur.breakHardLinksIfNeeded();
truncateBlock(blockFile, metaFile, rur.getNumBytes(), newlength);
if(!copyOnTruncate) {
// update RUR with the new length
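To see concretely why the append path above breaks hardlinks first: the upgrade hardlink gives the current/ and previous/ directories two names for the same inode, so writing through either name changes the bytes visible through the other. The small self-contained demo below illustrates this with ordinary temporary files (hypothetical names, not the HDFS storage layout).

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class HardlinkAppendDemo {
  public static void main(String[] args) throws IOException {
    // 'currentCopy' stands in for the block file under current/,
    // 'previousCopy' for the hardlinked name under previous/.
    Path currentCopy = Files.createTempFile("blk_", ".data");
    Path previousCopy = currentCopy.resolveSibling(
        currentCopy.getFileName() + ".prev");
    Files.write(currentCopy, "original bytes".getBytes());
    Files.createLink(previousCopy, currentCopy);  // one inode, two names

    // Appending through one name grows the data seen through both names --
    // exactly what would corrupt a rollback to previous/.
    Files.write(currentCopy, " plus appended bytes".getBytes(),
        StandardOpenOption.APPEND);

    System.out.println("current  length: " + Files.size(currentCopy));
    System.out.println("previous length: " + Files.size(previousCopy));
  }
}

Both lengths print the same larger value; once breakHardLinksIfNeeded() has run, the append affects only the current/ copy.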
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb540ba8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
index 7b7f415..11d1b18 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
@@ -43,6 +43,8 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.Assert;
@@ -109,6 +111,70 @@ public class TestFileAppend{
expected, "Read 1", false);
}
+ @Test
+ public void testBreakHardlinksIfNeeded() throws IOException {
+ Configuration conf = new HdfsConfiguration();
+ if (simulatedStorage) {
+ SimulatedFSDataset.setFactory(conf);
+ }
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+ FileSystem fs = cluster.getFileSystem();
+ InetSocketAddress addr = new InetSocketAddress("localhost",
+ cluster.getNameNodePort());
+ DFSClient client = new DFSClient(addr, conf);
+ try {
+ // create a new file, write to it and close it.
+ Path file1 = new Path("/filestatus.dat");
+ FSDataOutputStream stm = AppendTestUtil.createFile(fs, file1, 1);
+ writeFile(stm);
+ stm.close();
+
+ // Get a handle to the datanode
+ DataNode[] dn = cluster.listDataNodes();
+ assertTrue("There should be only one datanode but found " + dn.length,
+ dn.length == 1);
+
+ LocatedBlocks locations = client.getNamenode().getBlockLocations(
+ file1.toString(), 0, Long.MAX_VALUE);
+ List<LocatedBlock> blocks = locations.getLocatedBlocks();
+ final FsDatasetSpi<?> fsd = dn[0].getFSDataset();
+
+ //
+ // Create hard links for a few of the blocks
+ //
+ for (int i = 0; i < blocks.size(); i = i + 2) {
+ ExtendedBlock b = blocks.get(i).getBlock();
+ final File f = FsDatasetTestUtil.getBlockFile(
+ fsd, b.getBlockPoolId(), b.getLocalBlock());
+ File link = new File(f.toString() + ".link");
+ System.out.println("Creating hardlink for File " + f + " to " + link);
+ HardLink.createHardLink(f, link);
+ }
+
+ // Detach all blocks. This should remove hardlinks (if any)
+ for (int i = 0; i < blocks.size(); i++) {
+ ExtendedBlock b = blocks.get(i).getBlock();
+ System.out.println("breakHardlinksIfNeeded detaching block " + b);
+ assertTrue("breakHardlinksIfNeeded(" + b + ") should have returned true",
+ FsDatasetTestUtil.breakHardlinksIfNeeded(fsd, b));
+ }
+
+ // The hardlinks were already broken above, so these calls should find
+ // nothing to copy but still succeed
+ for (int i = 0; i < blocks.size(); i++) {
+ ExtendedBlock b = blocks.get(i).getBlock();
+ System.out.println("breakHardlinksIfNeeded re-attempting to " +
+ "detach block " + b);
+ assertTrue("breakHardlinksIfNeeded(" + b + ") should have returned false",
+ FsDatasetTestUtil.breakHardlinksIfNeeded(fsd, b));
+ }
+ } finally {
+ client.close();
+ fs.close();
+ cluster.shutdown();
+ }
+ }
+
/**
* Test a simple flush on a simple HDFS file.
* @throws IOException an exception might be thrown
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb540ba8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
index f4480a1..665befa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
@@ -55,6 +55,12 @@ public class FsDatasetTestUtil {
.getGenerationStamp());
}
+ public static boolean breakHardlinksIfNeeded(FsDatasetSpi<?> fsd,
+ ExtendedBlock block) throws IOException {
+ final ReplicaInfo info = ((FsDatasetImpl)fsd).getReplicaInfo(block);
+ return info.breakHardLinksIfNeeded();
+ }
+
public static ReplicaInfo fetchReplicaInfo (final FsDatasetSpi<?> fsd,
final String bpid, final long blockId) {
return ((FsDatasetImpl)fsd).fetchReplicaInfo(bpid, blockId);