You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2017/07/16 18:19:25 UTC
hadoop git commit: HDFS-11786. Add support to make copyFromLocal multi threaded. Contributed by Mukul Kumar Singh.
Repository: hadoop
Updated Branches:
refs/heads/trunk b778887af -> 02b141ac6
HDFS-11786. Add support to make copyFromLocal multi threaded. Contributed by Mukul Kumar Singh.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/02b141ac
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/02b141ac
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/02b141ac
Branch: refs/heads/trunk
Commit: 02b141ac6059323ec43e472ca36dc570fdca386f
Parents: b778887
Author: Anu Engineer <ae...@apache.org>
Authored: Sun Jul 16 10:59:34 2017 -0700
Committer: Anu Engineer <ae...@apache.org>
Committed: Sun Jul 16 10:59:34 2017 -0700
----------------------------------------------------------------------
.../apache/hadoop/fs/shell/CopyCommands.java | 112 +++++++++++-
.../apache/hadoop/fs/shell/MoveCommands.java | 4 +-
.../hadoop/fs/shell/TestCopyFromLocal.java | 173 +++++++++++++++++++
.../hadoop/fs/shell/TestCopyPreserveFlag.java | 19 ++
.../src/test/resources/testConf.xml | 44 ++++-
5 files changed, 346 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/02b141ac/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
index e2fad75..7b3c53e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
@@ -26,7 +26,11 @@ import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -288,9 +292,113 @@ class CopyCommands {
}
public static class CopyFromLocal extends Put {
+ private ThreadPoolExecutor executor = null;
+ private int numThreads = 1;
+
+ private static final int MAX_THREADS =
+ Runtime.getRuntime().availableProcessors() * 2;
public static final String NAME = "copyFromLocal";
- public static final String USAGE = Put.USAGE;
- public static final String DESCRIPTION = "Identical to the -put command.";
+ public static final String USAGE =
+ "[-f] [-p] [-l] [-d] [-t <thread count>] <localsrc> ... <dst>";
+ public static final String DESCRIPTION =
+ "Copy files from the local file system " +
+ "into fs. Copying fails if the file already " +
+ "exists, unless the -f flag is given.\n" +
+ "Flags:\n" +
+ " -p : Preserves access and modification times, ownership and the" +
+ " mode.\n" +
+ " -f : Overwrites the destination if it already exists.\n" +
+ " -t <thread count> : Number of threads to be used, default is 1.\n" +
+ " -l : Allow DataNode to lazily persist the file to disk. Forces" +
+ " replication factor of 1. This flag will result in reduced" +
+ " durability. Use with care.\n" +
+ " -d : Skip creation of temporary file(<dst>._COPYING_).\n";
+
+ private void setNumberThreads(String numberThreadsString) {
+ if (numberThreadsString == null) {
+ numThreads = 1;
+ } else {
+ int parsedValue = Integer.parseInt(numberThreadsString);
+ if (parsedValue <= 1) {
+ numThreads = 1;
+ } else if (parsedValue > MAX_THREADS) {
+ numThreads = MAX_THREADS;
+ } else {
+ numThreads = parsedValue;
+ }
+ }
+ }
+
+ @Override
+ protected void processOptions(LinkedList<String> args) throws IOException {
+ CommandFormat cf =
+ new CommandFormat(1, Integer.MAX_VALUE, "f", "p", "l", "d");
+ cf.addOptionWithValue("t");
+ cf.parse(args);
+ setNumberThreads(cf.getOptValue("t"));
+ setOverwrite(cf.getOpt("f"));
+ setPreserve(cf.getOpt("p"));
+ setLazyPersist(cf.getOpt("l"));
+ setDirectWrite(cf.getOpt("d"));
+ getRemoteDestination(args);
+ // should have a -r option
+ setRecursive(true);
+ }
+
+ private void copyFile(PathData src, PathData target) throws IOException {
+ if (isPathRecursable(src)) {
+ throw new PathIsDirectoryException(src.toString());
+ }
+ super.copyFileToTarget(src, target);
+ }
+
+ @Override
+ protected void copyFileToTarget(PathData src, PathData target)
+ throws IOException {
+ // if number of thread is 1, mimic put and avoid threading overhead
+ if (numThreads == 1) {
+ copyFile(src, target);
+ return;
+ }
+
+ Runnable task = () -> {
+ try {
+ copyFile(src, target);
+ } catch (IOException e) {
+ displayError(e);
+ }
+ };
+ executor.submit(task);
+ }
+
+ @Override
+ protected void processArguments(LinkedList<PathData> args)
+ throws IOException {
+ executor = new ThreadPoolExecutor(numThreads, numThreads, 1,
+ TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024),
+ new ThreadPoolExecutor.CallerRunsPolicy());
+ super.processArguments(args);
+
+ // issue the command and then wait for it to finish
+ executor.shutdown();
+ try {
+ executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
+ } catch (InterruptedException e) {
+ executor.shutdownNow();
+ displayError(e);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ @VisibleForTesting
+ public int getNumThreads() {
+ return numThreads;
+ }
+
+ @VisibleForTesting
+ public ThreadPoolExecutor getExecutor() {
+ return executor;
+ }
}
public static class CopyToLocal extends Get {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/02b141ac/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/MoveCommands.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/MoveCommands.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/MoveCommands.java
index d359282..5ef4277 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/MoveCommands.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/MoveCommands.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.PathExistsException;
-import org.apache.hadoop.fs.shell.CopyCommands.CopyFromLocal;
+import org.apache.hadoop.fs.shell.CopyCommands.Put;
/** Various commands for moving files */
@InterfaceAudience.Private
@@ -41,7 +41,7 @@ class MoveCommands {
/**
* Move local files to a remote filesystem
*/
- public static class MoveFromLocal extends CopyFromLocal {
+ public static class MoveFromLocal extends Put {
public static final String NAME = "moveFromLocal";
public static final String USAGE = "<localsrc> ... <dst>";
public static final String DESCRIPTION =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/02b141ac/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyFromLocal.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyFromLocal.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyFromLocal.java
new file mode 100644
index 0000000..8d354b4
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyFromLocal.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.shell;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.lang.math.RandomUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.shell.CopyCommands.CopyFromLocal;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.junit.Assert;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for copyFromLocal.
+ */
+public class TestCopyFromLocal {
+ private static final String FROM_DIR_NAME = "fromDir";
+ private static final String TO_DIR_NAME = "toDir";
+
+ private static FileSystem fs;
+ private static Path testDir;
+ private static Configuration conf;
+
+ public static int initialize(Path dir) throws Exception {
+ fs.mkdirs(dir);
+ Path fromDirPath = new Path(dir, FROM_DIR_NAME);
+ fs.mkdirs(fromDirPath);
+ Path toDirPath = new Path(dir, TO_DIR_NAME);
+ fs.mkdirs(toDirPath);
+
+ int numTotalFiles = 0;
+ int numDirs = RandomUtils.nextInt(5);
+ for (int dirCount = 0; dirCount < numDirs; ++dirCount) {
+ Path subDirPath = new Path(fromDirPath, "subdir" + dirCount);
+ fs.mkdirs(subDirPath);
+ int numFiles = RandomUtils.nextInt(10);
+ for (int fileCount = 0; fileCount < numFiles; ++fileCount) {
+ numTotalFiles++;
+ Path subFile = new Path(subDirPath, "file" + fileCount);
+ fs.createNewFile(subFile);
+ FSDataOutputStream output = fs.create(subFile, true);
+ for(int i = 0; i < 100; ++i) {
+ output.writeInt(i);
+ output.writeChar('\n');
+ }
+ output.close();
+ }
+ }
+
+ return numTotalFiles;
+ }
+
+ @BeforeClass
+ public static void init() throws Exception {
+ conf = new Configuration(false);
+ conf.set("fs.file.impl", LocalFileSystem.class.getName());
+ fs = FileSystem.getLocal(conf);
+ testDir = new FileSystemTestHelper().getTestRootPath(fs);
+ // don't want scheme on the path, just an absolute path
+ testDir = new Path(fs.makeQualified(testDir).toUri().getPath());
+
+ FileSystem.setDefaultUri(conf, fs.getUri());
+ fs.setWorkingDirectory(testDir);
+ }
+
+ @AfterClass
+ public static void cleanup() throws Exception {
+ fs.delete(testDir, true);
+ fs.close();
+ }
+
+ private void run(CommandWithDestination cmd, String... args) {
+ cmd.setConf(conf);
+ assertEquals(0, cmd.run(args));
+ }
+
+ @Test(timeout = 10000)
+ public void testCopyFromLocal() throws Exception {
+ Path dir = new Path("dir" + RandomStringUtils.randomNumeric(4));
+ TestCopyFromLocal.initialize(dir);
+ run(new TestMultiThreadedCopy(1, 0),
+ new Path(dir, FROM_DIR_NAME).toString(),
+ new Path(dir, TO_DIR_NAME).toString());
+ }
+
+ @Test(timeout = 10000)
+ public void testCopyFromLocalWithThreads() throws Exception {
+ Path dir = new Path("dir" + RandomStringUtils.randomNumeric(4));
+ int numFiles = TestCopyFromLocal.initialize(dir);
+ int maxThreads = Runtime.getRuntime().availableProcessors() * 2;
+ int randThreads = RandomUtils.nextInt(maxThreads);
+ int numActualThreads = randThreads == 0 ? 1 : randThreads;
+ String numThreads = Integer.toString(numActualThreads);
+ run(new TestMultiThreadedCopy(numActualThreads, numFiles), "-t", numThreads,
+ new Path(dir, FROM_DIR_NAME).toString(),
+ new Path(dir, TO_DIR_NAME).toString());
+ }
+
+ @Test(timeout = 10000)
+ public void testCopyFromLocalWithThreadWrong() throws Exception {
+ Path dir = new Path("dir" + RandomStringUtils.randomNumeric(4));
+ int numFiles = TestCopyFromLocal.initialize(dir);
+ int maxThreads = Runtime.getRuntime().availableProcessors() * 2;
+ String numThreads = Integer.toString(maxThreads * 2);
+ run(new TestMultiThreadedCopy(maxThreads, numFiles), "-t", numThreads,
+ new Path(dir, FROM_DIR_NAME).toString(),
+ new Path(dir, TO_DIR_NAME).toString());
+ }
+
+ @Test(timeout = 10000)
+ public void testCopyFromLocalWithZeroThreads() throws Exception {
+ Path dir = new Path("dir" + RandomStringUtils.randomNumeric(4));
+ TestCopyFromLocal.initialize(dir);
+ run(new TestMultiThreadedCopy(1, 0), "-t", "0",
+ new Path(dir, FROM_DIR_NAME).toString(),
+ new Path(dir, TO_DIR_NAME).toString());
+ }
+
+ private class TestMultiThreadedCopy extends CopyFromLocal {
+ private int expectedThreads;
+ private int expectedCompletedTaskCount;
+
+ TestMultiThreadedCopy(int expectedThreads,
+ int expectedCompletedTaskCount) {
+ this.expectedThreads = expectedThreads;
+ this.expectedCompletedTaskCount = expectedCompletedTaskCount;
+ }
+
+ @Override
+ protected void processArguments(LinkedList<PathData> args)
+ throws IOException {
+ // Check if the correct number of threads are spawned
+ Assert.assertEquals(expectedThreads, getNumThreads());
+ super.processArguments(args);
+ // Once the copy is complete, check following
+ // 1) number of completed tasks are same as expected
+ // 2) There are no active tasks in the executor
+ // 3) Executor has shutdown correctly
+ ThreadPoolExecutor executor = getExecutor();
+ Assert.assertEquals(executor.getCompletedTaskCount(),
+ expectedCompletedTaskCount);
+ Assert.assertEquals(executor.getActiveCount(), 0);
+ Assert.assertTrue(executor.isTerminated());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/02b141ac/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyPreserveFlag.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyPreserveFlag.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyPreserveFlag.java
index 47dc601..8dd09e5 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyPreserveFlag.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyPreserveFlag.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.shell.CopyCommands.Cp;
import org.apache.hadoop.fs.shell.CopyCommands.Get;
import org.apache.hadoop.fs.shell.CopyCommands.Put;
+import org.apache.hadoop.fs.shell.CopyCommands.CopyFromLocal;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -120,6 +121,24 @@ public class TestCopyPreserveFlag {
}
@Test(timeout = 10000)
+ public void testCopyFromLocal() throws Exception {
+ run(new CopyFromLocal(), FROM.toString(), TO.toString());
+ assertAttributesChanged(TO);
+ }
+
+ @Test(timeout = 10000)
+ public void testCopyFromLocalWithThreads() throws Exception {
+ run(new CopyFromLocal(), "-t", "10", FROM.toString(), TO.toString());
+ assertAttributesChanged(TO);
+ }
+
+ @Test(timeout = 10000)
+ public void testCopyFromLocalWithThreadsPreserve() throws Exception {
+ run(new CopyFromLocal(), "-p", "-t", "10", FROM.toString(), TO.toString());
+ assertAttributesPreserved(TO);
+ }
+
+ @Test(timeout = 10000)
public void testGetWithP() throws Exception {
run(new Get(), "-p", FROM.toString(), TO.toString());
assertAttributesPreserved(TO);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/02b141ac/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml b/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml
index 342b17c..64677f8 100644
--- a/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml
+++ b/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml
@@ -547,11 +547,51 @@
<comparators>
<comparator>
<type>RegexpComparator</type>
- <expected-output>^-copyFromLocal \[-f\] \[-p\] \[-l\] \[-d\] <localsrc> \.\.\. <dst> :\s*</expected-output>
+ <expected-output>^-copyFromLocal \[-f\] \[-p\] \[-l\] \[-d\] \[-t <thread count>\] <localsrc> \.\.\. <dst> :\s*</expected-output>
</comparator>
<comparator>
<type>RegexpComparator</type>
- <expected-output>^\s*Identical to the -put command\.\s*</expected-output>
+ <expected-output>^\s*Copy files from the local file system into fs.( )*Copying fails if the file already( )*</expected-output>
+ </comparator>
+ <comparator>
+ <type>RegexpComparator</type>
+ <expected-output>^\s*exists, unless the -f flag is given.( )*</expected-output>
+ </comparator>
+ <comparator>
+ <type>RegexpComparator</type>
+ <expected-output>^\s*Flags:( )*</expected-output>
+ </comparator>
+ <comparator>
+ <type>RegexpComparator</type>
+ <expected-output>^\s*-p Preserves access and modification times, ownership and the( )*</expected-output>
+ </comparator>
+ <comparator>
+ <type>RegexpComparator</type>
+ <expected-output>^\s*mode.( )*</expected-output>
+ </comparator>
+ <comparator>
+ <type>RegexpComparator</type>
+ <expected-output>^\s*-f Overwrites the destination if it already exists.( )*</expected-output>
+ </comparator>
+ <comparator>
+ <type>RegexpComparator</type>
+ <expected-output>^\s*-t <thread count> Number of threads to be used, default is 1.( )*</expected-output>
+ </comparator>
+ <comparator>
+ <type>RegexpComparator</type>
+ <expected-output>^\s*-l Allow DataNode to lazily persist the file to disk. Forces( )*</expected-output>
+ </comparator>
+ <comparator>
+ <type>RegexpComparator</type>
+ <expected-output>^\s*replication factor of 1. This flag will result in reduced( )*</expected-output>
+ </comparator>
+ <comparator>
+ <type>RegexpComparator</type>
+ <expected-output>^\s*durability. Use with care.( )*</expected-output>
+ </comparator>
+ <comparator>
+ <type>RegexpComparator</type>
+ <expected-output>^\s*-d Skip creation of temporary file\(<dst>\._COPYING_\).( )*</expected-output>
</comparator>
</comparators>
</test>
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org