You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by an...@apache.org on 2018/07/04 10:31:28 UTC
oozie git commit: OOZIE-2791 ShareLib installation may fail on busy
Hadoop clusters (asasvari, kmarton via pbacsko, andras.piros)
Repository: oozie
Updated Branches:
refs/heads/master a299d4a6d -> 6b89aba42
OOZIE-2791 ShareLib installation may fail on busy Hadoop clusters (asasvari, kmarton via pbacsko, andras.piros)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/6b89aba4
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/6b89aba4
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/6b89aba4
Branch: refs/heads/master
Commit: 6b89aba4227326cd125a9ec409f418824cc2cada
Parents: a299d4a
Author: Andras Piros <an...@cloudera.com>
Authored: Wed Jul 4 12:29:28 2018 +0200
Committer: Andras Piros <an...@cloudera.com>
Committed: Wed Jul 4 12:29:28 2018 +0200
----------------------------------------------------------------------
release-log.txt | 1 +
.../apache/oozie/tools/OozieSharelibCLI.java | 281 +++++++++++++++----
.../tools/OozieSharelibFileOperations.java | 74 +++++
.../oozie/tools/TestBlockSizeCalculator.java | 49 ++++
.../tools/TestConcurrentCopyFromLocal.java | 121 ++++++++
.../oozie/tools/TestCopyTaskCallable.java | 145 ++++++++++
.../oozie/tools/TestOozieSharelibCLI.java | 58 ++--
7 files changed, 640 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/6b89aba4/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 4fad9a2..65628c0 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 5.1.0 release (trunk - unreleased)
+OOZIE-2791 ShareLib installation may fail on busy Hadoop clusters (asasvari, kmarton via pbacsko, andras.piros)
OOZIE-3297 Retry logic does not handle the exception from BulkJPAExecutor properly (andras.piros)
OOZIE-2955 [oozie-client] Fix Findbugs warnings (Jan Hentschel, kmarton via andras.piros)
OOZIE-3109 [log-streaming] Escape HTML-specific characters (dionusos via andras.piros)
http://git-wip-us.apache.org/repos/asf/oozie/blob/6b89aba4/tools/src/main/java/org/apache/oozie/tools/OozieSharelibCLI.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/oozie/tools/OozieSharelibCLI.java b/tools/src/main/java/org/apache/oozie/tools/OozieSharelibCLI.java
index dce1c55..75e932c 100644
--- a/tools/src/main/java/org/apache/oozie/tools/OozieSharelibCLI.java
+++ b/tools/src/main/java/org/apache/oozie/tools/OozieSharelibCLI.java
@@ -20,12 +20,14 @@ package org.apache.oozie.tools;
import java.io.File;
import java.io.IOException;
import java.net.URI;
+import java.nio.file.Files;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
@@ -33,19 +35,25 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.WildcardFileFilter;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.oozie.cli.CLIParser;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.WorkflowAppService;
+import org.eclipse.jetty.util.ConcurrentHashSet;
public class OozieSharelibCLI {
public static final String[] HELP_INFO = {
@@ -60,7 +68,6 @@ public class OozieSharelibCLI {
public static final String CONCURRENCY_OPT = "concurrency";
public static final String OOZIE_HOME = "oozie.home.dir";
public static final String SHARE_LIB_PREFIX = "lib_";
-
private boolean used;
public static void main(String[] args) throws Exception{
@@ -181,7 +188,13 @@ public class OozieSharelibCLI {
}
if (threadPoolSize > 1) {
- concurrentCopyFromLocal(fs, threadPoolSize, srcFile, dstPath);
+ long fsLimitsMinBlockSize = fs.getConf()
+ .getLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_DEFAULT);
+ long bytesPerChecksum = fs.getConf()
+ .getLong(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT);
+ new ConcurrentCopyFromLocal(threadPoolSize, fsLimitsMinBlockSize, bytesPerChecksum)
+ .concurrentCopyFromLocal(fs, srcFile, dstPath);
+
} else {
fs.copyFromLocalFile(false, srcPath, dstPath);
}
@@ -197,13 +210,19 @@ public class OozieSharelibCLI {
System.err.println(parser.shortHelp());
return 1;
}
+ catch (NumberFormatException ex) {
+ logError("Invalid configuration value: ", ex);
+ return 1;
+ }
catch (Exception ex) {
logError(ex.getMessage(), ex);
return 1;
}
}
- private void logError(String errorMessage, Throwable ex) {
+
+
+ private static void logError(String errorMessage, Throwable ex) {
System.err.println();
System.err.println("Error: " + errorMessage);
System.err.println();
@@ -220,66 +239,228 @@ public class OozieSharelibCLI {
return dateFormat.format(date).toString();
}
- private void concurrentCopyFromLocal(final FileSystem fs, int threadPoolSize,
- File srcFile, final Path dstPath) throws IOException {
- List<Future<Void>> futures = Collections.emptyList();
- ExecutorService threadPool = Executors.newFixedThreadPool(threadPoolSize);
- try {
- futures = copyFolderRecursively(fs, threadPool, srcFile, dstPath);
- System.out.println("Running " + futures.size() + " copy tasks on " + threadPoolSize + " threads");
- } finally {
+ @VisibleForTesting
+ static final class CopyTaskConfiguration {
+ private final FileSystem fs;
+ private final File srcFile;
+ private final Path dstPath;
+
+ CopyTaskConfiguration(FileSystem fs, File srcFile, Path dstPath) {
+ this.fs = fs;
+ this.srcFile = srcFile;
+ this.dstPath = dstPath;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ CopyTaskConfiguration that = (CopyTaskConfiguration) o;
+ if (!srcFile.equals(that.srcFile)) {
+ return false;
+ }
+ return dstPath.equals(that.dstPath);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = srcFile.hashCode();
+ result = 31 * result + dstPath.hashCode();
+ return result;
+ }
+
+ }
+
+ @VisibleForTesting
+ static final class BlockSizeCalculator {
+
+ protected static long getValidBlockSize (long fileLenght, long fsLimitsMinBlockSize, long bytesPerChecksum) {
+ if (fsLimitsMinBlockSize > fileLenght) {
+ return fsLimitsMinBlockSize;
+ }
+ // bytesPerChecksum must divide block size
+ if (fileLenght % bytesPerChecksum == 0) {
+ return fileLenght;
+ }
+ long ratio = fileLenght/bytesPerChecksum;
+ return (ratio + 1) * bytesPerChecksum;
+ }
+ }
+
+ @VisibleForTesting
+ static final class CopyTaskCallable implements Callable<CopyTaskConfiguration> {
+
+ private final static short REPLICATION_FACTOR = 3;
+ private final FileSystem fileSystem;
+ private final File file;
+ private final Path destinationPath;
+ private final Path targetName;
+ private final long blockSize;
+
+ private final Set<CopyTaskConfiguration> failedCopyTasks;
+
+ CopyTaskCallable(CopyTaskConfiguration copyTask, File file, Path trgName, long blockSize,
+ Set<CopyTaskConfiguration> failedCopyTasks) {
+ Preconditions.checkNotNull(copyTask);
+ Preconditions.checkNotNull(file);
+ Preconditions.checkNotNull(trgName);
+ Preconditions.checkNotNull(failedCopyTasks);
+ Preconditions.checkNotNull(copyTask.dstPath);
+ Preconditions.checkNotNull(copyTask.fs);
+ this.file = file;
+ this.destinationPath = copyTask.dstPath;
+ this.failedCopyTasks = failedCopyTasks;
+ this.fileSystem = copyTask.fs;
+ this.blockSize = blockSize;
+ this.targetName = trgName;
+ }
+
+ @Override
+ public CopyTaskConfiguration call() throws Exception {
+ CopyTaskConfiguration cp = new CopyTaskConfiguration(fileSystem, file, targetName);
+ failedCopyTasks.add(cp);
+ final Path destinationFilePath = new Path(destinationPath + File.separator + file.getName());
+ final boolean overwrite = true;
+ final int bufferSize = CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+ try (FSDataOutputStream out = fileSystem
+ .create(destinationFilePath, overwrite, bufferSize, REPLICATION_FACTOR, blockSize)) {
+ Files.copy(file.toPath(), out);
+ }
+ return cp;
+ }
+ }
+
+ @VisibleForTesting
+ static final class ConcurrentCopyFromLocal {
+
+ private static final int DEFAULT_RETRY_COUNT = 5;
+ private static final int STARTING_RETRY_DELAY_IN_MS = 1000;
+ private int retryCount;
+ private int retryDelayInMs;
+ private long fsLimitsMinBlockSize;
+ private long bytesPerChecksum;
+
+ private final int threadPoolSize;
+ private final ExecutorService threadPool;
+ private final Set<CopyTaskConfiguration> failedCopyTasks = new ConcurrentHashSet<>();
+
+ public ConcurrentCopyFromLocal(int threadPoolSize, long fsLimitsMinBlockSize, long bytesPerChecksum) {
+ Preconditions.checkArgument(threadPoolSize > 0, "Thread Pool size must be greater than 0");
+ Preconditions.checkArgument(fsLimitsMinBlockSize > 0, "Minimun block size must be greater than 0");
+ Preconditions.checkArgument(bytesPerChecksum > 0, "Bytes per checksum must be greater than 0");
+ this.bytesPerChecksum = bytesPerChecksum;
+ this.fsLimitsMinBlockSize = fsLimitsMinBlockSize;
+ this.threadPoolSize = threadPoolSize;
+ this.threadPool = Executors.newFixedThreadPool(threadPoolSize);
+ this.retryCount = DEFAULT_RETRY_COUNT;
+ this.retryDelayInMs = STARTING_RETRY_DELAY_IN_MS;
+ }
+
+ @VisibleForTesting
+ void concurrentCopyFromLocal(FileSystem fs, File srcFile, Path dstPath) throws IOException {
+ List<Future<CopyTaskConfiguration>> futures = Collections.emptyList();
+ CopyTaskConfiguration copyTask = new CopyTaskConfiguration(fs, srcFile, dstPath);
try {
- threadPool.shutdown();
+ futures = copyFolderRecursively(copyTask);
+ System.out.println("Running " + futures.size() + " copy tasks on " + threadPoolSize + " threads");
} finally {
checkCopyResults(futures);
+ System.out.println("Copy tasks are done");
+ threadPool.shutdown();
}
}
- }
- private void checkCopyResults(List<Future<Void>> futures) throws IOException {
- Throwable t = null;
- for (Future<Void> future : futures) {
- try {
- future.get();
- } catch (CancellationException ce) {
- t = ce;
- logError("Copy task was cancelled", ce);
- } catch (ExecutionException ee) {
- t = ee.getCause();
- logError("Copy task failed with exception", t);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
+ private List<Future<CopyTaskConfiguration>> copyFolderRecursively(final CopyTaskConfiguration copyTask) {
+ List<Future<CopyTaskConfiguration>> taskList = new ArrayList<>();
+ File[] fileList = copyTask.srcFile.listFiles();
+ if (fileList != null) {
+ for (final File file : fileList) {
+ final Path trgName = new Path(copyTask.dstPath, file.getName());
+ if (file.isDirectory()) {
+ taskList.addAll(copyFolderRecursively(
+ new CopyTaskConfiguration(copyTask.fs, file, trgName)));
+ } else {
+ final long blockSize = BlockSizeCalculator
+ .getValidBlockSize(file.length(), fsLimitsMinBlockSize, bytesPerChecksum);
+ taskList.add(threadPool
+ .submit(new CopyTaskCallable(copyTask, file, trgName, blockSize, failedCopyTasks)));
+ }
+ }
}
+ return taskList;
}
- if (t != null) {
- throw new IOException ("At least one copy task failed with exception", t);
+
+ private void checkCopyResults(final List<Future<CopyTaskConfiguration>> futures)
+ throws IOException {
+ boolean exceptionOccurred = false;
+ for (Future<CopyTaskConfiguration> future : futures) {
+ CopyTaskConfiguration cp;
+ try {
+ cp = future.get();
+ if (cp != null) {
+ failedCopyTasks.remove(cp);
+ }
+ } catch (CancellationException ce) {
+ exceptionOccurred = true;
+ logError("Copy task was cancelled", ce);
+ } catch (ExecutionException ee) {
+ exceptionOccurred = true;
+ logError("Copy task failed with exception", ee.getCause());
+ } catch (InterruptedException ie) {
+ exceptionOccurred = true;
+ Thread.currentThread().interrupt();
+ }
+ }
+ if (exceptionOccurred) {
+ System.err.println("At least one copy task failed with exception. Retrying failed copy tasks.");
+ retryFailedCopyTasks();
+
+ if (!failedCopyTasks.isEmpty() && retryCount == 0) {
+ throw new IOException("At least one copy task failed with exception");
+ }
+ }
}
- }
- private List<Future<Void>> copyFolderRecursively(final FileSystem fs, final ExecutorService threadPool,
- File srcFile, final Path dstPath) throws IOException {
- List<Future<Void>> taskList = new ArrayList<Future<Void>>();
- File[] files = srcFile.listFiles();
-
- if (files != null) {
- for (final File file : files) {
- final Path trgName = new Path(dstPath, file.getName());
- if (file.isDirectory()) {
- taskList.addAll(copyFolderRecursively(fs, threadPool, file, trgName));
- } else {
- taskList.add(threadPool.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- fs.copyFromLocalFile(new Path(file.toURI()), trgName);
- return null;
- }
- }));
+ private void retryFailedCopyTasks() throws IOException {
+
+ while (retryCount > 0 && !failedCopyTasks.isEmpty()) {
+ try {
+ System.err.println("Waiting " + retryDelayInMs + " ms before retrying failed copy tasks.");
+ Thread.sleep(retryDelayInMs);
+ retryDelayInMs = retryDelayInMs * 2;
+ } catch (InterruptedException e) {
+ System.err.println(e.getMessage());
}
+
+ for (CopyTaskConfiguration cp : failedCopyTasks) {
+ System.err.println("Retrying to copy " + cp.srcFile + " to " + cp.dstPath);
+ try {
+ copyFromLocalFile(cp);
+ failedCopyTasks.remove(cp);
+ }
+ catch (IOException e) {
+ System.err.printf("Copying [%s] to [%s] failed with exception: [%s]%n. Proceed to next file.%n"
+ ,cp.srcFile, cp.dstPath, e.getMessage());
+ }
+ }
+
+ --retryCount;
+ }
+
+ if (!failedCopyTasks.isEmpty() && retryCount == 0) {
+ throw new IOException("Could not install Oozie ShareLib properly.");
}
- } else {
- System.out.println("WARNING: directory listing of " + srcFile.getAbsolutePath().toString() + " returned null");
}
- return taskList;
+ private void copyFromLocalFile(CopyTaskConfiguration cp) throws IOException{
+ final FileSystem fs = cp.fs;
+ fs.delete(cp.dstPath, false);
+ fs.copyFromLocalFile(false, new Path(cp.srcFile.toURI()), cp.dstPath);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/6b89aba4/tools/src/test/java/org/apache/oozie/tools/OozieSharelibFileOperations.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/oozie/tools/OozieSharelibFileOperations.java b/tools/src/test/java/org/apache/oozie/tools/OozieSharelibFileOperations.java
new file mode 100644
index 0000000..d344300
--- /dev/null
+++ b/tools/src/test/java/org/apache/oozie/tools/OozieSharelibFileOperations.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.tools;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.Writer;
+import java.util.List;
+
+public final class OozieSharelibFileOperations {
+
+ /**
+ * Suppress default constructor for noninstantiability
+ */
+ private OozieSharelibFileOperations() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Generates fileNr files inside libDirectory and adds each generated file to fileList
+ * @param fileNr number of files to be generated
+ * @param fileList list to which the generated files are added
+ * @throws IOException if a file cannot be written
+ */
+ public static void generateAndWriteFiles(File libDirectory, int fileNr, List<File> fileList) throws IOException {
+ for (int i=0; i<fileNr; i++) {
+ String fileName = generateFileName(i);
+ String fileContent = generateFileContent(i);
+ fileList.add(writeFile(libDirectory, fileName, fileContent));
+ }
+ }
+
+ /**
+ * Create a file in a specified folder, with a specific name and content
+ * @param folder source folder
+ * @param filename name of the generated file
+ * @param content content of the generated file
+ * @return the created file
+ * @throws IOException if the file cannot be written
+ */
+ public static File writeFile(File folder, String filename, String content) throws IOException {
+ File file = new File(folder.getAbsolutePath() + File.separator + filename);
+ Writer writer = new FileWriter(file);
+ writer.write(content);
+ writer.flush();
+ writer.close();
+ return file;
+ }
+
+ public static String generateFileName(int i) {
+ return "file_" + i;
+ }
+
+ public static String generateFileContent(int i) {
+ return "test File " + i;
+ }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/6b89aba4/tools/src/test/java/org/apache/oozie/tools/TestBlockSizeCalculator.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/oozie/tools/TestBlockSizeCalculator.java b/tools/src/test/java/org/apache/oozie/tools/TestBlockSizeCalculator.java
new file mode 100644
index 0000000..b4a668e
--- /dev/null
+++ b/tools/src/test/java/org/apache/oozie/tools/TestBlockSizeCalculator.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.tools;
+
+import junit.framework.TestCase;
+import org.junit.Assert;
+
+public class TestBlockSizeCalculator extends TestCase{
+
+ private long minBlockSize = 1048576;
+ private long bytesPerChecksum = 512;
+
+ public void testGetValidBlockSizeWhenFileLengthLowerThanMinBlockSize() {
+ long fileLength = 615100;
+ long validBlockSize = OozieSharelibCLI.BlockSizeCalculator.getValidBlockSize(fileLength, minBlockSize, bytesPerChecksum);
+ Assert.assertEquals("The block size should be equal to the defined min block size", minBlockSize, validBlockSize);
+ }
+
+ public void testGetValidBlockSizeWhenBytesPerChecksumDoesNotDivideFileLength() {
+ long fileLength = 1048577;
+ long expectedBlockSize = (fileLength / bytesPerChecksum + 1) * bytesPerChecksum;
+ long validBlockSize = OozieSharelibCLI.BlockSizeCalculator.getValidBlockSize(fileLength, minBlockSize, bytesPerChecksum);
+ Assert.assertEquals("The block size should be the first greater value than the file size, dividable by bytes per checksum",
+ expectedBlockSize, validBlockSize);
+ }
+
+ public void testGetValidBlockSizeWhenBytesPerChecksumDivideFileLength() {
+ long fileLength = 1049088;
+ long validBlockSize = OozieSharelibCLI.BlockSizeCalculator.getValidBlockSize(fileLength, minBlockSize, bytesPerChecksum);
+ Assert.assertEquals("The block size should be equal with the file length", fileLength, validBlockSize);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/6b89aba4/tools/src/test/java/org/apache/oozie/tools/TestConcurrentCopyFromLocal.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/oozie/tools/TestConcurrentCopyFromLocal.java b/tools/src/test/java/org/apache/oozie/tools/TestConcurrentCopyFromLocal.java
new file mode 100644
index 0000000..d77eba6
--- /dev/null
+++ b/tools/src/test/java/org/apache/oozie/tools/TestConcurrentCopyFromLocal.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.tools;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.service.ConfigurationService;
+import org.apache.oozie.service.HadoopAccessorService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.WorkflowAppService;
+import org.apache.oozie.test.XTestCase;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestConcurrentCopyFromLocal extends XTestCase {
+
+ private final String outPath = "outFolder";
+ private final TemporaryFolder tmpFolder = new TemporaryFolder();
+ private File libDirectory;
+ private Services services = null;
+ private Path dstPath = null;
+ private FileSystem fs;
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp(false);
+ tmpFolder.create();
+ libDirectory = tmpFolder.newFolder("lib");
+ services = new Services();
+ services.get(ConfigurationService.class).getConf()
+ .set(Services.CONF_SERVICE_CLASSES,"org.apache.oozie.service.LiteWorkflowAppService,"
+ + "org.apache.oozie.service.SchedulerService,"
+ + "org.apache.oozie.service.HadoopAccessorService,"
+ + "org.apache.oozie.service.ShareLibService");
+ services.init();
+
+ HadoopAccessorService has = services.get(HadoopAccessorService.class);
+ URI uri = new Path(outPath).toUri();
+ Configuration fsConf = has.createConfiguration(uri.getAuthority());
+ fs = has.createFileSystem(System.getProperty("user.name"), uri, fsConf);
+
+ WorkflowAppService lwas = services.get(WorkflowAppService.class);
+ dstPath = lwas.getSystemLibPath();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ tmpFolder.delete();
+ services.destroy();
+ super.tearDown();
+ }
+
+ public void testConcurrentCopyFromLocalSameFileNrAndThreadNr() throws Exception {
+ final int testFiles = 15;
+ final int threadPoolSize = 15;
+ final long fsLimitsMinBlockSize = 1048576;
+ final long bytesPerChecksum = 512;
+ performAndCheckConcurrentCopy(testFiles, threadPoolSize, fsLimitsMinBlockSize, bytesPerChecksum);
+ }
+
+ public void testConcurrentCopyFromLocalMoreThreadsThanFiles() throws Exception {
+ final int testFiles = 15;
+ final int threadPoolSize = 35;
+ final long fsLimitsMinBlockSize = 1048576;
+ final long bytesPerChecksum = 512;
+ performAndCheckConcurrentCopy(testFiles, threadPoolSize, fsLimitsMinBlockSize, bytesPerChecksum);
+ }
+
+ public void testConcurrentCopyFromLocalHighThreadNr() throws Exception {
+ final int testFiles = 200;
+ final int threadPoolSize = 150;
+ final long fsLimitsMinBlockSize = 1048576;
+ final long bytesPerChecksum = 512;
+ performAndCheckConcurrentCopy(testFiles, threadPoolSize, fsLimitsMinBlockSize, bytesPerChecksum);
+ }
+
+ private void performAndCheckConcurrentCopy(final int testFiles, final int threadPoolSize, final long fsLimitsMinBlockSize,
+ final long bytesPerChecksum) throws Exception {
+ List<File> fileList = new ArrayList<>();
+
+ OozieSharelibFileOperations.generateAndWriteFiles(libDirectory, testFiles, fileList);
+ File srcFile = new File(libDirectory.getParentFile().getAbsolutePath());
+ OozieSharelibCLI.ConcurrentCopyFromLocal concurrentCopy = new OozieSharelibCLI
+ .ConcurrentCopyFromLocal(threadPoolSize, fsLimitsMinBlockSize, bytesPerChecksum);
+ concurrentCopy.concurrentCopyFromLocal(fs, srcFile, dstPath);
+
+ for (int i = 0; i < testFiles; i++) {
+ try (
+ InputStream originalFileStream = new FileInputStream(fileList.get(i));
+ InputStream copiedFileStream = fs.open(new Path(dstPath + File.separator + "lib",
+ fileList.get(i).getName()))){
+
+ assertTrue("The content of the files must be equal", IOUtils.contentEquals(originalFileStream, copiedFileStream));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/6b89aba4/tools/src/test/java/org/apache/oozie/tools/TestCopyTaskCallable.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/oozie/tools/TestCopyTaskCallable.java b/tools/src/test/java/org/apache/oozie/tools/TestCopyTaskCallable.java
new file mode 100644
index 0000000..bce0433
--- /dev/null
+++ b/tools/src/test/java/org/apache/oozie/tools/TestCopyTaskCallable.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.oozie.tools;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.service.HadoopAccessorService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.WorkflowAppService;
+import org.apache.oozie.test.XTestCase;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+
+public class TestCopyTaskCallable extends XTestCase {
+ private final String outPath = "outFolder";
+ private final TemporaryFolder tmpFolder = new TemporaryFolder();
+ private File libDirectory;
+ private Services services = null;
+ private Path dstPath = null;
+ private FileSystem fs;
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp(false);
+ tmpFolder.create();
+ libDirectory = tmpFolder.newFolder("lib");
+ services = new Services();
+ services.getConf()
+ .set(Services.CONF_SERVICE_CLASSES,"org.apache.oozie.service.LiteWorkflowAppService,"
+ + "org.apache.oozie.service.SchedulerService,"
+ + "org.apache.oozie.service.HadoopAccessorService,"
+ + "org.apache.oozie.service.ShareLibService");
+ services.init();
+
+ HadoopAccessorService has = services.get(HadoopAccessorService.class);
+ URI uri = new Path(outPath).toUri();
+ Configuration fsConf = has.createConfiguration(uri.getAuthority());
+ fs = has.createFileSystem(System.getProperty("user.name"), uri, fsConf);
+
+ WorkflowAppService lwas = services.get(WorkflowAppService.class);
+ dstPath = lwas.getSystemLibPath();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ tmpFolder.delete();
+ services.destroy();
+ super.tearDown();
+ }
+
+ public void testCallCopyTaskSameFileNrAndThreadNr() throws Exception {
+ final long blockSize = 1048576;
+ final int testFiles = 150;
+ final int poolSize = 150;
+ performAndCheckCallCopyTask(blockSize, poolSize, testFiles);
+ }
+
+ public void testCallCopyTaskOneThread() throws Exception {
+ final long blockSize = 1048576;
+ final int testFiles = 15;
+ final int poolSize = 1;
+ performAndCheckCallCopyTask(blockSize, poolSize, testFiles);
+ }
+
+ public void testCallCopyTaskMoreFilesThanThreads() throws Exception {
+ final long blockSize = 1048576;
+ final int testFiles = 150;
+ final int poolSize = 10;
+ performAndCheckCallCopyTask(blockSize, poolSize, testFiles);
+ }
+
+ public void testCallCopyTaskMoreThreadsThanFiles() throws Exception {
+ final long blockSize = 1048576;
+ final int testFiles = 15;
+ final int poolSize = 20;
+ performAndCheckCallCopyTask(blockSize, poolSize, testFiles);
+ }
+
+ private void performAndCheckCallCopyTask(final long blockSize, final int poolSize, final int testFiles) throws Exception {
+ Set<OozieSharelibCLI.CopyTaskConfiguration> failedCopyTasks = new ConcurrentHashSet<>();
+
+ List<File> fileList = new ArrayList<>();
+ OozieSharelibFileOperations.generateAndWriteFiles(libDirectory, testFiles, fileList);
+
+ File srcFile = new File(libDirectory.getParentFile().getAbsolutePath());
+
+ OozieSharelibCLI.CopyTaskConfiguration copyTask =
+ new OozieSharelibCLI.CopyTaskConfiguration(fs, srcFile, dstPath);
+ List<Future<OozieSharelibCLI.CopyTaskConfiguration>> taskList = new ArrayList<>();
+
+ final ExecutorService threadPool = Executors.newFixedThreadPool(poolSize);
+ try {
+ for (final File file : libDirectory.listFiles()) {
+ final Path trgName = new Path(dstPath, file.getName());
+ taskList.add(threadPool
+ .submit(new OozieSharelibCLI.CopyTaskCallable(copyTask, file, trgName, blockSize, failedCopyTasks)));
+ }
+ for (Future<OozieSharelibCLI.CopyTaskConfiguration> future : taskList) {
+ future.get();
+ }
+ } finally {
+ threadPool.shutdown();
+ }
+
+ for (int i = 0; i < testFiles; i++) {
+
+ try (
+ InputStream originalFileStream = new FileInputStream(fileList.get(i));
+ InputStream copiedFileStream = fs.open(new Path(dstPath, fileList.get(i).getName()))){
+ assertTrue("The content of the files must be equal", IOUtils.contentEquals(originalFileStream, copiedFileStream));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/6b89aba4/tools/src/test/java/org/apache/oozie/tools/TestOozieSharelibCLI.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/oozie/tools/TestOozieSharelibCLI.java b/tools/src/test/java/org/apache/oozie/tools/TestOozieSharelibCLI.java
index f53d987..5929e5c 100644
--- a/tools/src/test/java/org/apache/oozie/tools/TestOozieSharelibCLI.java
+++ b/tools/src/test/java/org/apache/oozie/tools/TestOozieSharelibCLI.java
@@ -19,19 +19,12 @@
package org.apache.oozie.tools;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.InputStream;
-import java.io.PrintStream;
-import java.io.Writer;
-import java.net.URI;
import org.apache.commons.io.IOUtils;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.action.hadoop.security.LauncherSecurityManager;
+import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.ServiceException;
import org.apache.oozie.service.Services;
@@ -40,21 +33,29 @@ import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.test.XTestCase;
import org.junit.rules.TemporaryFolder;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.net.URI;
+
/**
* Test OozieSharelibCLI
*/
public class TestOozieSharelibCLI extends XTestCase {
- private final String outPath = "outFolder";
+ public final String outPath = "outFolder";
+ private final TemporaryFolder tmpFolder = new TemporaryFolder();
+ private File libDirectory;
private Services services = null;
private Path dstPath = null;
private FileSystem fs;
- private final TemporaryFolder tmpFolder = new TemporaryFolder();
private LauncherSecurityManager launcherSecurityManager;
@Override
protected void setUp() throws Exception {
launcherSecurityManager = new LauncherSecurityManager();
launcherSecurityManager.enable();
tmpFolder.create();
+ libDirectory = tmpFolder.newFolder("lib");
super.setUp(false);
}
@@ -65,7 +66,6 @@ public class TestOozieSharelibCLI extends XTestCase {
if (services != null) {
services.destroy();
}
- tmpFolder.delete();
super.tearDown();
}
@@ -100,10 +100,8 @@ public class TestOozieSharelibCLI extends XTestCase {
*/
public void testOozieSharelibCLICreate() throws Exception {
- File libDirectory = tmpFolder.newFolder("lib");
-
- writeFile(libDirectory, "file1", "test File");
- writeFile(libDirectory, "file2", "test File2");
+ OozieSharelibFileOperations.writeFile(libDirectory, "file1", "test File");
+ OozieSharelibFileOperations.writeFile(libDirectory, "file2", "test File2");
String[] argsCreate = { "create", "-fs", outPath, "-locallib", libDirectory.getParentFile().getAbsolutePath() };
assertEquals(0, execOozieSharelibCLICommands(argsCreate));
@@ -127,10 +125,9 @@ public class TestOozieSharelibCLI extends XTestCase {
final int testFiles = 7;
final int concurrency = 5;
- File libDirectory = tmpFolder.newFolder("lib");
-
for (int i = 0; i < testFiles; i++) {
- writeFile(libDirectory, generateFileName(i), generateFileContent(i));
+ OozieSharelibFileOperations.writeFile(libDirectory, OozieSharelibFileOperations.generateFileName(i),
+ OozieSharelibFileOperations.generateFileContent(i));
}
String[] argsCreate = {"create", "-fs", outPath, "-locallib", libDirectory.getParentFile().getAbsolutePath(),
@@ -145,11 +142,11 @@ public class TestOozieSharelibCLI extends XTestCase {
// test files in new folder
for (int i = 0; i < testFiles; i++) {
- String fileName = generateFileName(i);
- String expectedFileContent = generateFileContent(i);
+ String fileName = OozieSharelibFileOperations.generateFileName(i);
+ String expectedFileContent = OozieSharelibFileOperations.generateFileContent(i);
InputStream in = null;
try {
- in = fs.open(new Path(latestLibPath, fileName));
+ in = getTargetFileSysyem().open(new Path(latestLibPath, fileName));
String actualFileContent = IOUtils.toString(in);
assertEquals(fileName, expectedFileContent, actualFileContent);
} finally {
@@ -193,7 +190,7 @@ public class TestOozieSharelibCLI extends XTestCase {
private Services getServices() throws ServiceException {
if (services == null) {
services = new Services();
- services.getConf()
+ services.get(ConfigurationService.class).getConf()
.set(Services.CONF_SERVICE_CLASSES,"org.apache.oozie.service.LiteWorkflowAppService,"
+ "org.apache.oozie.service.SchedulerService,"
+ "org.apache.oozie.service.HadoopAccessorService,"
@@ -211,15 +208,6 @@ public class TestOozieSharelibCLI extends XTestCase {
return dstPath;
}
- private void writeFile(File folder, String filename, String content) throws Exception {
- File file = new File(folder.getAbsolutePath() + File.separator + filename);
- Writer writer = new FileWriter(file);
- writer.write(content);
- writer.flush();
- writer.close();
-
- }
-
private int execOozieSharelibCLICommands(String[] args) throws Exception {
try {
OozieSharelibCLI.main(args);
@@ -236,12 +224,4 @@ public class TestOozieSharelibCLI extends XTestCase {
}
return 1;
}
-
- private static String generateFileName(int i) {
- return "file_" + i;
- }
-
- private static String generateFileContent(int i) {
- return "test File " + i;
- }
}