You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2018/01/08 15:14:13 UTC

[1/2] hadoop git commit: HADOOP-15086. NativeAzureFileSystem file rename is not atomic. Contributed by Thomas Marquardt (Backported to branch-2 via /HADOOP-15156)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 37a822410 -> d2ceef0f2
  refs/heads/branch-2.9 8e0a5b151 -> a35267b47


 HADOOP-15086. NativeAzureFileSystem file rename is not atomic.
 Contributed by Thomas Marquardt
(Backported to branch-2 via /HADOOP-15156)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a35267b4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a35267b4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a35267b4

Branch: refs/heads/branch-2.9
Commit: a35267b47abf62d2ab32bc6860d2080284323042
Parents: 8e0a5b1
Author: Steve Loughran <st...@apache.org>
Authored: Mon Jan 8 15:12:41 2018 +0000
Committer: Steve Loughran <st...@apache.org>
Committed: Mon Jan 8 15:12:41 2018 +0000

----------------------------------------------------------------------
 .../fs/azure/AzureNativeFileSystemStore.java    |  16 ++-
 .../hadoop/fs/azure/NativeAzureFileSystem.java  |  25 +++--
 .../fs/azure/NativeAzureFileSystemHelper.java   |  18 +++
 .../hadoop/fs/azure/NativeFileSystemStore.java  |   4 +
 .../fs/azure/SecureStorageInterfaceImpl.java    |   8 +-
 .../hadoop/fs/azure/StorageInterface.java       |   2 +-
 .../hadoop/fs/azure/StorageInterfaceImpl.java   |   8 +-
 .../azure/ITestNativeAzureFileSystemLive.java   | 109 +++++++++++++++++++
 .../hadoop/fs/azure/MockStorageInterface.java   |   9 +-
 9 files changed, 182 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a35267b4/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
index 730373c..7111dcf 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
@@ -2605,12 +2605,18 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
 
   @Override
   public void rename(String srcKey, String dstKey) throws IOException {
-    rename(srcKey, dstKey, false, null);
+    rename(srcKey, dstKey, false, null, true);
   }
 
   @Override
   public void rename(String srcKey, String dstKey, boolean acquireLease,
-      SelfRenewingLease existingLease) throws IOException {
+                     SelfRenewingLease existingLease) throws IOException {
+    rename(srcKey, dstKey, acquireLease, existingLease, true);
+  }
+
+    @Override
+  public void rename(String srcKey, String dstKey, boolean acquireLease,
+      SelfRenewingLease existingLease, boolean overwriteDestination) throws IOException {
 
     LOG.debug("Moving {} to {}", srcKey, dstKey);
 
@@ -2672,7 +2678,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
       // a more intensive exponential retry policy when the cluster is getting
       // throttled.
       try {
-        dstBlob.startCopyFromBlob(srcBlob, null, getInstrumentedContext());
+        dstBlob.startCopyFromBlob(srcBlob, null,
+            getInstrumentedContext(), overwriteDestination);
       } catch (StorageException se) {
         if (se.getHttpStatusCode() == HttpURLConnection.HTTP_UNAVAILABLE) {
           int copyBlobMinBackoff = sessionConfiguration.getInt(
@@ -2695,7 +2702,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
           options.setRetryPolicyFactory(new RetryExponentialRetry(
             copyBlobMinBackoff, copyBlobDeltaBackoff, copyBlobMaxBackoff,
             copyBlobMaxRetries));
-          dstBlob.startCopyFromBlob(srcBlob, options, getInstrumentedContext());
+          dstBlob.startCopyFromBlob(srcBlob, options,
+              getInstrumentedContext(), overwriteDestination);
         } else {
           throw se;
         }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a35267b4/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
index d89a523..af42849 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
@@ -3287,16 +3287,27 @@ public class NativeAzureFileSystem extends FileSystem {
     } else if (!srcMetadata.isDir()) {
       LOG.debug("Source {} found as a file, renaming.", src);
       try {
-        store.rename(srcKey, dstKey);
+        // HADOOP-15086 - file rename must ensure that the destination does
+        // not exist.  The fix is targeted to this call only to avoid
+        // regressions.  Other call sites are attempting to rename temporary
+        // files, redo a failed rename operation, or rename a directory
+        // recursively; for these cases the destination may exist.
+        store.rename(srcKey, dstKey, false, null,
+            false);
       } catch(IOException ex) {
-
         Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
 
-        if (innerException instanceof StorageException
-            && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
-
-          LOG.debug("BlobNotFoundException encountered. Failing rename", src);
-          return false;
+        if (innerException instanceof StorageException) {
+          if (NativeAzureFileSystemHelper.isFileNotFoundException(
+              (StorageException) innerException)) {
+            LOG.debug("BlobNotFoundException encountered. Failing rename", src);
+            return false;
+          }
+          if (NativeAzureFileSystemHelper.isBlobAlreadyExistsConflict(
+              (StorageException) innerException)) {
+            LOG.debug("Destination BlobAlreadyExists. Failing rename", src);
+            return false;
+          }
         }
 
         throw ex;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a35267b4/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java
index 57af1f8..754f343 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.azure;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.net.HttpURLConnection;
 import java.util.Map;
 
 import com.google.common.base.Preconditions;
@@ -96,6 +97,23 @@ final class NativeAzureFileSystemHelper {
   }
 
   /*
+   * Determines if a conditional request failed because the blob already
+   * exists.
+   *
+   * @param e - the storage exception thrown by the failed operation.
+   *
+   * @return true if a conditional request failed because the blob already
+   * exists; otherwise, returns false.
+   */
+  static boolean isBlobAlreadyExistsConflict(StorageException e) {
+    if (e.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT
+        && StorageErrorCodeStrings.BLOB_ALREADY_EXISTS.equals(e.getErrorCode())) {
+      return true;
+    }
+    return false;
+  }
+
+  /*
    * Helper method that logs stack traces from all live threads.
    */
   public static void logAllLiveStackTraces() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a35267b4/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
index 57a729d..b67ab1b 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
@@ -91,6 +91,10 @@ interface NativeFileSystemStore {
   void rename(String srcKey, String dstKey, boolean acquireLease, SelfRenewingLease existingLease)
       throws IOException;
 
+  void rename(String srcKey, String dstKey, boolean acquireLease,
+              SelfRenewingLease existingLease, boolean overwriteDestination)
+      throws IOException;
+
   /**
    * Delete all keys with the given prefix. Used for testing.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a35267b4/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java
index 7c2722e..0f54249 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java
@@ -503,10 +503,14 @@ public class SecureStorageInterfaceImpl extends StorageInterface {
 
     @Override
     public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options,
-        OperationContext opContext)
+        OperationContext opContext, boolean overwriteDestination)
             throws StorageException, URISyntaxException {
+      AccessCondition dstAccessCondition =
+          overwriteDestination
+              ? null
+              : AccessCondition.generateIfNotExistsCondition();
       getBlob().startCopy(sourceBlob.getBlob().getQualifiedUri(),
-          null, null, options, opContext);
+          null, dstAccessCondition, options, opContext);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a35267b4/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
index e03d731..dbb3849 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
@@ -406,7 +406,7 @@ abstract class StorageInterface {
      *
      */
     public abstract void startCopyFromBlob(CloudBlobWrapper sourceBlob,
-        BlobRequestOptions options, OperationContext opContext)
+        BlobRequestOptions options, OperationContext opContext, boolean overwriteDestination)
         throws StorageException, URISyntaxException;
     
     /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a35267b4/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
index 41a4dbb..e600f9e 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
@@ -425,10 +425,14 @@ class StorageInterfaceImpl extends StorageInterface {
 
     @Override
     public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options,
-        OperationContext opContext)
+        OperationContext opContext, boolean overwriteDestination)
             throws StorageException, URISyntaxException {
+      AccessCondition dstAccessCondition =
+          overwriteDestination
+              ? null
+              : AccessCondition.generateIfNotExistsCondition();
       getBlob().startCopy(sourceBlob.getBlob().getQualifiedUri(),
-          null, null, options, opContext);
+          null, dstAccessCondition, options, opContext);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a35267b4/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java
index f969968..702ad66 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java
@@ -18,8 +18,17 @@
 
 package org.apache.hadoop.fs.azure;
 
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
@@ -40,6 +49,106 @@ public class ITestNativeAzureFileSystemLive extends
     return AzureBlobStorageTestAccount.create();
   }
 
+  /**
+   * Implements the thread start routine for the test
+   * testMultipleRenameFileOperationsToSameDestination.
+   */
+  private static class RenameThread implements Runnable {
+
+    private final FileSystem fs;
+    private final CountDownLatch latch;
+    private final int threadNumber;
+    private final Path src;
+    private final Path dst;
+    private final AtomicInteger successfulRenameCount;
+    private final AtomicReference<IOException> unexpectedError;
+
+    RenameThread(FileSystem fs,
+                 CountDownLatch latch,
+                 int threadNumber,
+                 Path src,
+                 Path dst,
+                 AtomicInteger successfulRenameCount,
+                 AtomicReference<IOException> unexpectedError) {
+      this.fs = fs;
+      this.latch = latch;
+      this.threadNumber = threadNumber;
+      this.src = src;
+      this.dst = dst;
+      this.successfulRenameCount = successfulRenameCount;
+      this.unexpectedError = unexpectedError;
+    }
+
+    @Override
+    public void run() {
+      try {
+        latch.await(Long.MAX_VALUE, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+      }
+      try {
+        try (OutputStream output = fs.create(src)) {
+          output.write(("Source file number " + threadNumber).getBytes());
+        }
+
+        if (fs.rename(src, dst)) {
+          LOG.info("rename succeeded for thread " + threadNumber);
+          successfulRenameCount.incrementAndGet();
+        }
+      } catch (IOException e) {
+        unexpectedError.compareAndSet(null, e);
+        ContractTestUtils.fail("Exception unexpected", e);
+      }
+    }
+  }
+
+   /**
+   * Tests the rename file operation to ensure that when there are multiple
+   * attempts to rename a file to the same destination, only one rename
+   * operation is successful (HADOOP-15086).
+   */
+  @Test
+  public void testMultipleRenameFileOperationsToSameDestination()
+      throws IOException, InterruptedException {
+    final CountDownLatch latch = new CountDownLatch(1);
+    final AtomicInteger successfulRenameCount = new AtomicInteger(0);
+    final AtomicReference<IOException> unexpectedError = new AtomicReference<IOException>();
+    final Path dest = path("dest");
+
+    // Run 10 threads to rename multiple files to the same target path
+    List<Thread> threads = new ArrayList<>();
+
+    for (int i = 0; i < 10; i++) {
+      final int threadNumber = i;
+      Path src = path("test" + threadNumber);
+      threads.add(new Thread(new RenameThread(fs, latch, threadNumber, src, dest, successfulRenameCount, unexpectedError)));
+    }
+
+    // Start each thread
+    for (int i = 0; i < threads.size(); i++) {
+      threads.get(i).start();
+    }
+
+    // Wait for threads to start and wait on latch
+    Thread.sleep(2000);
+
+    // Now start to rename
+    latch.countDown();
+
+    // Wait for all threads to complete
+    for (int i = 0; i < threads.size(); i++) {
+      try {
+        threads.get(i).join();
+      } catch (InterruptedException e) {
+      }
+    }
+
+    if (unexpectedError.get() != null) {
+      throw unexpectedError.get();
+    }
+    assertEquals(1, successfulRenameCount.get());
+    LOG.info("Success, only one rename operation succeeded!");
+  }
+
   @Test
   public void testLazyRenamePendingCanOverwriteExistingFile()
     throws Exception {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a35267b4/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
index e0ae7b4..d5f6437 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
@@ -425,7 +425,14 @@ public class MockStorageInterface extends StorageInterface {
 
     @Override
     public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options,
-        OperationContext opContext) throws StorageException, URISyntaxException {
+        OperationContext opContext, boolean overwriteDestination) throws StorageException, URISyntaxException {
+      if (!overwriteDestination && backingStore.exists(convertUriToDecodedString(uri))) {
+        throw new StorageException("BlobAlreadyExists",
+            "The blob already exists.",
+            HttpURLConnection.HTTP_CONFLICT,
+            null,
+            null);
+      }
       backingStore.copy(convertUriToDecodedString(sourceBlob.getUri()), convertUriToDecodedString(uri));
       //TODO: set the backingStore.properties.CopyState and
       //      update azureNativeFileSystemStore.waitForCopyToComplete


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[2/2] hadoop git commit: HADOOP-15086. NativeAzureFileSystem file rename is not atomic. Contributed by Thomas Marquardt (Backported to branch-2 via /HADOOP-15156)

Posted by st...@apache.org.
 HADOOP-15086. NativeAzureFileSystem file rename is not atomic.
 Contributed by Thomas Marquardt
(Backported to branch-2 via /HADOOP-15156)

(cherry picked from commit a35267b47abf62d2ab32bc6860d2080284323042)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d2ceef0f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d2ceef0f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d2ceef0f

Branch: refs/heads/branch-2
Commit: d2ceef0f206b5c2cdcbf01ab91c50ee044addd5f
Parents: 37a8224
Author: Steve Loughran <st...@apache.org>
Authored: Mon Jan 8 15:13:50 2018 +0000
Committer: Steve Loughran <st...@apache.org>
Committed: Mon Jan 8 15:13:50 2018 +0000

----------------------------------------------------------------------
 .../fs/azure/AzureNativeFileSystemStore.java    |  16 ++-
 .../hadoop/fs/azure/NativeAzureFileSystem.java  |  25 +++--
 .../fs/azure/NativeAzureFileSystemHelper.java   |  18 +++
 .../hadoop/fs/azure/NativeFileSystemStore.java  |   4 +
 .../fs/azure/SecureStorageInterfaceImpl.java    |   8 +-
 .../hadoop/fs/azure/StorageInterface.java       |   2 +-
 .../hadoop/fs/azure/StorageInterfaceImpl.java   |   8 +-
 .../azure/ITestNativeAzureFileSystemLive.java   | 109 +++++++++++++++++++
 .../hadoop/fs/azure/MockStorageInterface.java   |   9 +-
 9 files changed, 182 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2ceef0f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
index 730373c..7111dcf 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
@@ -2605,12 +2605,18 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
 
   @Override
   public void rename(String srcKey, String dstKey) throws IOException {
-    rename(srcKey, dstKey, false, null);
+    rename(srcKey, dstKey, false, null, true);
   }
 
   @Override
   public void rename(String srcKey, String dstKey, boolean acquireLease,
-      SelfRenewingLease existingLease) throws IOException {
+                     SelfRenewingLease existingLease) throws IOException {
+    rename(srcKey, dstKey, acquireLease, existingLease, true);
+  }
+
+    @Override
+  public void rename(String srcKey, String dstKey, boolean acquireLease,
+      SelfRenewingLease existingLease, boolean overwriteDestination) throws IOException {
 
     LOG.debug("Moving {} to {}", srcKey, dstKey);
 
@@ -2672,7 +2678,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
       // a more intensive exponential retry policy when the cluster is getting
       // throttled.
       try {
-        dstBlob.startCopyFromBlob(srcBlob, null, getInstrumentedContext());
+        dstBlob.startCopyFromBlob(srcBlob, null,
+            getInstrumentedContext(), overwriteDestination);
       } catch (StorageException se) {
         if (se.getHttpStatusCode() == HttpURLConnection.HTTP_UNAVAILABLE) {
           int copyBlobMinBackoff = sessionConfiguration.getInt(
@@ -2695,7 +2702,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
           options.setRetryPolicyFactory(new RetryExponentialRetry(
             copyBlobMinBackoff, copyBlobDeltaBackoff, copyBlobMaxBackoff,
             copyBlobMaxRetries));
-          dstBlob.startCopyFromBlob(srcBlob, options, getInstrumentedContext());
+          dstBlob.startCopyFromBlob(srcBlob, options,
+              getInstrumentedContext(), overwriteDestination);
         } else {
           throw se;
         }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2ceef0f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
index d89a523..af42849 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
@@ -3287,16 +3287,27 @@ public class NativeAzureFileSystem extends FileSystem {
     } else if (!srcMetadata.isDir()) {
       LOG.debug("Source {} found as a file, renaming.", src);
       try {
-        store.rename(srcKey, dstKey);
+        // HADOOP-15086 - file rename must ensure that the destination does
+        // not exist.  The fix is targeted to this call only to avoid
+        // regressions.  Other call sites are attempting to rename temporary
+        // files, redo a failed rename operation, or rename a directory
+        // recursively; for these cases the destination may exist.
+        store.rename(srcKey, dstKey, false, null,
+            false);
       } catch(IOException ex) {
-
         Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
 
-        if (innerException instanceof StorageException
-            && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
-
-          LOG.debug("BlobNotFoundException encountered. Failing rename", src);
-          return false;
+        if (innerException instanceof StorageException) {
+          if (NativeAzureFileSystemHelper.isFileNotFoundException(
+              (StorageException) innerException)) {
+            LOG.debug("BlobNotFoundException encountered. Failing rename", src);
+            return false;
+          }
+          if (NativeAzureFileSystemHelper.isBlobAlreadyExistsConflict(
+              (StorageException) innerException)) {
+            LOG.debug("Destination BlobAlreadyExists. Failing rename", src);
+            return false;
+          }
         }
 
         throw ex;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2ceef0f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java
index 57af1f8..754f343 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.azure;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.net.HttpURLConnection;
 import java.util.Map;
 
 import com.google.common.base.Preconditions;
@@ -96,6 +97,23 @@ final class NativeAzureFileSystemHelper {
   }
 
   /*
+   * Determines if a conditional request failed because the blob already
+   * exists.
+   *
+   * @param e - the storage exception thrown by the failed operation.
+   *
+   * @return true if a conditional request failed because the blob already
+   * exists; otherwise, returns false.
+   */
+  static boolean isBlobAlreadyExistsConflict(StorageException e) {
+    if (e.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT
+        && StorageErrorCodeStrings.BLOB_ALREADY_EXISTS.equals(e.getErrorCode())) {
+      return true;
+    }
+    return false;
+  }
+
+  /*
    * Helper method that logs stack traces from all live threads.
    */
   public static void logAllLiveStackTraces() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2ceef0f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
index 57a729d..b67ab1b 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
@@ -91,6 +91,10 @@ interface NativeFileSystemStore {
   void rename(String srcKey, String dstKey, boolean acquireLease, SelfRenewingLease existingLease)
       throws IOException;
 
+  void rename(String srcKey, String dstKey, boolean acquireLease,
+              SelfRenewingLease existingLease, boolean overwriteDestination)
+      throws IOException;
+
   /**
    * Delete all keys with the given prefix. Used for testing.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2ceef0f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java
index 7c2722e..0f54249 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java
@@ -503,10 +503,14 @@ public class SecureStorageInterfaceImpl extends StorageInterface {
 
     @Override
     public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options,
-        OperationContext opContext)
+        OperationContext opContext, boolean overwriteDestination)
             throws StorageException, URISyntaxException {
+      AccessCondition dstAccessCondition =
+          overwriteDestination
+              ? null
+              : AccessCondition.generateIfNotExistsCondition();
       getBlob().startCopy(sourceBlob.getBlob().getQualifiedUri(),
-          null, null, options, opContext);
+          null, dstAccessCondition, options, opContext);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2ceef0f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
index e03d731..dbb3849 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
@@ -406,7 +406,7 @@ abstract class StorageInterface {
      *
      */
     public abstract void startCopyFromBlob(CloudBlobWrapper sourceBlob,
-        BlobRequestOptions options, OperationContext opContext)
+        BlobRequestOptions options, OperationContext opContext, boolean overwriteDestination)
         throws StorageException, URISyntaxException;
     
     /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2ceef0f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
index 41a4dbb..e600f9e 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
@@ -425,10 +425,14 @@ class StorageInterfaceImpl extends StorageInterface {
 
     @Override
     public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options,
-        OperationContext opContext)
+        OperationContext opContext, boolean overwriteDestination)
             throws StorageException, URISyntaxException {
+      AccessCondition dstAccessCondition = null; // overwrite: no precondition
+      if (!overwriteDestination) {
+        dstAccessCondition = AccessCondition.generateIfNotExistsCondition();
+      }
       getBlob().startCopy(sourceBlob.getBlob().getQualifiedUri(),
-          null, null, options, opContext);
+          null, dstAccessCondition, options, opContext);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2ceef0f/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java
index f969968..702ad66 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java
@@ -18,8 +18,17 @@
 
 package org.apache.hadoop.fs.azure;
 
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
@@ -40,6 +49,106 @@ public class ITestNativeAzureFileSystemLive extends
     return AzureBlobStorageTestAccount.create();
   }
 
+  /**
+   * Worker for testMultipleRenameFileOperationsToSameDestination: waits on
+   * a shared latch, creates its own source file, then tries to rename it to
+   * the common destination. A rename that returns true bumps the shared
+   * counter; the first IOException is recorded in the shared reference.
+   */
+  private static class RenameThread implements Runnable {
+
+    private final FileSystem fs;
+    private final CountDownLatch latch;
+    private final int threadNumber;
+    private final Path src;
+    private final Path dst;
+    private final AtomicInteger successfulRenameCount;
+    private final AtomicReference<IOException> unexpectedError;
+
+    RenameThread(FileSystem fs, CountDownLatch latch, int threadNumber,
+                 Path src, Path dst, AtomicInteger successfulRenameCount,
+                 AtomicReference<IOException> unexpectedError) {
+      this.fs = fs;
+      this.latch = latch;
+      this.threadNumber = threadNumber;
+      this.src = src;
+      this.dst = dst;
+      this.successfulRenameCount = successfulRenameCount;
+      this.unexpectedError = unexpectedError;
+    }
+
+    @Override
+    public void run() {
+      try {
+        latch.await(Long.MAX_VALUE, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        // Never released: keep the interrupt status and skip the rename.
+        Thread.currentThread().interrupt();
+        return;
+      }
+      try {
+        try (OutputStream output = fs.create(src)) {
+          output.write(("Source file number " + threadNumber).getBytes());
+        }
+        if (fs.rename(src, dst)) {
+          LOG.info("rename succeeded for thread " + threadNumber);
+          successfulRenameCount.incrementAndGet();
+        }
+      } catch (IOException e) {
+        unexpectedError.compareAndSet(null, e);
+        ContractTestUtils.fail("Exception unexpected", e);
+      }
+    }
+  }
+
+  /**
+   * Tests the rename file operation to ensure that when there are multiple
+   * attempts to rename a file to the same destination, only one rename
+   * operation is successful (HADOOP-15086).
+   *
+   * @throws IOException the first unexpected error recorded by a worker
+   * @throws InterruptedException if interrupted while sleeping or joining
+   */
+  @Test
+  public void testMultipleRenameFileOperationsToSameDestination()
+      throws IOException, InterruptedException {
+    final CountDownLatch latch = new CountDownLatch(1);
+    final AtomicInteger successfulRenameCount = new AtomicInteger(0);
+    final AtomicReference<IOException> unexpectedError =
+        new AtomicReference<>();
+    final Path dest = path("dest");
+
+    // Run 10 threads to rename multiple files to the same target path
+    List<Thread> threads = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      Path src = path("test" + i);
+      threads.add(new Thread(new RenameThread(fs, latch, i, src, dest,
+          successfulRenameCount, unexpectedError)));
+    }
+
+    // Start each thread
+    for (Thread thread : threads) {
+      thread.start();
+    }
+
+    // Wait for threads to start and wait on latch
+    Thread.sleep(2000);
+
+    // Now start to rename
+    latch.countDown();
+
+    // Wait for all threads to complete; an interrupt here now propagates.
+    for (Thread thread : threads) {
+      thread.join();
+    }
+
+    if (unexpectedError.get() != null) {
+      throw unexpectedError.get();
+    }
+    assertEquals(1, successfulRenameCount.get());
+    LOG.info("Success, only one rename operation succeeded!");
+  }
+
   @Test
   public void testLazyRenamePendingCanOverwriteExistingFile()
     throws Exception {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2ceef0f/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
index e0ae7b4..d5f6437 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
@@ -425,7 +425,14 @@ public class MockStorageInterface extends StorageInterface {
 
     @Override
     public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options,
-        OperationContext opContext) throws StorageException, URISyntaxException {
+        OperationContext opContext, boolean overwriteDestination) throws StorageException, URISyntaxException {
+      if (!overwriteDestination && backingStore.exists(convertUriToDecodedString(uri))) {
+        throw new StorageException("BlobAlreadyExists",
+            "The blob already exists.",
+            HttpURLConnection.HTTP_CONFLICT,
+            null,
+            null);
+      }
       backingStore.copy(convertUriToDecodedString(sourceBlob.getUri()), convertUriToDecodedString(uri));
       //TODO: set the backingStore.properties.CopyState and
       //      update azureNativeFileSystemStore.waitForCopyToComplete


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org