You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by an...@apache.org on 2018/07/04 10:31:28 UTC

oozie git commit: OOZIE-2791 ShareLib installation may fail on busy Hadoop clusters (asasvari, kmarton via pbacsko, andras.piros)

Repository: oozie
Updated Branches:
  refs/heads/master a299d4a6d -> 6b89aba42


OOZIE-2791 ShareLib installation may fail on busy Hadoop clusters (asasvari, kmarton via pbacsko, andras.piros)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/6b89aba4
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/6b89aba4
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/6b89aba4

Branch: refs/heads/master
Commit: 6b89aba4227326cd125a9ec409f418824cc2cada
Parents: a299d4a
Author: Andras Piros <an...@cloudera.com>
Authored: Wed Jul 4 12:29:28 2018 +0200
Committer: Andras Piros <an...@cloudera.com>
Committed: Wed Jul 4 12:29:28 2018 +0200

----------------------------------------------------------------------
 release-log.txt                                 |   1 +
 .../apache/oozie/tools/OozieSharelibCLI.java    | 281 +++++++++++++++----
 .../tools/OozieSharelibFileOperations.java      |  74 +++++
 .../oozie/tools/TestBlockSizeCalculator.java    |  49 ++++
 .../tools/TestConcurrentCopyFromLocal.java      | 121 ++++++++
 .../oozie/tools/TestCopyTaskCallable.java       | 145 ++++++++++
 .../oozie/tools/TestOozieSharelibCLI.java       |  58 ++--
 7 files changed, 640 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/6b89aba4/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 4fad9a2..65628c0 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.1.0 release (trunk - unreleased)
 
+OOZIE-2791 ShareLib installation may fail on busy Hadoop clusters (asasvari, kmarton via pbacsko, andras.piros)
 OOZIE-3297 Retry logic does not handle the exception from BulkJPAExecutor properly (andras.piros)
 OOZIE-2955 [oozie-client] Fix Findbugs warnings (Jan Hentschel, kmarton via andras.piros)
 OOZIE-3109 [log-streaming] Escape HTML-specific characters (dionusos via andras.piros)

http://git-wip-us.apache.org/repos/asf/oozie/blob/6b89aba4/tools/src/main/java/org/apache/oozie/tools/OozieSharelibCLI.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/oozie/tools/OozieSharelibCLI.java b/tools/src/main/java/org/apache/oozie/tools/OozieSharelibCLI.java
index dce1c55..75e932c 100644
--- a/tools/src/main/java/org/apache/oozie/tools/OozieSharelibCLI.java
+++ b/tools/src/main/java/org/apache/oozie/tools/OozieSharelibCLI.java
@@ -20,12 +20,14 @@ package org.apache.oozie.tools;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.nio.file.Files;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
@@ -33,19 +35,25 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.filefilter.WildcardFileFilter;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.oozie.cli.CLIParser;
 import org.apache.oozie.service.HadoopAccessorService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.WorkflowAppService;
+import org.eclipse.jetty.util.ConcurrentHashSet;
 
 public class OozieSharelibCLI {
     public static final String[] HELP_INFO = {
@@ -60,7 +68,6 @@ public class OozieSharelibCLI {
     public static final String CONCURRENCY_OPT = "concurrency";
     public static final String OOZIE_HOME = "oozie.home.dir";
     public static final String SHARE_LIB_PREFIX = "lib_";
-
     private boolean used;
 
     public static void main(String[] args) throws Exception{
@@ -181,7 +188,13 @@ public class OozieSharelibCLI {
             }
 
             if (threadPoolSize > 1) {
-                concurrentCopyFromLocal(fs, threadPoolSize, srcFile, dstPath);
+                long fsLimitsMinBlockSize = fs.getConf()
+                        .getLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_DEFAULT);
+                long bytesPerChecksum = fs.getConf()
+                        .getLong(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT);
+                new ConcurrentCopyFromLocal(threadPoolSize, fsLimitsMinBlockSize, bytesPerChecksum)
+                        .concurrentCopyFromLocal(fs, srcFile, dstPath);
+
             } else {
                 fs.copyFromLocalFile(false, srcPath, dstPath);
             }
@@ -197,13 +210,19 @@ public class OozieSharelibCLI {
             System.err.println(parser.shortHelp());
             return 1;
         }
+        catch (NumberFormatException ex) {
+            logError("Invalid configuration value: ", ex);
+            return 1;
+        }
         catch (Exception ex) {
             logError(ex.getMessage(), ex);
             return 1;
         }
     }
 
-    private void logError(String errorMessage, Throwable ex) {
+
+
+    private static void logError(String errorMessage, Throwable ex) {
         System.err.println();
         System.err.println("Error: " + errorMessage);
         System.err.println();
@@ -220,66 +239,228 @@ public class OozieSharelibCLI {
         return dateFormat.format(date).toString();
     }
 
-    private void concurrentCopyFromLocal(final FileSystem fs, int threadPoolSize,
-            File srcFile, final Path dstPath) throws IOException {
-        List<Future<Void>> futures = Collections.emptyList();
-        ExecutorService threadPool = Executors.newFixedThreadPool(threadPoolSize);
-        try {
-            futures = copyFolderRecursively(fs, threadPool, srcFile, dstPath);
-            System.out.println("Running " + futures.size() + " copy tasks on " + threadPoolSize + " threads");
-        } finally {
+    @VisibleForTesting
+    static final class CopyTaskConfiguration {
+        private final FileSystem fs;
+        private final File srcFile;
+        private final Path dstPath;
+
+        CopyTaskConfiguration(FileSystem fs, File srcFile, Path dstPath) {
+            this.fs = fs;
+            this.srcFile = srcFile;
+            this.dstPath = dstPath;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            CopyTaskConfiguration that = (CopyTaskConfiguration) o;
+            if (!srcFile.equals(that.srcFile)) {
+                return false;
+            }
+            return dstPath.equals(that.dstPath);
+        }
+
+        @Override
+        public int hashCode() {
+            int result = srcFile.hashCode();
+            result = 31 * result + dstPath.hashCode();
+            return result;
+        }
+
+    }
+
+    @VisibleForTesting
+    static final class BlockSizeCalculator {
+
+        protected static long getValidBlockSize (long fileLenght, long fsLimitsMinBlockSize, long bytesPerChecksum) {
+            if (fsLimitsMinBlockSize > fileLenght) {
+                return fsLimitsMinBlockSize;
+            }
+            // bytesPerChecksum must divide block size
+            if (fileLenght % bytesPerChecksum == 0) {
+                return fileLenght;
+            }
+            long ratio = fileLenght/bytesPerChecksum;
+            return (ratio + 1) * bytesPerChecksum;
+        }
+    }
+
+    @VisibleForTesting
+    static final class CopyTaskCallable implements Callable<CopyTaskConfiguration> {
+
+        private final static short REPLICATION_FACTOR = 3;
+        private final FileSystem fileSystem;
+        private final File file;
+        private final Path destinationPath;
+        private final Path targetName;
+        private final long blockSize;
+
+        private final Set<CopyTaskConfiguration> failedCopyTasks;
+
+        CopyTaskCallable(CopyTaskConfiguration copyTask, File file, Path trgName, long blockSize,
+                                 Set<CopyTaskConfiguration> failedCopyTasks) {
+            Preconditions.checkNotNull(copyTask);
+            Preconditions.checkNotNull(file);
+            Preconditions.checkNotNull(trgName);
+            Preconditions.checkNotNull(failedCopyTasks);
+            Preconditions.checkNotNull(copyTask.dstPath);
+            Preconditions.checkNotNull(copyTask.fs);
+            this.file = file;
+            this.destinationPath = copyTask.dstPath;
+            this.failedCopyTasks = failedCopyTasks;
+            this.fileSystem = copyTask.fs;
+            this.blockSize = blockSize;
+            this.targetName = trgName;
+        }
+
+        @Override
+        public CopyTaskConfiguration call() throws Exception {
+            CopyTaskConfiguration cp = new CopyTaskConfiguration(fileSystem, file, targetName);
+            failedCopyTasks.add(cp);
+            final Path destinationFilePath = new Path(destinationPath + File.separator +  file.getName());
+            final boolean overwrite = true;
+            final int bufferSize = CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+            try (FSDataOutputStream out = fileSystem
+                    .create(destinationFilePath, overwrite, bufferSize, REPLICATION_FACTOR, blockSize)) {
+                Files.copy(file.toPath(), out);
+            }
+            return cp;
+        }
+    }
+
+    @VisibleForTesting
+    static final class ConcurrentCopyFromLocal {
+
+        private static final int DEFAULT_RETRY_COUNT = 5;
+        private static final int STARTING_RETRY_DELAY_IN_MS = 1000;
+        private int retryCount;
+        private int retryDelayInMs;
+        private long fsLimitsMinBlockSize;
+        private long bytesPerChecksum;
+
+        private final int threadPoolSize;
+        private final ExecutorService threadPool;
+        private final Set<CopyTaskConfiguration> failedCopyTasks = new ConcurrentHashSet<>();
+
+        public ConcurrentCopyFromLocal(int threadPoolSize, long fsLimitsMinBlockSize, long bytesPerChecksum) {
+            Preconditions.checkArgument(threadPoolSize > 0, "Thread Pool size must be greater than 0");
+            Preconditions.checkArgument(fsLimitsMinBlockSize > 0, "Minimun block size must be greater than 0");
+            Preconditions.checkArgument(bytesPerChecksum > 0, "Bytes per checksum must be greater than 0");
+            this.bytesPerChecksum = bytesPerChecksum;
+            this.fsLimitsMinBlockSize = fsLimitsMinBlockSize;
+            this.threadPoolSize = threadPoolSize;
+            this.threadPool = Executors.newFixedThreadPool(threadPoolSize);
+            this.retryCount = DEFAULT_RETRY_COUNT;
+            this.retryDelayInMs = STARTING_RETRY_DELAY_IN_MS;
+        }
+
+        @VisibleForTesting
+        void concurrentCopyFromLocal(FileSystem fs, File srcFile, Path dstPath) throws IOException {
+            List<Future<CopyTaskConfiguration>> futures = Collections.emptyList();
+            CopyTaskConfiguration copyTask = new CopyTaskConfiguration(fs, srcFile, dstPath);
             try {
-                threadPool.shutdown();
+                futures = copyFolderRecursively(copyTask);
+                System.out.println("Running " + futures.size() + " copy tasks on " + threadPoolSize + " threads");
             } finally {
                 checkCopyResults(futures);
+                System.out.println("Copy tasks are done");
+                threadPool.shutdown();
             }
         }
-    }
 
-    private void checkCopyResults(List<Future<Void>> futures) throws IOException {
-        Throwable t = null;
-        for (Future<Void> future : futures) {
-            try {
-                future.get();
-            } catch (CancellationException ce) {
-                t = ce;
-                logError("Copy task was cancelled", ce);
-            } catch (ExecutionException ee) {
-                t = ee.getCause();
-                logError("Copy task failed with exception", t);
-            } catch (InterruptedException ie) {
-                Thread.currentThread().interrupt();
+        private List<Future<CopyTaskConfiguration>> copyFolderRecursively(final CopyTaskConfiguration copyTask) {
+            List<Future<CopyTaskConfiguration>> taskList = new ArrayList<>();
+            File[] fileList = copyTask.srcFile.listFiles();
+            if (fileList != null) {
+                for (final File file : fileList) {
+                    final Path trgName = new Path(copyTask.dstPath, file.getName());
+                    if (file.isDirectory()) {
+                        taskList.addAll(copyFolderRecursively(
+                                new CopyTaskConfiguration(copyTask.fs, file, trgName)));
+                    } else {
+                        final long blockSize = BlockSizeCalculator
+                                .getValidBlockSize(file.length(), fsLimitsMinBlockSize, bytesPerChecksum);
+                        taskList.add(threadPool
+                                .submit(new CopyTaskCallable(copyTask, file, trgName, blockSize, failedCopyTasks)));
+                    }
+                }
             }
+            return taskList;
         }
-        if (t != null) {
-            throw new IOException ("At least one copy task failed with exception", t);
+
+        private void checkCopyResults(final List<Future<CopyTaskConfiguration>> futures)
+                throws IOException {
+            boolean exceptionOccurred = false;
+            for (Future<CopyTaskConfiguration> future : futures) {
+                CopyTaskConfiguration cp;
+                try {
+                    cp = future.get();
+                    if (cp != null) {
+                        failedCopyTasks.remove(cp);
+                    }
+                } catch (CancellationException ce) {
+                    exceptionOccurred = true;
+                    logError("Copy task was cancelled", ce);
+                } catch (ExecutionException ee) {
+                    exceptionOccurred = true;
+                    logError("Copy task failed with exception", ee.getCause());
+                } catch (InterruptedException ie) {
+                    exceptionOccurred = true;
+                    Thread.currentThread().interrupt();
+                }
+            }
+            if (exceptionOccurred) {
+                System.err.println("At least one copy task failed with exception. Retrying failed copy tasks.");
+                retryFailedCopyTasks();
+
+                if (!failedCopyTasks.isEmpty() && retryCount == 0) {
+                    throw new IOException("At least one copy task failed with exception");
+                }
+            }
         }
-    }
 
-    private List<Future<Void>> copyFolderRecursively(final FileSystem fs, final ExecutorService threadPool,
-            File srcFile, final Path dstPath) throws IOException {
-        List<Future<Void>> taskList = new ArrayList<Future<Void>>();
-        File[] files = srcFile.listFiles();
-
-        if (files != null) {
-            for (final File file : files) {
-                final Path trgName = new Path(dstPath, file.getName());
-                if (file.isDirectory()) {
-                    taskList.addAll(copyFolderRecursively(fs, threadPool, file, trgName));
-                } else {
-                    taskList.add(threadPool.submit(new Callable<Void>() {
-                        @Override
-                        public Void call() throws Exception {
-                            fs.copyFromLocalFile(new Path(file.toURI()), trgName);
-                            return null;
-                        }
-                    }));
+        private void retryFailedCopyTasks() throws IOException {
+
+            while (retryCount > 0 && !failedCopyTasks.isEmpty()) {
+                try {
+                    System.err.println("Waiting " + retryDelayInMs + " ms before retrying failed copy tasks.");
+                    Thread.sleep(retryDelayInMs);
+                    retryDelayInMs = retryDelayInMs * 2;
+                } catch (InterruptedException e) {
+                    System.err.println(e.getMessage());
                 }
+
+                for (CopyTaskConfiguration cp : failedCopyTasks) {
+                    System.err.println("Retrying to copy " + cp.srcFile + " to " + cp.dstPath);
+                    try {
+                        copyFromLocalFile(cp);
+                        failedCopyTasks.remove(cp);
+                    }
+                    catch (IOException e) {
+                        System.err.printf("Copying [%s] to [%s] failed with exception: [%s]%n. Proceed to next file.%n"
+                                ,cp.srcFile, cp.dstPath, e.getMessage());
+                    }
+                }
+
+                --retryCount;
+            }
+
+            if (!failedCopyTasks.isEmpty() && retryCount == 0) {
+                throw new IOException("Could not install Oozie ShareLib properly.");
             }
-        } else {
-            System.out.println("WARNING: directory listing of " + srcFile.getAbsolutePath().toString() + " returned null");
         }
 
-        return taskList;
+        private void copyFromLocalFile(CopyTaskConfiguration cp) throws IOException{
+            final FileSystem fs = cp.fs;
+            fs.delete(cp.dstPath, false);
+            fs.copyFromLocalFile(false, new Path(cp.srcFile.toURI()), cp.dstPath);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/6b89aba4/tools/src/test/java/org/apache/oozie/tools/OozieSharelibFileOperations.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/oozie/tools/OozieSharelibFileOperations.java b/tools/src/test/java/org/apache/oozie/tools/OozieSharelibFileOperations.java
new file mode 100644
index 0000000..d344300
--- /dev/null
+++ b/tools/src/test/java/org/apache/oozie/tools/OozieSharelibFileOperations.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.tools;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.Writer;
+import java.util.List;
+
+public final class OozieSharelibFileOperations {
+
+    /**
+     * Suppress default constructor for noninstantiability
+     */
+    private OozieSharelibFileOperations() {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Generates {@code fileNr} files in {@code libDirectory} and appends each of them to {@code fileList}.
+     * @param fileNr number of files to be generated
+     * @param fileList list that receives the generated files
+     * @throws IOException if one of the files cannot be written
+     */
+    public static void generateAndWriteFiles(File libDirectory, int fileNr, List<File> fileList) throws IOException {
+        for (int i=0; i<fileNr; i++) {
+            String fileName = generateFileName(i);
+            String fileContent = generateFileContent(i);
+            fileList.add(writeFile(libDirectory, fileName, fileContent));
+        }
+    }
+
+    /**
+     * Create a file in a specified folder, with a specific name and content
+     * @param folder folder in which the file is created
+     * @param filename name of the generated file
+     * @param content content of the generated file
+     * @return the created file
+     * @throws IOException if the file cannot be created or written
+     */
+    public static File writeFile(File folder, String filename, String content) throws IOException {
+        File file = new File(folder.getAbsolutePath() + File.separator + filename);
+        Writer writer = new FileWriter(file);
+        writer.write(content);
+        writer.flush();
+        writer.close();
+        return file;
+    }
+
+    public static String generateFileName(int i) {
+        return "file_" + i;
+    }
+
+    public static String generateFileContent(int i) {
+        return "test File " + i;
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/6b89aba4/tools/src/test/java/org/apache/oozie/tools/TestBlockSizeCalculator.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/oozie/tools/TestBlockSizeCalculator.java b/tools/src/test/java/org/apache/oozie/tools/TestBlockSizeCalculator.java
new file mode 100644
index 0000000..b4a668e
--- /dev/null
+++ b/tools/src/test/java/org/apache/oozie/tools/TestBlockSizeCalculator.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.tools;
+
+import junit.framework.TestCase;
+import org.junit.Assert;
+
+public class TestBlockSizeCalculator extends TestCase{
+
+    private long minBlockSize = 1048576;
+    private long bytesPerChecksum = 512;
+
+    public void testGetValidBlockSizeWhenFileLengthLowerThanMinBlockSize() {
+        long fileLength = 615100;
+        long validBlockSize = OozieSharelibCLI.BlockSizeCalculator.getValidBlockSize(fileLength, minBlockSize, bytesPerChecksum);
+        Assert.assertEquals("The block size should be equal to the defined min block size", minBlockSize, validBlockSize);
+    }
+
+    public void testGetValidBlockSizeWhenBytesPerChecksumDoesNotDivideFileLength() {
+        long fileLength = 1048577;
+        long expectedBlockSize = (fileLength / bytesPerChecksum + 1) * bytesPerChecksum;
+        long validBlockSize = OozieSharelibCLI.BlockSizeCalculator.getValidBlockSize(fileLength, minBlockSize, bytesPerChecksum);
+        Assert.assertEquals("The block size should be the first greater value than the file size, dividable by bytes per checksum",
+                expectedBlockSize, validBlockSize);
+    }
+
+    public void testGetValidBlockSizeWhenBytesPerChecksumDivideFileLength() {
+        long fileLength = 1049088;
+        long validBlockSize = OozieSharelibCLI.BlockSizeCalculator.getValidBlockSize(fileLength, minBlockSize, bytesPerChecksum);
+        Assert.assertEquals("The block size should be equal with the file length", fileLength, validBlockSize);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/6b89aba4/tools/src/test/java/org/apache/oozie/tools/TestConcurrentCopyFromLocal.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/oozie/tools/TestConcurrentCopyFromLocal.java b/tools/src/test/java/org/apache/oozie/tools/TestConcurrentCopyFromLocal.java
new file mode 100644
index 0000000..d77eba6
--- /dev/null
+++ b/tools/src/test/java/org/apache/oozie/tools/TestConcurrentCopyFromLocal.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.tools;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.service.ConfigurationService;
+import org.apache.oozie.service.HadoopAccessorService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.WorkflowAppService;
+import org.apache.oozie.test.XTestCase;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestConcurrentCopyFromLocal extends XTestCase {
+
+    private final String outPath = "outFolder";
+    private final TemporaryFolder tmpFolder = new TemporaryFolder();
+    private File libDirectory;
+    private Services services = null;
+    private Path dstPath = null;
+    private FileSystem fs;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp(false);
+        tmpFolder.create();
+        libDirectory = tmpFolder.newFolder("lib");
+        services = new Services();
+        services.get(ConfigurationService.class).getConf()
+                .set(Services.CONF_SERVICE_CLASSES,"org.apache.oozie.service.LiteWorkflowAppService,"
+                        + "org.apache.oozie.service.SchedulerService,"
+                        + "org.apache.oozie.service.HadoopAccessorService,"
+                        + "org.apache.oozie.service.ShareLibService");
+        services.init();
+
+        HadoopAccessorService has = services.get(HadoopAccessorService.class);
+        URI uri = new Path(outPath).toUri();
+        Configuration fsConf = has.createConfiguration(uri.getAuthority());
+        fs = has.createFileSystem(System.getProperty("user.name"), uri, fsConf);
+
+        WorkflowAppService lwas = services.get(WorkflowAppService.class);
+        dstPath = lwas.getSystemLibPath();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        tmpFolder.delete();
+        services.destroy();
+        super.tearDown();
+    }
+
+    public void testConcurrentCopyFromLocalSameFileNrAndThreadNr() throws Exception {
+        final int testFiles = 15;
+        final int  threadPoolSize = 15;
+        final long fsLimitsMinBlockSize = 1048576;
+        final long bytesPerChecksum = 512;
+        performAndCheckConcurrentCopy(testFiles, threadPoolSize, fsLimitsMinBlockSize, bytesPerChecksum);
+    }
+
+    public void testConcurrentCopyFromLocalMoreThreadsThanFiles() throws Exception {
+        final int testFiles = 15;
+        final int  threadPoolSize = 35;
+        final long fsLimitsMinBlockSize = 1048576;
+        final long bytesPerChecksum = 512;
+        performAndCheckConcurrentCopy(testFiles, threadPoolSize, fsLimitsMinBlockSize, bytesPerChecksum);
+    }
+
+    public void testConcurrentCopyFromLocalHighThreadNr() throws Exception {
+        final int testFiles = 200;
+        final int  threadPoolSize = 150;
+        final long fsLimitsMinBlockSize = 1048576;
+        final long bytesPerChecksum = 512;
+        performAndCheckConcurrentCopy(testFiles, threadPoolSize, fsLimitsMinBlockSize, bytesPerChecksum);
+    }
+
+    private void performAndCheckConcurrentCopy(final int testFiles, final int threadPoolSize, final long fsLimitsMinBlockSize,
+                                               final long bytesPerChecksum) throws Exception {
+        List<File> fileList = new ArrayList<>();
+
+        OozieSharelibFileOperations.generateAndWriteFiles(libDirectory, testFiles, fileList);
+        File srcFile = new File(libDirectory.getParentFile().getAbsolutePath());
+        OozieSharelibCLI.ConcurrentCopyFromLocal concurrentCopy = new OozieSharelibCLI
+                .ConcurrentCopyFromLocal(threadPoolSize, fsLimitsMinBlockSize, bytesPerChecksum);
+        concurrentCopy.concurrentCopyFromLocal(fs, srcFile, dstPath);
+
+        for (int i = 0; i < testFiles; i++) {
+            try (
+                    InputStream originalFileStream = new FileInputStream(fileList.get(i));
+                    InputStream copiedFileStream = fs.open(new Path(dstPath + File.separator + "lib",
+                            fileList.get(i).getName()))){
+
+                assertTrue("The content of the files must be equal", IOUtils.contentEquals(originalFileStream, copiedFileStream));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/6b89aba4/tools/src/test/java/org/apache/oozie/tools/TestCopyTaskCallable.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/oozie/tools/TestCopyTaskCallable.java b/tools/src/test/java/org/apache/oozie/tools/TestCopyTaskCallable.java
new file mode 100644
index 0000000..bce0433
--- /dev/null
+++ b/tools/src/test/java/org/apache/oozie/tools/TestCopyTaskCallable.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.oozie.tools;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.service.HadoopAccessorService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.WorkflowAppService;
+import org.apache.oozie.test.XTestCase;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+
+/** Tests {@code OozieSharelibCLI.CopyTaskCallable} by copying generated files through thread pools of various sizes. */
+public class TestCopyTaskCallable extends XTestCase {
+    private static final long BLOCK_SIZE = 1048576;
+    private final String outPath = "outFolder";
+    private final TemporaryFolder tmpFolder = new TemporaryFolder();
+    private File libDirectory;
+    private Services services = null;
+    private Path dstPath = null;
+    private FileSystem fs;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp(false);
+        tmpFolder.create();
+        libDirectory = tmpFolder.newFolder("lib");
+        services = new Services();
+        services.getConf()
+                .set(Services.CONF_SERVICE_CLASSES,"org.apache.oozie.service.LiteWorkflowAppService,"
+                        + "org.apache.oozie.service.SchedulerService,"
+                        + "org.apache.oozie.service.HadoopAccessorService,"
+                        + "org.apache.oozie.service.ShareLibService");
+        services.init();
+        // File system for outPath, created for the current user through HadoopAccessorService.
+        HadoopAccessorService has = services.get(HadoopAccessorService.class);
+        URI uri = new Path(outPath).toUri();
+        Configuration fsConf = has.createConfiguration(uri.getAuthority());
+        fs = has.createFileSystem(System.getProperty("user.name"), uri, fsConf);
+        // The copies are written to the system lib path reported by WorkflowAppService.
+        WorkflowAppService lwas = services.get(WorkflowAppService.class);
+        dstPath = lwas.getSystemLibPath();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            tmpFolder.delete();
+            services.destroy();
+        } finally {
+            // Run the base class cleanup even when the cleanup above throws.
+            super.tearDown();
+        }
+    }
+
+    public void testCallCopyTaskSameFileNrAndThreadNr() throws Exception {
+        final int testFiles = 150;
+        final int poolSize = 150;
+        performAndCheckCallCopyTask(BLOCK_SIZE, poolSize, testFiles);
+    }
+
+    public void testCallCopyTaskOneThread() throws Exception {
+        final int testFiles = 15;
+        final int poolSize = 1;
+        performAndCheckCallCopyTask(BLOCK_SIZE, poolSize, testFiles);
+    }
+
+    public void testCallCopyTaskMoreFilesThanThreads() throws Exception {
+        final int testFiles = 150;
+        final int poolSize = 10;
+        performAndCheckCallCopyTask(BLOCK_SIZE, poolSize, testFiles);
+    }
+
+    public void testCallCopyTaskMoreThreadsThanFiles() throws Exception {
+        final int testFiles = 15;
+        final int poolSize = 20;
+        performAndCheckCallCopyTask(BLOCK_SIZE, poolSize, testFiles);
+    }
+
+    /** Submits one {@code CopyTaskCallable} per file in libDirectory to a {@code poolSize}-thread pool and verifies the copies. */
+    private void performAndCheckCallCopyTask(final long blockSize, final int poolSize, final int testFiles) throws Exception {
+        Set<OozieSharelibCLI.CopyTaskConfiguration> failedCopyTasks = new ConcurrentHashSet<>();
+        List<File> fileList = new ArrayList<>();
+        OozieSharelibFileOperations.generateAndWriteFiles(libDirectory, testFiles, fileList);
+        File srcFile = new File(libDirectory.getParentFile().getAbsolutePath());
+        OozieSharelibCLI.CopyTaskConfiguration copyTask = new OozieSharelibCLI.CopyTaskConfiguration(fs, srcFile, dstPath);
+        List<Future<OozieSharelibCLI.CopyTaskConfiguration>> taskList = new ArrayList<>();
+
+        final ExecutorService threadPool = Executors.newFixedThreadPool(poolSize);
+        try {
+            for (final File file : libDirectory.listFiles()) {
+                final Path trgName = new Path(dstPath, file.getName());
+                taskList.add(threadPool
+                        .submit(new OozieSharelibCLI.CopyTaskCallable(copyTask, file, trgName, blockSize, failedCopyTasks)));
+            }
+            for (Future<OozieSharelibCLI.CopyTaskConfiguration> future : taskList) {
+                future.get();
+            }
+        } finally {
+            threadPool.shutdown();
+        }
+        // Every callable is handed this set (presumably to record failed copies); it is expected to stay empty here.
+        assertTrue("No copy task should have been reported as failed", failedCopyTasks.isEmpty());
+
+        for (int i = 0; i < testFiles; i++) {
+            try (
+                    InputStream originalFileStream = new FileInputStream(fileList.get(i));
+                    InputStream copiedFileStream = fs.open(new Path(dstPath, fileList.get(i).getName()))) {
+                assertTrue("The content of the files must be equal", IOUtils.contentEquals(originalFileStream, copiedFileStream));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/6b89aba4/tools/src/test/java/org/apache/oozie/tools/TestOozieSharelibCLI.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/oozie/tools/TestOozieSharelibCLI.java b/tools/src/test/java/org/apache/oozie/tools/TestOozieSharelibCLI.java
index f53d987..5929e5c 100644
--- a/tools/src/test/java/org/apache/oozie/tools/TestOozieSharelibCLI.java
+++ b/tools/src/test/java/org/apache/oozie/tools/TestOozieSharelibCLI.java
@@ -19,19 +19,12 @@
 
 package org.apache.oozie.tools;
 
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.InputStream;
-import java.io.PrintStream;
-import java.io.Writer;
-import java.net.URI;
 import org.apache.commons.io.IOUtils;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.oozie.action.hadoop.security.LauncherSecurityManager;
+import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.HadoopAccessorService;
 import org.apache.oozie.service.ServiceException;
 import org.apache.oozie.service.Services;
@@ -40,21 +33,29 @@ import org.apache.oozie.service.WorkflowAppService;
 import org.apache.oozie.test.XTestCase;
 import org.junit.rules.TemporaryFolder;
 
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.net.URI;
+
 /**
  * Test OozieSharelibCLI
  */
 public class TestOozieSharelibCLI extends XTestCase {
-    private final String outPath = "outFolder";
+    public final String outPath = "outFolder";
+    private final TemporaryFolder tmpFolder = new TemporaryFolder();
+    private File libDirectory;
     private Services services = null;
     private Path dstPath = null;
     private FileSystem fs;
-    private final TemporaryFolder tmpFolder = new TemporaryFolder();
     private LauncherSecurityManager launcherSecurityManager;
     @Override
     protected void setUp() throws Exception {
         launcherSecurityManager = new LauncherSecurityManager();
         launcherSecurityManager.enable();
         tmpFolder.create();
+        libDirectory = tmpFolder.newFolder("lib");
         super.setUp(false);
 
     }
@@ -65,7 +66,6 @@ public class TestOozieSharelibCLI extends XTestCase {
         if (services != null) {
             services.destroy();
         }
-        tmpFolder.delete();
         super.tearDown();
     }
 
@@ -100,10 +100,8 @@ public class TestOozieSharelibCLI extends XTestCase {
      */
     public void testOozieSharelibCLICreate() throws Exception {
 
-        File libDirectory = tmpFolder.newFolder("lib");
-
-        writeFile(libDirectory, "file1", "test File");
-        writeFile(libDirectory, "file2", "test File2");
+        OozieSharelibFileOperations.writeFile(libDirectory, "file1", "test File");
+        OozieSharelibFileOperations.writeFile(libDirectory, "file2", "test File2");
 
         String[] argsCreate = { "create", "-fs", outPath, "-locallib", libDirectory.getParentFile().getAbsolutePath() };
         assertEquals(0, execOozieSharelibCLICommands(argsCreate));
@@ -127,10 +125,9 @@ public class TestOozieSharelibCLI extends XTestCase {
         final int testFiles = 7;
         final int concurrency = 5;
 
-        File libDirectory = tmpFolder.newFolder("lib");
-
         for (int i = 0; i < testFiles; i++) {
-            writeFile(libDirectory, generateFileName(i), generateFileContent(i));
+            OozieSharelibFileOperations.writeFile(libDirectory, OozieSharelibFileOperations.generateFileName(i),
+                    OozieSharelibFileOperations.generateFileContent(i));
         }
 
         String[] argsCreate = {"create", "-fs", outPath, "-locallib", libDirectory.getParentFile().getAbsolutePath(),
@@ -145,11 +142,11 @@ public class TestOozieSharelibCLI extends XTestCase {
         // test files in new folder
 
         for (int i = 0; i < testFiles; i++) {
-            String fileName = generateFileName(i);
-            String expectedFileContent = generateFileContent(i);
+            String fileName = OozieSharelibFileOperations.generateFileName(i);
+            String expectedFileContent = OozieSharelibFileOperations.generateFileContent(i);
             InputStream in = null;
             try {
-                in = fs.open(new Path(latestLibPath, fileName));
+                in = getTargetFileSysyem().open(new Path(latestLibPath, fileName));
                 String actualFileContent = IOUtils.toString(in);
                 assertEquals(fileName, expectedFileContent, actualFileContent);
             } finally {
@@ -193,7 +190,7 @@ public class TestOozieSharelibCLI extends XTestCase {
     private Services getServices() throws ServiceException {
         if (services == null) {
             services = new Services();
-            services.getConf()
+            services.get(ConfigurationService.class).getConf()
                     .set(Services.CONF_SERVICE_CLASSES,"org.apache.oozie.service.LiteWorkflowAppService,"
                             + "org.apache.oozie.service.SchedulerService,"
                             + "org.apache.oozie.service.HadoopAccessorService,"
@@ -211,15 +208,6 @@ public class TestOozieSharelibCLI extends XTestCase {
         return dstPath;
     }
 
-    private void writeFile(File folder, String filename, String content) throws Exception {
-        File file = new File(folder.getAbsolutePath() + File.separator + filename);
-        Writer writer = new FileWriter(file);
-        writer.write(content);
-        writer.flush();
-        writer.close();
-
-    }
-
     private int execOozieSharelibCLICommands(String[] args) throws Exception {
         try {
             OozieSharelibCLI.main(args);
@@ -236,12 +224,4 @@ public class TestOozieSharelibCLI extends XTestCase {
         }
         return 1;
     }
-
-    private static String generateFileName(int i) {
-        return "file_" + i;
-    }
-
-    private static String generateFileContent(int i) {
-        return "test File " + i;
-    }
 }