You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by to...@apache.org on 2022/10/27 11:03:26 UTC
[hadoop] branch trunk updated: HDFS-16716. Improve appendToFile command: support appending on file with new block (#4697)
This is an automated email from the ASF dual-hosted git repository.
tomscut pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8396caa4840 HDFS-16716. Improve appendToFile command: support appending on file with new block (#4697)
8396caa4840 is described below
commit 8396caa4840338f54115e92f03082e6044840b73
Author: M1eyu2018 <44...@users.noreply.github.com>
AuthorDate: Thu Oct 27 19:03:15 2022 +0800
HDFS-16716. Improve appendToFile command: support appending on file with new block (#4697)
Reviewed-by: xuzq <15...@163.com>
Signed-off-by: Tao Li <to...@apache.org>
---
.../main/java/org/apache/hadoop/fs/FileSystem.java | 33 ++++++++
.../org/apache/hadoop/fs/shell/CopyCommands.java | 19 ++++-
.../org/apache/hadoop/fs/TestFilterFileSystem.java | 5 ++
.../org/apache/hadoop/fs/TestHarFileSystem.java | 5 ++
.../apache/hadoop/hdfs/DistributedFileSystem.java | 10 +++
.../java/org/apache/hadoop/hdfs/TestDFSShell.java | 91 ++++++++++++++++++++++
6 files changed, 160 insertions(+), 3 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index 7582488e7f9..df853078461 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -1543,6 +1543,39 @@ public abstract class FileSystem extends Configured
public abstract FSDataOutputStream append(Path f, int bufferSize,
Progressable progress) throws IOException;
+ /**
+ * Append to an existing file, optionally starting a new block (optional operation).
+ * Uses the buffer size configured by {@code IO_FILE_BUFFER_SIZE_KEY} (falling back to
+ * {@code IO_FILE_BUFFER_SIZE_DEFAULT}) and no progress reporter, and delegates to
+ * {@link #append(Path, int, Progressable, boolean)}.
+ * @param f the existing file to be appended.
+ * @param appendToNewBlock whether to append data to a new block
+ * instead of the end of the last partial block. File systems that do not
+ * override the four-argument overload ignore this flag.
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the underlying append operation is unsupported.
+ * @return output stream.
+ */
+ public FSDataOutputStream append(Path f, boolean appendToNewBlock) throws IOException {
+ return append(f, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
+ IO_FILE_BUFFER_SIZE_DEFAULT), null, appendToNewBlock);
+ }
+
+ /**
+ * Append to an existing file, optionally starting a new block (optional operation).
+ * <p>
+ * The default implementation ignores {@code appendToNewBlock} and simply calls
+ * {@link #append(Path, int, Progressable)}, so data goes to the end of the last
+ * partial block exactly as with a plain append. File systems that can place the
+ * appended data in a new block (e.g. DistributedFileSystem) must override this
+ * method; subclasses that do not override it silently drop the flag.
+ * @param f the existing file to be appended.
+ * @param bufferSize the size of the buffer to be used.
+ * @param progress for reporting progress if it is not null.
+ * @param appendToNewBlock whether to append data to a new block
+ * instead of the end of the last partial block
+ * @throws IOException IO failure
+ * @throws UnsupportedOperationException if the underlying append operation is unsupported.
+ * @return output stream.
+ */
+ public FSDataOutputStream append(Path f, int bufferSize,
+ Progressable progress, boolean appendToNewBlock) throws IOException {
+ // Default: no notion of "new block" here, so fall back to a regular append.
+ return append(f, bufferSize, progress);
+ }
+
/**
* Concat existing files together.
* @param trg the path to the target destination.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
index 0643a2e983d..1ac204f5f8a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
@@ -333,15 +333,24 @@ class CopyCommands {
*/
public static class AppendToFile extends CommandWithDestination {
public static final String NAME = "appendToFile";
- public static final String USAGE = "<localsrc> ... <dst>";
+ public static final String USAGE = "[-n] <localsrc> ... <dst>";
public static final String DESCRIPTION =
"Appends the contents of all the given local files to the " +
"given dst file. The dst file will be created if it does " +
"not exist. If <localSrc> is -, then the input is read " +
- "from stdin.";
+ "from stdin. Option -n represents that use NEW_BLOCK create flag to append file.";
private static final int DEFAULT_IO_LENGTH = 1024 * 1024;
boolean readStdin = false;
+ /** True when appended data should start a new block; populated from the -n option. */
+ private boolean appendToNewBlock;
+
+ /** @return whether data will be appended to a new block (the -n option). */
+ public boolean isAppendToNewBlock() {
+ return this.appendToNewBlock;
+ }
+
+ /**
+ * Chooses whether data is appended to a new block. Option parsing assigns this
+ * field from the -n flag, replacing any value set earlier.
+ * @param appendToNewBlock true to append into a new block.
+ */
+ public void setAppendToNewBlock(boolean appendToNewBlock) {
+ this.appendToNewBlock = appendToNewBlock;
+ }
// commands operating on local paths have no need for glob expansion
@Override
@@ -372,6 +381,9 @@ class CopyCommands {
throw new IOException("missing destination argument");
}
+ CommandFormat cf = new CommandFormat(2, Integer.MAX_VALUE, "n");
+ cf.parse(args);
+ appendToNewBlock = cf.getOpt("n");
getRemoteDestination(args);
super.processOptions(args);
}
@@ -385,7 +397,8 @@ class CopyCommands {
}
InputStream is = null;
- try (FSDataOutputStream fos = dst.fs.append(dst.path)) {
+ try (FSDataOutputStream fos = appendToNewBlock ?
+ dst.fs.append(dst.path, true) : dst.fs.append(dst.path)) {
if (readStdin) {
if (args.size() == 0) {
IOUtils.copyBytes(System.in, fos, DEFAULT_IO_LENGTH);
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
index 5ed4d9bc9a7..3d8ea0e826c 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java
@@ -143,6 +143,11 @@ public class TestFilterFileSystem {
of the filter such as checksums.
*/
MultipartUploaderBuilder createMultipartUploader(Path basePath);
+
+ // Overloads of FileSystem#append that take an appendToNewBlock flag.
+ FSDataOutputStream append(Path f, boolean appendToNewBlock) throws IOException;
+
+ FSDataOutputStream append(Path f, int bufferSize,
+ Progressable progress, boolean appendToNewBlock) throws IOException;
}
@Test
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
index 711ab94fdf1..b227e169088 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java
@@ -250,6 +250,11 @@ public class TestHarFileSystem {
MultipartUploaderBuilder createMultipartUploader(Path basePath)
throws IOException;
+
+ // Overloads of FileSystem#append that take an appendToNewBlock flag.
+ FSDataOutputStream append(Path f, boolean appendToNewBlock) throws IOException;
+
+ FSDataOutputStream append(Path f, int bufferSize,
+ Progressable progress, boolean appendToNewBlock) throws IOException;
}
@Test
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index ff2c5f37c69..9050a4bddee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -422,6 +422,16 @@ public class DistributedFileSystem extends FileSystem
return append(f, EnumSet.of(CreateFlag.APPEND), bufferSize, progress);
}
+ @Override
+ public FSDataOutputStream append(Path f, final int bufferSize,
+ final Progressable progress, boolean appendToNewBlock) throws IOException {
+ EnumSet<CreateFlag> flag = EnumSet.of(CreateFlag.APPEND);
+ if (appendToNewBlock) {
+ flag.add(CreateFlag.NEW_BLOCK);
+ }
+ return append(f, flag, bufferSize, progress);
+ }
+
/**
* Append to an existing file (optional operation).
*
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSShell.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSShell.java
index a00f21ecc94..e54b7332b1e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSShell.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSShell.java
@@ -37,6 +37,7 @@ import java.util.zip.GZIPOutputStream;
import java.util.function.Supplier;
import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.XAttrNotFoundException;
import org.apache.hadoop.util.Lists;
import org.slf4j.Logger;
@@ -3043,6 +3044,96 @@ public class TestDFSShell {
assertThat(res, not(0));
}
+ /**
+ * Verifies {@code -appendToFile -n} on a replicated directory and on an
+ * erasure-coded (RS-6-3-1024k) directory. Each 1 MiB append with -n must add one
+ * block (or block group). On the EC file, an append without -n must fail.
+ */
+ @Test (timeout = 300000)
+ public void testAppendToFileWithOptionN() throws Exception {
+ final int inputFileLength = 1024 * 1024;
+ File testRoot = new File(TEST_ROOT_DIR, "testAppendToFileWithOptionN");
+ // NOTE(review): mkdirs() result is ignored; a failure would surface when file1 is created.
+ testRoot.mkdirs();
+
+ File file1 = new File(testRoot, "file1");
+ createLocalFileWithRandomData(inputFileLength, file1);
+
+ Configuration conf = new HdfsConfiguration();
+ // NOTE(review): RS-6-3 stripes over 6 data + 3 parity units but only 6 DataNodes are
+ // started, so parity placement is presumably partial; confirm this is intended.
+ try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6).build()) {
+ cluster.waitActive();
+ FileSystem hdfs = cluster.getFileSystem();
+ assertTrue("Not a HDFS: " + hdfs.getUri(),
+ hdfs instanceof DistributedFileSystem);
+
+ // Replicated directory: the first -n append creates the target file, which must
+ // have the input's size and occupy exactly one block.
+ String dir = "/replica";
+ boolean mkdirs = hdfs.mkdirs(new Path(dir));
+ assertTrue("Mkdir fail", mkdirs);
+ Path remoteFile = new Path(dir + "/remoteFile");
+ FsShell shell = new FsShell();
+ shell.setConf(conf);
+ String[] argv = new String[] {
+ "-appendToFile", "-n", file1.toString(), remoteFile.toString() };
+ int res = ToolRunner.run(shell, argv);
+ assertEquals("Run appendToFile command fail", 0, res);
+ FileStatus fileStatus = hdfs.getFileStatus(remoteFile);
+ assertEquals("File size should be " + inputFileLength,
+ inputFileLength, fileStatus.getLen());
+ BlockLocation[] fileBlockLocations =
+ hdfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
+ assertEquals("Block Num should be 1", 1, fileBlockLocations.length);
+
+ // A second -n append must double the file size and start a second block
+ // rather than filling the remaining space of the first block.
+ res = ToolRunner.run(shell, argv);
+ assertEquals("Run appendToFile command fail", 0, res);
+ fileStatus = hdfs.getFileStatus(remoteFile);
+ assertEquals("File size should be " + inputFileLength * 2,
+ inputFileLength * 2, fileStatus.getLen());
+ fileBlockLocations = hdfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
+ assertEquals("Block Num should be 2", 2, fileBlockLocations.length);
+
+ // Set the EC policy on a fresh directory before appending into it with -n.
+ dir = "/ecPolicy";
+ final String ecPolicyName = "RS-6-3-1024k";
+ mkdirs = hdfs.mkdirs(new Path(dir));
+ assertTrue("Mkdir fail", mkdirs);
+ ((DistributedFileSystem) hdfs).setErasureCodingPolicy(new Path(dir), ecPolicyName);
+ ErasureCodingPolicy erasureCodingPolicy =
+ ((DistributedFileSystem) hdfs).getErasureCodingPolicy(new Path(dir));
+ assertEquals("Set ec policy fail", ecPolicyName, erasureCodingPolicy.getName());
+
+ // EC directory: the first -n append creates the file, which must have the
+ // input's size and exactly one block group.
+ remoteFile = new Path(dir + "/remoteFile");
+ argv = new String[] {
+ "-appendToFile", "-n", file1.toString(), remoteFile.toString() };
+ res = ToolRunner.run(shell, argv);
+ assertEquals("Run appendToFile command fail", 0, res);
+ fileStatus = hdfs.getFileStatus(remoteFile);
+ assertEquals("File size should be " + inputFileLength,
+ inputFileLength, fileStatus.getLen());
+ fileBlockLocations = hdfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
+ assertEquals("Block Group Num should be 1", 1, fileBlockLocations.length);
+
+ // Appending to the EC file without -n (i.e. into the last partial block group)
+ // must fail with a non-zero exit code.
+ argv = new String[] {
+ "-appendToFile", file1.toString(), remoteFile.toString() };
+ res = ToolRunner.run(shell, argv);
+ assertTrue("Run appendToFile command must fail", res != 0);
+
+ // A second -n append on the EC file must double both its size and its
+ // block-group count.
+ argv = new String[] {
+ "-appendToFile", "-n", file1.toString(), remoteFile.toString() };
+ res = ToolRunner.run(shell, argv);
+ assertEquals("Run appendToFile command fail", 0, res);
+ fileStatus = hdfs.getFileStatus(remoteFile);
+ assertEquals("File size should be " + inputFileLength * 2,
+ inputFileLength * 2, fileStatus.getLen());
+ fileBlockLocations = hdfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
+ assertEquals("Block Group Num should be 2", 2, fileBlockLocations.length);
+ }
+ }
+
@Test (timeout = 30000)
public void testSetXAttrPermission() throws Exception {
UserGroupInformation user = UserGroupInformation.
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org