Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2015/12/22 18:37:19 UTC

hadoop git commit: HDFS-9589. Block files which have been hardlinked should be duplicated before the DataNode appends to them (cmccabe)

Repository: hadoop
Updated Branches:
  refs/heads/trunk 70d6f2012 -> bb540ba85


HDFS-9589. Block files which have been hardlinked should be duplicated before the DataNode appends to them (cmccabe)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bb540ba8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bb540ba8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bb540ba8

Branch: refs/heads/trunk
Commit: bb540ba85aa37d9fe31e640665158afe8a936230
Parents: 70d6f20
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Mon Dec 21 14:13:36 2015 -0800
Committer: Colin Patrick Mccabe <cm...@cloudera.com>
Committed: Tue Dec 22 09:36:50 2015 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 +
 .../hdfs/server/datanode/ReplicaInfo.java       | 79 ++++++++++++++++++++
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |  5 ++
 .../org/apache/hadoop/hdfs/TestFileAppend.java  | 66 ++++++++++++++++
 .../fsdataset/impl/FsDatasetTestUtil.java       |  6 ++
 5 files changed, 159 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb540ba8/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 33ce57f..2b980ec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -2541,6 +2541,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-9347. Invariant assumption in TestQuorumJournalManager.shutdown()
     is wrong. (Wei-Chiu Chuang via zhz)
 
+    HDFS-9589. Block files which have been hardlinked should be duplicated
+    before the DataNode appends to them (cmccabe)
+
 Release 2.7.3 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb540ba8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
index d19e656..3ef6390 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
@@ -18,12 +18,18 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.HardLink;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.LightWeightResizableGSet;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -210,6 +216,79 @@ abstract public class ReplicaInfo extends Block
     return 0;
   }
 
+  /**
+   * Copy the specified file into a temporary file, then rename the
+   * temporary file to the original name. The original name then points
+   * at a fresh inode, so any existing hardlinks keep the old contents.
+   * The temporary file is created in the same directory; leftover
+   * temporary files are recovered (especially on Windows) on datanode restart.
+   */
+  private void breakHardlinks(File file, Block b) throws IOException {
+    File tmpFile = DatanodeUtil.createTmpFile(b, DatanodeUtil.getUnlinkTmpFile(file));
+    try {
+      FileInputStream in = new FileInputStream(file);
+      try {
+        FileOutputStream out = new FileOutputStream(tmpFile);
+        try {
+          IOUtils.copyBytes(in, out, 16 * 1024);
+        } finally {
+          out.close();
+        }
+      } finally {
+        in.close();
+      }
+      if (file.length() != tmpFile.length()) {
+        throw new IOException("Copy of file " + file + " size " + file.length() +
+                              " into file " + tmpFile +
+                              " resulted in a size of " + tmpFile.length());
+      }
+      FileUtil.replaceFile(tmpFile, file);
+    } catch (IOException e) {
+      boolean done = tmpFile.delete();
+      if (!done) {
+        DataNode.LOG.info("detachFile failed to delete temporary file " +
+                          tmpFile);
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * This function "breaks hardlinks" to the current replica file.
+   *
+   * When doing a DataNode upgrade, we create a bunch of hardlinks to each block
+   * file.  This cleverly ensures that both the old and the new storage
+   * directories can contain the same block file, without using additional space
+   * for the data.
+   *
+   * However, when we want to append to the replica file, we need to "break" the
+   * hardlink to ensure that the old snapshot continues to contain the old data
+   * length.  If we failed to do that, we could roll back to the previous/
+   * directory during a downgrade, and find that the block contents were longer
+   * than they were at the time of upgrade.
+   *
+   * @return always true; any hardlinks are broken by the time this returns.
+   * @throws IOException
+   */
+  public boolean breakHardLinksIfNeeded() throws IOException {
+    File file = getBlockFile();
+    if (file == null || getVolume() == null) {
+      throw new IOException("detachBlock:Block not found. " + this);
+    }
+    File meta = getMetaFile();
+
+    int linkCount = HardLink.getLinkCount(file);
+    if (linkCount > 1) {
+      DataNode.LOG.info("Breaking hardlink for " + linkCount + "x-linked " +
+          "block " + this);
+      breakHardlinks(file, this);
+    }
+    if (HardLink.getLinkCount(meta) > 1) {
+      breakHardlinks(meta, this);
+    }
+    return true;
+  }
+
   @Override  //Object
   public String toString() {
     return getClass().getSimpleName()

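For readers who want the core trick in isolation: breakHardlinks() copies the
block file to a temporary file in the same directory, then renames the copy
over the original name, so that name stops sharing an inode with the
upgrade-time hardlink. Below is a minimal standalone sketch of the same
copy-then-rename pattern using only JDK NIO; the class, the helper, and the
file names are illustrative, not part of the commit.

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.nio.file.StandardCopyOption;
    import java.nio.file.StandardOpenOption;

    public class CopyThenRenameSketch {
      // Re-point 'file' at a fresh inode by copying it to a sibling temp
      // file and renaming the copy over the original name. Any other
      // hardlink to the old inode keeps the old contents unchanged.
      static void breakHardlink(Path file) throws IOException {
        Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        Files.copy(file, tmp, StandardCopyOption.REPLACE_EXISTING);
        Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING,
            StandardCopyOption.ATOMIC_MOVE);
      }

      public static void main(String[] args) throws IOException {
        Path original = Paths.get("blk_1");      // stands in for a block file
        Path snapshot = Paths.get("blk_1.link"); // stands in for the previous/ copy
        Files.write(original, new byte[]{1, 2, 3});
        Files.createLink(snapshot, original);    // link count of the inode: 2

        breakHardlink(original);                 // original now has its own inode

        Files.write(original, new byte[]{4}, StandardOpenOption.APPEND);
        System.out.println(Files.size(original)); // 4 -- the appended copy
        System.out.println(Files.size(snapshot)); // 3 -- snapshot unaffected
      }
    }
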
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb540ba8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 76e65cc..5d987fe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1128,6 +1128,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       throws IOException {
     // If the block is cached, start uncaching it.
     cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId());
+
+    // If there are any hardlinks to the block, break them.  This ensures we are
+    // not appending to a file that is part of a previous/ directory.
+    replicaInfo.breakHardLinksIfNeeded();
     
     // construct a RBW replica with the new GS
     File blkfile = replicaInfo.getBlockFile();
@@ -2493,6 +2497,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           + ", rur=" + rur);
     }
     if (rur.getNumBytes() > newlength) {
+      rur.breakHardLinksIfNeeded();
       truncateBlock(blockFile, metaFile, rur.getNumBytes(), newlength);
       if(!copyOnTruncate) {
         // update RUR with the new length

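Both call sites mutate a finalized replica in place: the append path extends
the block file, and the recovery path truncates it. Either way the file must
be privately owned before the first byte changes, which is why
breakHardLinksIfNeeded() checks the inode's link count first. On platforms
where the JDK exposes the "unix" attribute view, the same counter that
Hadoop's HardLink.getLinkCount() reports is available as st_nlink; a hedged
sketch, with made-up file names:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class LinkCountSketch {
      // st_nlink from stat(2): greater than 1 exactly when another directory
      // entry (e.g. an upgrade snapshot under previous/) shares the inode.
      // Requires a file system and JDK with the "unix" attribute view.
      static int linkCount(Path p) throws IOException {
        return (Integer) Files.getAttribute(p, "unix:nlink");
      }

      public static void main(String[] args) throws IOException {
        Path blk = Paths.get("blk_2");
        Files.write(blk, new byte[]{1});
        System.out.println(linkCount(blk)); // 1: safe to append in place
        Files.createLink(Paths.get("blk_2.bak"), blk);
        System.out.println(linkCount(blk)); // 2: copy first, then append
      }
    }
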
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb540ba8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
index 7b7f415..11d1b18 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
@@ -43,6 +43,8 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
 import org.junit.Assert;
@@ -109,6 +111,70 @@ public class TestFileAppend{
         expected, "Read 1", false);
   }
 
+  @Test
+  public void testBreakHardlinksIfNeeded() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    if (simulatedStorage) {
+      SimulatedFSDataset.setFactory(conf);
+    }
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    FileSystem fs = cluster.getFileSystem();
+    InetSocketAddress addr = new InetSocketAddress("localhost",
+                                                   cluster.getNameNodePort());
+    DFSClient client = new DFSClient(addr, conf);
+    try {
+      // create a new file, write to it and close it.
+      Path file1 = new Path("/filestatus.dat");
+      FSDataOutputStream stm = AppendTestUtil.createFile(fs, file1, 1);
+      writeFile(stm);
+      stm.close();
+
+      // Get a handle to the datanode
+      DataNode[] dn = cluster.listDataNodes();
+      assertTrue("There should be only one datanode but found " + dn.length,
+                  dn.length == 1);
+
+      LocatedBlocks locations = client.getNamenode().getBlockLocations(
+                                  file1.toString(), 0, Long.MAX_VALUE);
+      List<LocatedBlock> blocks = locations.getLocatedBlocks();
+      final FsDatasetSpi<?> fsd = dn[0].getFSDataset();
+
+      //
+      // Create hard links for a few of the blocks
+      //
+      for (int i = 0; i < blocks.size(); i = i + 2) {
+        ExtendedBlock b = blocks.get(i).getBlock();
+        final File f = FsDatasetTestUtil.getBlockFile(
+            fsd, b.getBlockPoolId(), b.getLocalBlock());
+        File link = new File(f.toString() + ".link");
+        System.out.println("Creating hardlink for File " + f + " to " + link);
+        HardLink.createHardLink(f, link);
+      }
+
+      // Detach all blocks. This should remove hardlinks (if any)
+      for (int i = 0; i < blocks.size(); i++) {
+        ExtendedBlock b = blocks.get(i).getBlock();
+        System.out.println("breakHardlinksIfNeeded detaching block " + b);
+        assertTrue("breakHardlinksIfNeeded(" + b + ") should have returned true",
+            FsDatasetTestUtil.breakHardlinksIfNeeded(fsd, b));
+      }
+
+      // The hardlinks were already broken above, so these calls should be
+      // no-ops that still return true
+      for (int i = 0; i < blocks.size(); i++) {
+        ExtendedBlock b = blocks.get(i).getBlock();
+        System.out.println("breakHardlinksIfNeeded re-attempting to " +
+                "detach block " + b);
+        assertTrue("breakHardlinksIfNeeded(" + b + ") should have returned false",
+            FsDatasetTestUtil.breakHardlinksIfNeeded(fsd, b));
+      }
+    } finally {
+      client.close();
+      fs.close();
+      cluster.shutdown();
+    }
+  }
+
   /**
    * Test a simple flush on a simple HDFS file.
    * @throws IOException an exception might be thrown

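The new test only exercises the plumbing; the hazard it guards against is
easy to reproduce by hand. A hedged sketch of what would go wrong without the
copy (directory and file names are made up): an upgrade-style hardlink under
previous/ silently grows when the shared inode is appended to, so a later
rollback would find the block longer than it was at the time of upgrade.

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;

    public class RollbackHazardSketch {
      public static void main(String[] args) throws IOException {
        Path current = Files.createDirectories(Paths.get("current"));
        Path previous = Files.createDirectories(Paths.get("previous"));

        Path blk = current.resolve("blk_3");
        Files.write(blk, new byte[100]);                  // length at upgrade time
        Files.createLink(previous.resolve("blk_3"), blk); // upgrade-style hardlink

        // Appending through the shared inode -- i.e. skipping
        // breakHardLinksIfNeeded() -- changes both names at once:
        Files.write(blk, new byte[50], StandardOpenOption.APPEND);
        System.out.println(Files.size(previous.resolve("blk_3"))); // 150, not 100
        // After a rollback, the block would be longer than it was at upgrade
        // time, which is exactly what the commit message warns about.
      }
    }
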
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb540ba8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
index f4480a1..665befa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
@@ -55,6 +55,12 @@ public class FsDatasetTestUtil {
         .getGenerationStamp());
   }
 
+  public static boolean breakHardlinksIfNeeded(FsDatasetSpi<?> fsd,
+      ExtendedBlock block) throws IOException {
+    final ReplicaInfo info = ((FsDatasetImpl)fsd).getReplicaInfo(block);
+    return info.breakHardLinksIfNeeded();
+  }
+
   public static ReplicaInfo fetchReplicaInfo (final FsDatasetSpi<?> fsd,
       final String bpid, final long blockId) {
     return ((FsDatasetImpl)fsd).fetchReplicaInfo(bpid, blockId);