Posted to common-commits@hadoop.apache.org by to...@apache.org on 2022/10/27 11:49:46 UTC

[hadoop] branch branch-3.3 updated: HDFS-16716. Improve appendToFile command: support appending on file with new block (#4697)

This is an automated email from the ASF dual-hosted git repository.

tomscut pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new cbac2c4875f HDFS-16716. Improve appendToFile command: support appending on file with new block (#4697)
cbac2c4875f is described below

commit cbac2c4875f7359966582f6d6bb67eef880e55fa
Author: M1eyu2018 <44...@users.noreply.github.com>
AuthorDate: Thu Oct 27 19:03:15 2022 +0800

    HDFS-16716. Improve appendToFile command: support appending on file with new block (#4697)
    
    Reviewed-by: xuzq <15...@163.com>
    Signed-off-by: Tao Li <to...@apache.org>
---
 .../main/java/org/apache/hadoop/fs/FileSystem.java | 33 ++++++++
 .../org/apache/hadoop/fs/shell/CopyCommands.java   | 19 ++++-
 .../org/apache/hadoop/fs/TestFilterFileSystem.java |  5 ++
 .../org/apache/hadoop/fs/TestHarFileSystem.java    |  5 ++
 .../apache/hadoop/hdfs/DistributedFileSystem.java  | 10 +++
 .../java/org/apache/hadoop/hdfs/TestDFSShell.java  | 91 ++++++++++++++++++++++
 6 files changed, 160 insertions(+), 3 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index 57eb8f11cd5..800e9da6cac 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -1545,6 +1545,39 @@ public abstract class FileSystem extends Configured
   public abstract FSDataOutputStream append(Path f, int bufferSize,
       Progressable progress) throws IOException;
 
+  /**
+   * Append to an existing file (optional operation).
+   * @param f the existing file to be appended.
+   * @param appendToNewBlock whether to append data to a new block
+   * instead of the end of the last partial block
+   * @throws IOException IO failure
+   * @throws UnsupportedOperationException if the operation is unsupported
+   *         (default).
+   * @return output stream.
+   */
+  public FSDataOutputStream append(Path f, boolean appendToNewBlock) throws IOException {
+    return append(f, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
+        IO_FILE_BUFFER_SIZE_DEFAULT), null, appendToNewBlock);
+  }
+
+  /**
+   * Append to an existing file (optional operation).
+   * This method is intended to be overridden by implementations such as DistributedFileSystem.
+   * @param f the existing file to be appended.
+   * @param bufferSize the size of the buffer to be used.
+   * @param progress for reporting progress if it is not null.
+   * @param appendToNewBlock whether to append data to a new block
+   * instead of the end of the last partial block
+   * @throws IOException IO failure
+   * @throws UnsupportedOperationException if the operation is unsupported
+   *         (default).
+   * @return output stream.
+   */
+  public FSDataOutputStream append(Path f, int bufferSize,
+      Progressable progress, boolean appendToNewBlock) throws IOException {
+    return append(f, bufferSize, progress);
+  }
+
   /**
    * Concat existing files together.
    * @param trg the path to the target destination.
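
For reference, a minimal sketch of how a caller could use the new FileSystem#append(Path, boolean)
overload added above, assuming this patch is applied; the class name, path and data written are
illustrative. On the base FileSystem the boolean is only a hint and the existing append is called,
while DistributedFileSystem maps it to CreateFlag.NEW_BLOCK:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class AppendNewBlockExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path file = new Path("/tmp/appendable-file");  // illustrative path

        // true asks the filesystem to start the appended data in a new block
        // rather than filling the last partial block; the default implementation
        // ignores the flag and delegates to the existing append.
        try (FSDataOutputStream out = fs.append(file, true)) {
          out.writeBytes("appended data\n");
        }
      }
    }
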
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
index 0643a2e983d..1ac204f5f8a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
@@ -333,15 +333,24 @@ class CopyCommands {
    */
   public static class AppendToFile extends CommandWithDestination {
     public static final String NAME = "appendToFile";
-    public static final String USAGE = "<localsrc> ... <dst>";
+    public static final String USAGE = "[-n] <localsrc> ... <dst>";
     public static final String DESCRIPTION =
         "Appends the contents of all the given local files to the " +
             "given dst file. The dst file will be created if it does " +
             "not exist. If <localSrc> is -, then the input is read " +
-            "from stdin.";
+            "from stdin. Option -n represents that use NEW_BLOCK create flag to append file.";
 
     private static final int DEFAULT_IO_LENGTH = 1024 * 1024;
     boolean readStdin = false;
+    private boolean appendToNewBlock = false;
+
+    public boolean isAppendToNewBlock() {
+      return appendToNewBlock;
+    }
+
+    public void setAppendToNewBlock(boolean appendToNewBlock) {
+      this.appendToNewBlock = appendToNewBlock;
+    }
 
     // commands operating on local paths have no need for glob expansion
     @Override
@@ -372,6 +381,9 @@ class CopyCommands {
         throw new IOException("missing destination argument");
       }
 
+      CommandFormat cf = new CommandFormat(2, Integer.MAX_VALUE, "n");
+      cf.parse(args);
+      appendToNewBlock = cf.getOpt("n");
       getRemoteDestination(args);
       super.processOptions(args);
     }
@@ -385,7 +397,8 @@ class CopyCommands {
       }
 
       InputStream is = null;
-      try (FSDataOutputStream fos = dst.fs.append(dst.path)) {
+      try (FSDataOutputStream fos = appendToNewBlock ?
+          dst.fs.append(dst.path, true) : dst.fs.append(dst.path)) {
         if (readStdin) {
           if (args.size() == 0) {
             IOUtils.copyBytes(System.in, fos, DEFAULT_IO_LENGTH);
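
A minimal usage sketch for the new -n option, exercised programmatically through FsShell in the
same way TestDFSShell does further down; the local and remote file names are hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FsShell;
    import org.apache.hadoop.util.ToolRunner;

    public class AppendToFileNewBlockUsage {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FsShell shell = new FsShell();
        shell.setConf(conf);
        // With -n, AppendToFile calls dst.fs.append(dst.path, true), so HDFS starts
        // the appended data in a new block instead of the last partial block.
        String[] argv = {"-appendToFile", "-n", "local.txt", "/user/foo/remote.txt"};
        int res = ToolRunner.run(shell, argv);
        System.exit(res);
      }
    }

The equivalent command line is "hadoop fs -appendToFile -n <localsrc> ... <dst>", matching the
updated USAGE string above.
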
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
index 6cd450610b3..c6d2ff056a7 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
@@ -143,6 +143,11 @@ public class TestFilterFileSystem {
     of the filter such as checksums.
      */
     MultipartUploaderBuilder createMultipartUploader(Path basePath);
+
+    FSDataOutputStream append(Path f, boolean appendToNewBlock) throws IOException;
+
+    FSDataOutputStream append(Path f, int bufferSize,
+        Progressable progress, boolean appendToNewBlock) throws IOException;
   }
 
   @Test
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
index 711ab94fdf1..b227e169088 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
@@ -250,6 +250,11 @@ public class TestHarFileSystem {
 
     MultipartUploaderBuilder createMultipartUploader(Path basePath)
         throws IOException;
+
+    FSDataOutputStream append(Path f, boolean appendToNewBlock) throws IOException;
+
+    FSDataOutputStream append(Path f, int bufferSize,
+        Progressable progress, boolean appendToNewBlock) throws IOException;
   }
 
   @Test
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index f74db63d4f0..2b648ecf67f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -423,6 +423,16 @@ public class DistributedFileSystem extends FileSystem
     return append(f, EnumSet.of(CreateFlag.APPEND), bufferSize, progress);
   }
 
+  @Override
+  public FSDataOutputStream append(Path f, final int bufferSize,
+      final Progressable progress, boolean appendToNewBlock) throws IOException {
+    EnumSet<CreateFlag> flag = EnumSet.of(CreateFlag.APPEND);
+    if (appendToNewBlock) {
+      flag.add(CreateFlag.NEW_BLOCK);
+    }
+    return append(f, flag, bufferSize, progress);
+  }
+
   /**
    * Append to an existing file (optional operation).
    *
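
The override above is roughly equivalent to calling the existing
DistributedFileSystem#append(Path, EnumSet<CreateFlag>, int, Progressable) with NEW_BLOCK added,
as in this sketch; the path and buffer size are illustrative, and fs.defaultFS is assumed to point
at an HDFS cluster so the cast succeeds:

    import java.util.EnumSet;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CreateFlag;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class DirectNewBlockAppend {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumes fs.defaultFS is an hdfs:// URI so the default FileSystem is HDFS.
        DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
        Path file = new Path("/user/foo/remote.txt");  // illustrative path
        EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK);
        try (FSDataOutputStream out = dfs.append(file, flags, 4096, null)) {
          out.writeBytes("appended in a fresh block\n");
        }
      }
    }
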
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSShell.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSShell.java
index 3f4af541d7e..239c2bc5ac1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSShell.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSShell.java
@@ -38,6 +38,7 @@ import java.util.function.Supplier;
 import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
 
 import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.XAttrNotFoundException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -3040,6 +3041,96 @@ public class TestDFSShell {
     assertThat(res, not(0));
   }
 
+  @Test (timeout = 300000)
+  public void testAppendToFileWithOptionN() throws Exception {
+    final int inputFileLength = 1024 * 1024;
+    File testRoot = new File(TEST_ROOT_DIR, "testAppendToFileWithOptionN");
+    testRoot.mkdirs();
+
+    File file1 = new File(testRoot, "file1");
+    createLocalFileWithRandomData(inputFileLength, file1);
+
+    Configuration conf = new HdfsConfiguration();
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6).build()) {
+      cluster.waitActive();
+      FileSystem hdfs = cluster.getFileSystem();
+      assertTrue("Not a HDFS: " + hdfs.getUri(),
+          hdfs instanceof DistributedFileSystem);
+
+      // Run appendToFile with the -n option once under the replication policy; make sure that
+      // the target file is created with the right size and the expected block count.
+      String dir = "/replica";
+      boolean mkdirs = hdfs.mkdirs(new Path(dir));
+      assertTrue("Mkdir fail", mkdirs);
+      Path remoteFile = new Path(dir + "/remoteFile");
+      FsShell shell = new FsShell();
+      shell.setConf(conf);
+      String[] argv = new String[] {
+          "-appendToFile", "-n", file1.toString(), remoteFile.toString() };
+      int res = ToolRunner.run(shell, argv);
+      assertEquals("Run appendToFile command fail", 0, res);
+      FileStatus fileStatus = hdfs.getFileStatus(remoteFile);
+      assertEquals("File size should be " + inputFileLength,
+          inputFileLength, fileStatus.getLen());
+      BlockLocation[] fileBlockLocations =
+          hdfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
+      assertEquals("Block Num should be 1", 1, fileBlockLocations.length);
+
+      // Run appendToFile with the -n option again under the replication policy and
+      // make sure that the target file size and block count have both doubled.
+      res = ToolRunner.run(shell, argv);
+      assertEquals("Run appendToFile command fail", 0, res);
+      fileStatus = hdfs.getFileStatus(remoteFile);
+      assertEquals("File size should be " + inputFileLength * 2,
+          inputFileLength * 2, fileStatus.getLen());
+      fileBlockLocations = hdfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
+      assertEquals("Block Num should be 2", 2, fileBlockLocations.length);
+
+      // Before running appendToFile with -n on an EC path, set the EC policy for the dir.
+      dir = "/ecPolicy";
+      final String ecPolicyName = "RS-6-3-1024k";
+      mkdirs = hdfs.mkdirs(new Path(dir));
+      assertTrue("Mkdir fail", mkdirs);
+      ((DistributedFileSystem) hdfs).setErasureCodingPolicy(new Path(dir), ecPolicyName);
+      ErasureCodingPolicy erasureCodingPolicy =
+          ((DistributedFileSystem) hdfs).getErasureCodingPolicy(new Path(dir));
+      assertEquals("Set ec policy fail", ecPolicyName, erasureCodingPolicy.getName());
+
+      // Run appendToFile with the -n option once on the EC path; make sure that the
+      // target file is created with the right size and the expected block group count.
+      remoteFile = new Path(dir + "/remoteFile");
+      argv = new String[] {
+          "-appendToFile", "-n", file1.toString(), remoteFile.toString() };
+      res = ToolRunner.run(shell, argv);
+      assertEquals("Run appendToFile command fail", 0, res);
+      fileStatus = hdfs.getFileStatus(remoteFile);
+      assertEquals("File size should be " + inputFileLength,
+          inputFileLength, fileStatus.getLen());
+      fileBlockLocations = hdfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
+      assertEquals("Block Group Num should be 1", 1, fileBlockLocations.length);
+
+      // Run appendToFile again, this time without the -n option, on the EC path and make sure
+      // that appending to an EC file without a new block fails.
+      argv = new String[] {
+          "-appendToFile", file1.toString(), remoteFile.toString() };
+      res = ToolRunner.run(shell, argv);
+      assertTrue("Run appendToFile command must fail", res != 0);
+
+      // Run appendToFile with the -n option on the EC path again and
+      // make sure that the target file size and
+      // block group count have both doubled.
+      argv = new String[] {
+          "-appendToFile", "-n", file1.toString(), remoteFile.toString() };
+      res = ToolRunner.run(shell, argv);
+      assertEquals("Run appendToFile command fail", 0, res);
+      fileStatus = hdfs.getFileStatus(remoteFile);
+      assertEquals("File size should be " + inputFileLength * 2,
+          inputFileLength * 2, fileStatus.getLen());
+      fileBlockLocations = hdfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
+      assertEquals("Block Group Num should be 2", 2, fileBlockLocations.length);
+    }
+  }
+
   @Test (timeout = 30000)
   public void testSetXAttrPermission() throws Exception {
     UserGroupInformation user = UserGroupInformation.
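
Since the test above relies on plain append failing for erasure-coded files, a caller that does
not know the layout of the target in advance could choose the append mode from the EC policy, as
in this sketch; the path is hypothetical, and treating any non-null policy as erasure coded is an
assumption of the sketch rather than something this commit defines:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class EcAwareAppend {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path file = new Path("/ecPolicy/remoteFile");  // hypothetical path

        boolean newBlock = false;
        if (fs instanceof DistributedFileSystem) {
          // Assumption: a non-null policy means the file is erasure coded,
          // so the appended data should go to a new block.
          newBlock = ((DistributedFileSystem) fs).getErasureCodingPolicy(file) != null;
        }
        try (FSDataOutputStream out = fs.append(file, newBlock)) {
          out.writeBytes("data\n");
        }
      }
    }
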


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org