You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2018/01/08 15:14:13 UTC
[1/2] hadoop git commit: HADOOP-15086. NativeAzureFileSystem file
rename is not atomic. Contributed by Thomas Marquardt (Backported to branch-2
via HADOOP-15156)
Repository: hadoop
Updated Branches:
refs/heads/branch-2 37a822410 -> d2ceef0f2
refs/heads/branch-2.9 8e0a5b151 -> a35267b47
HADOOP-15086. NativeAzureFileSystem file rename is not atomic.
Contributed by Thomas Marquardt.
(Backported to branch-2 via HADOOP-15156.)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a35267b4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a35267b4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a35267b4
Branch: refs/heads/branch-2.9
Commit: a35267b47abf62d2ab32bc6860d2080284323042
Parents: 8e0a5b1
Author: Steve Loughran <st...@apache.org>
Authored: Mon Jan 8 15:12:41 2018 +0000
Committer: Steve Loughran <st...@apache.org>
Committed: Mon Jan 8 15:12:41 2018 +0000
----------------------------------------------------------------------
.../fs/azure/AzureNativeFileSystemStore.java | 16 ++-
.../hadoop/fs/azure/NativeAzureFileSystem.java | 25 +++--
.../fs/azure/NativeAzureFileSystemHelper.java | 18 +++
.../hadoop/fs/azure/NativeFileSystemStore.java | 4 +
.../fs/azure/SecureStorageInterfaceImpl.java | 8 +-
.../hadoop/fs/azure/StorageInterface.java | 2 +-
.../hadoop/fs/azure/StorageInterfaceImpl.java | 8 +-
.../azure/ITestNativeAzureFileSystemLive.java | 109 +++++++++++++++++++
.../hadoop/fs/azure/MockStorageInterface.java | 9 +-
9 files changed, 182 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a35267b4/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
index 730373c..7111dcf 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
@@ -2605,12 +2605,18 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
@Override
public void rename(String srcKey, String dstKey) throws IOException {
- rename(srcKey, dstKey, false, null);
+ rename(srcKey, dstKey, false, null, true);
}
@Override
public void rename(String srcKey, String dstKey, boolean acquireLease,
- SelfRenewingLease existingLease) throws IOException {
+ SelfRenewingLease existingLease) throws IOException {
+ rename(srcKey, dstKey, acquireLease, existingLease, true);
+ }
+
+ @Override
+ public void rename(String srcKey, String dstKey, boolean acquireLease,
+ SelfRenewingLease existingLease, boolean overwriteDestination) throws IOException {
LOG.debug("Moving {} to {}", srcKey, dstKey);
@@ -2672,7 +2678,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
// a more intensive exponential retry policy when the cluster is getting
// throttled.
try {
- dstBlob.startCopyFromBlob(srcBlob, null, getInstrumentedContext());
+ dstBlob.startCopyFromBlob(srcBlob, null,
+ getInstrumentedContext(), overwriteDestination);
} catch (StorageException se) {
if (se.getHttpStatusCode() == HttpURLConnection.HTTP_UNAVAILABLE) {
int copyBlobMinBackoff = sessionConfiguration.getInt(
@@ -2695,7 +2702,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
options.setRetryPolicyFactory(new RetryExponentialRetry(
copyBlobMinBackoff, copyBlobDeltaBackoff, copyBlobMaxBackoff,
copyBlobMaxRetries));
- dstBlob.startCopyFromBlob(srcBlob, options, getInstrumentedContext());
+ dstBlob.startCopyFromBlob(srcBlob, options,
+ getInstrumentedContext(), overwriteDestination);
} else {
throw se;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a35267b4/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
index d89a523..af42849 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
@@ -3287,16 +3287,27 @@ public class NativeAzureFileSystem extends FileSystem {
} else if (!srcMetadata.isDir()) {
LOG.debug("Source {} found as a file, renaming.", src);
try {
- store.rename(srcKey, dstKey);
+ // HADOOP-15086 - file rename must ensure that the destination does
+ // not exist. The fix is targeted to this call only to avoid
+ // regressions. Other call sites are attempting to rename temporary
+ // files, redo a failed rename operation, or rename a directory
+ // recursively; for these cases the destination may exist.
+ store.rename(srcKey, dstKey, false, null,
+ false);
} catch(IOException ex) {
-
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
- if (innerException instanceof StorageException
- && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
-
- LOG.debug("BlobNotFoundException encountered. Failing rename", src);
- return false;
+ if (innerException instanceof StorageException) {
+ if (NativeAzureFileSystemHelper.isFileNotFoundException(
+ (StorageException) innerException)) {
+ LOG.debug("BlobNotFoundException encountered. Failing rename", src);
+ return false;
+ }
+ if (NativeAzureFileSystemHelper.isBlobAlreadyExistsConflict(
+ (StorageException) innerException)) {
+ LOG.debug("Destination BlobAlreadyExists. Failing rename", src);
+ return false;
+ }
}
throw ex;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a35267b4/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java
index 57af1f8..754f343 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.azure;
import java.io.EOFException;
import java.io.IOException;
+import java.net.HttpURLConnection;
import java.util.Map;
import com.google.common.base.Preconditions;
@@ -96,6 +97,23 @@ final class NativeAzureFileSystemHelper {
}
/*
+ * Determines if a conditional request failed because the blob already
+ * exists.
+ *
+ * @param e - the storage exception thrown by the failed operation.
+ *
+ * @return true if a conditional request failed because the blob already
+ * exists; otherwise, returns false.
+ */
+ static boolean isBlobAlreadyExistsConflict(StorageException e) {
+ if (e.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT
+ && StorageErrorCodeStrings.BLOB_ALREADY_EXISTS.equals(e.getErrorCode())) {
+ return true;
+ }
+ return false;
+ }
+
+ /*
* Helper method that logs stack traces from all live threads.
*/
public static void logAllLiveStackTraces() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a35267b4/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
index 57a729d..b67ab1b 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
@@ -91,6 +91,10 @@ interface NativeFileSystemStore {
void rename(String srcKey, String dstKey, boolean acquireLease, SelfRenewingLease existingLease)
throws IOException;
+ void rename(String srcKey, String dstKey, boolean acquireLease,
+ SelfRenewingLease existingLease, boolean overwriteDestination)
+ throws IOException;
+
/**
* Delete all keys with the given prefix. Used for testing.
*
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a35267b4/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java
index 7c2722e..0f54249 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java
@@ -503,10 +503,14 @@ public class SecureStorageInterfaceImpl extends StorageInterface {
@Override
public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options,
- OperationContext opContext)
+ OperationContext opContext, boolean overwriteDestination)
throws StorageException, URISyntaxException {
+ AccessCondition dstAccessCondition =
+ overwriteDestination
+ ? null
+ : AccessCondition.generateIfNotExistsCondition();
getBlob().startCopy(sourceBlob.getBlob().getQualifiedUri(),
- null, null, options, opContext);
+ null, dstAccessCondition, options, opContext);
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a35267b4/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
index e03d731..dbb3849 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
@@ -406,7 +406,7 @@ abstract class StorageInterface {
*
*/
public abstract void startCopyFromBlob(CloudBlobWrapper sourceBlob,
- BlobRequestOptions options, OperationContext opContext)
+ BlobRequestOptions options, OperationContext opContext, boolean overwriteDestination)
throws StorageException, URISyntaxException;
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a35267b4/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
index 41a4dbb..e600f9e 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
@@ -425,10 +425,14 @@ class StorageInterfaceImpl extends StorageInterface {
@Override
public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options,
- OperationContext opContext)
+ OperationContext opContext, boolean overwriteDestination)
throws StorageException, URISyntaxException {
+ AccessCondition dstAccessCondition =
+ overwriteDestination
+ ? null
+ : AccessCondition.generateIfNotExistsCondition();
getBlob().startCopy(sourceBlob.getBlob().getQualifiedUri(),
- null, null, options, opContext);
+ null, dstAccessCondition, options, opContext);
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a35267b4/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java
index f969968..702ad66 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java
@@ -18,8 +18,17 @@
package org.apache.hadoop.fs.azure;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
@@ -40,6 +49,106 @@ public class ITestNativeAzureFileSystemLive extends
return AzureBlobStorageTestAccount.create();
}
+ /**
+ * Implements the thread start routine for the test
+ * testMultipleRenameFileOperationsToSameDestination.
+ */
+ private static class RenameThread implements Runnable {
+
+ private final FileSystem fs;
+ private final CountDownLatch latch;
+ private final int threadNumber;
+ private final Path src;
+ private final Path dst;
+ private final AtomicInteger successfulRenameCount;
+ private final AtomicReference<IOException> unexpectedError;
+
+ RenameThread(FileSystem fs,
+ CountDownLatch latch,
+ int threadNumber,
+ Path src,
+ Path dst,
+ AtomicInteger successfulRenameCount,
+ AtomicReference<IOException> unexpectedError) {
+ this.fs = fs;
+ this.latch = latch;
+ this.threadNumber = threadNumber;
+ this.src = src;
+ this.dst = dst;
+ this.successfulRenameCount = successfulRenameCount;
+ this.unexpectedError = unexpectedError;
+ }
+
+ @Override
+ public void run() {
+ try {
+ latch.await(Long.MAX_VALUE, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ }
+ try {
+ try (OutputStream output = fs.create(src)) {
+ output.write(("Source file number " + threadNumber).getBytes());
+ }
+
+ if (fs.rename(src, dst)) {
+ LOG.info("rename succeeded for thread " + threadNumber);
+ successfulRenameCount.incrementAndGet();
+ }
+ } catch (IOException e) {
+ unexpectedError.compareAndSet(null, e);
+ ContractTestUtils.fail("Exception unexpected", e);
+ }
+ }
+ }
+
+ /**
+ * Tests the rename file operation to ensure that when there are multiple
+ * attempts to rename a file to the same destination, only one rename
+ * operation is successful (HADOOP-15086).
+ */
+ @Test
+ public void testMultipleRenameFileOperationsToSameDestination()
+ throws IOException, InterruptedException {
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicInteger successfulRenameCount = new AtomicInteger(0);
+ final AtomicReference<IOException> unexpectedError = new AtomicReference<IOException>();
+ final Path dest = path("dest");
+
+ // Run 10 threads to rename multiple files to the same target path
+ List<Thread> threads = new ArrayList<>();
+
+ for (int i = 0; i < 10; i++) {
+ final int threadNumber = i;
+ Path src = path("test" + threadNumber);
+ threads.add(new Thread(new RenameThread(fs, latch, threadNumber, src, dest, successfulRenameCount, unexpectedError)));
+ }
+
+ // Start each thread
+ for (int i = 0; i < threads.size(); i++) {
+ threads.get(i).start();
+ }
+
+ // Wait for threads to start and wait on latch
+ Thread.sleep(2000);
+
+ // Now start to rename
+ latch.countDown();
+
+ // Wait for all threads to complete
+ for (int i = 0; i < threads.size(); i++) {
+ try {
+ threads.get(i).join();
+ } catch (InterruptedException e) {
+ }
+ }
+
+ if (unexpectedError.get() != null) {
+ throw unexpectedError.get();
+ }
+ assertEquals(1, successfulRenameCount.get());
+ LOG.info("Success, only one rename operation succeeded!");
+ }
+
@Test
public void testLazyRenamePendingCanOverwriteExistingFile()
throws Exception {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a35267b4/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
index e0ae7b4..d5f6437 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
@@ -425,7 +425,14 @@ public class MockStorageInterface extends StorageInterface {
@Override
public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options,
- OperationContext opContext) throws StorageException, URISyntaxException {
+ OperationContext opContext, boolean overwriteDestination) throws StorageException, URISyntaxException {
+ if (!overwriteDestination && backingStore.exists(convertUriToDecodedString(uri))) {
+ throw new StorageException("BlobAlreadyExists",
+ "The blob already exists.",
+ HttpURLConnection.HTTP_CONFLICT,
+ null,
+ null);
+ }
backingStore.copy(convertUriToDecodedString(sourceBlob.getUri()), convertUriToDecodedString(uri));
//TODO: set the backingStore.properties.CopyState and
// update azureNativeFileSystemStore.waitForCopyToComplete
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[2/2] hadoop git commit: HADOOP-15086. NativeAzureFileSystem file
rename is not atomic. Contributed by Thomas Marquardt (Backported to branch-2
via HADOOP-15156)
Posted by st...@apache.org.
HADOOP-15086. NativeAzureFileSystem file rename is not atomic.
Contributed by Thomas Marquardt.
(Backported to branch-2 via HADOOP-15156.)
(cherry picked from commit a35267b47abf62d2ab32bc6860d2080284323042)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d2ceef0f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d2ceef0f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d2ceef0f
Branch: refs/heads/branch-2
Commit: d2ceef0f206b5c2cdcbf01ab91c50ee044addd5f
Parents: 37a8224
Author: Steve Loughran <st...@apache.org>
Authored: Mon Jan 8 15:13:50 2018 +0000
Committer: Steve Loughran <st...@apache.org>
Committed: Mon Jan 8 15:13:50 2018 +0000
----------------------------------------------------------------------
.../fs/azure/AzureNativeFileSystemStore.java | 16 ++-
.../hadoop/fs/azure/NativeAzureFileSystem.java | 25 +++--
.../fs/azure/NativeAzureFileSystemHelper.java | 18 +++
.../hadoop/fs/azure/NativeFileSystemStore.java | 4 +
.../fs/azure/SecureStorageInterfaceImpl.java | 8 +-
.../hadoop/fs/azure/StorageInterface.java | 2 +-
.../hadoop/fs/azure/StorageInterfaceImpl.java | 8 +-
.../azure/ITestNativeAzureFileSystemLive.java | 109 +++++++++++++++++++
.../hadoop/fs/azure/MockStorageInterface.java | 9 +-
9 files changed, 182 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2ceef0f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
index 730373c..7111dcf 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
@@ -2605,12 +2605,18 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
@Override
public void rename(String srcKey, String dstKey) throws IOException {
- rename(srcKey, dstKey, false, null);
+ rename(srcKey, dstKey, false, null, true);
}
@Override
public void rename(String srcKey, String dstKey, boolean acquireLease,
- SelfRenewingLease existingLease) throws IOException {
+ SelfRenewingLease existingLease) throws IOException {
+ rename(srcKey, dstKey, acquireLease, existingLease, true);
+ }
+
+ @Override
+ public void rename(String srcKey, String dstKey, boolean acquireLease,
+ SelfRenewingLease existingLease, boolean overwriteDestination) throws IOException {
LOG.debug("Moving {} to {}", srcKey, dstKey);
@@ -2672,7 +2678,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
// a more intensive exponential retry policy when the cluster is getting
// throttled.
try {
- dstBlob.startCopyFromBlob(srcBlob, null, getInstrumentedContext());
+ dstBlob.startCopyFromBlob(srcBlob, null,
+ getInstrumentedContext(), overwriteDestination);
} catch (StorageException se) {
if (se.getHttpStatusCode() == HttpURLConnection.HTTP_UNAVAILABLE) {
int copyBlobMinBackoff = sessionConfiguration.getInt(
@@ -2695,7 +2702,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
options.setRetryPolicyFactory(new RetryExponentialRetry(
copyBlobMinBackoff, copyBlobDeltaBackoff, copyBlobMaxBackoff,
copyBlobMaxRetries));
- dstBlob.startCopyFromBlob(srcBlob, options, getInstrumentedContext());
+ dstBlob.startCopyFromBlob(srcBlob, options,
+ getInstrumentedContext(), overwriteDestination);
} else {
throw se;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2ceef0f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
index d89a523..af42849 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
@@ -3287,16 +3287,27 @@ public class NativeAzureFileSystem extends FileSystem {
} else if (!srcMetadata.isDir()) {
LOG.debug("Source {} found as a file, renaming.", src);
try {
- store.rename(srcKey, dstKey);
+ // HADOOP-15086 - file rename must ensure that the destination does
+ // not exist. The fix is targeted to this call only to avoid
+ // regressions. Other call sites are attempting to rename temporary
+ // files, redo a failed rename operation, or rename a directory
+ // recursively; for these cases the destination may exist.
+ store.rename(srcKey, dstKey, false, null,
+ false);
} catch(IOException ex) {
-
Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
- if (innerException instanceof StorageException
- && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
-
- LOG.debug("BlobNotFoundException encountered. Failing rename", src);
- return false;
+ if (innerException instanceof StorageException) {
+ if (NativeAzureFileSystemHelper.isFileNotFoundException(
+ (StorageException) innerException)) {
+ LOG.debug("BlobNotFoundException encountered. Failing rename", src);
+ return false;
+ }
+ if (NativeAzureFileSystemHelper.isBlobAlreadyExistsConflict(
+ (StorageException) innerException)) {
+ LOG.debug("Destination BlobAlreadyExists. Failing rename", src);
+ return false;
+ }
}
throw ex;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2ceef0f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java
index 57af1f8..754f343 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemHelper.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.azure;
import java.io.EOFException;
import java.io.IOException;
+import java.net.HttpURLConnection;
import java.util.Map;
import com.google.common.base.Preconditions;
@@ -96,6 +97,23 @@ final class NativeAzureFileSystemHelper {
}
/*
+ * Determines if a conditional request failed because the blob already
+ * exists.
+ *
+ * @param e - the storage exception thrown by the failed operation.
+ *
+ * @return true if a conditional request failed because the blob already
+ * exists; otherwise, returns false.
+ */
+ static boolean isBlobAlreadyExistsConflict(StorageException e) {
+ if (e.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT
+ && StorageErrorCodeStrings.BLOB_ALREADY_EXISTS.equals(e.getErrorCode())) {
+ return true;
+ }
+ return false;
+ }
+
+ /*
* Helper method that logs stack traces from all live threads.
*/
public static void logAllLiveStackTraces() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2ceef0f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
index 57a729d..b67ab1b 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
@@ -91,6 +91,10 @@ interface NativeFileSystemStore {
void rename(String srcKey, String dstKey, boolean acquireLease, SelfRenewingLease existingLease)
throws IOException;
+ void rename(String srcKey, String dstKey, boolean acquireLease,
+ SelfRenewingLease existingLease, boolean overwriteDestination)
+ throws IOException;
+
/**
* Delete all keys with the given prefix. Used for testing.
*
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2ceef0f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java
index 7c2722e..0f54249 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/SecureStorageInterfaceImpl.java
@@ -503,10 +503,14 @@ public class SecureStorageInterfaceImpl extends StorageInterface {
@Override
public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options,
- OperationContext opContext)
+ OperationContext opContext, boolean overwriteDestination)
throws StorageException, URISyntaxException {
+ AccessCondition dstAccessCondition =
+ overwriteDestination
+ ? null
+ : AccessCondition.generateIfNotExistsCondition();
getBlob().startCopy(sourceBlob.getBlob().getQualifiedUri(),
- null, null, options, opContext);
+ null, dstAccessCondition, options, opContext);
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2ceef0f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
index e03d731..dbb3849 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java
@@ -406,7 +406,7 @@ abstract class StorageInterface {
*
*/
public abstract void startCopyFromBlob(CloudBlobWrapper sourceBlob,
- BlobRequestOptions options, OperationContext opContext)
+ BlobRequestOptions options, OperationContext opContext, boolean overwriteDestination)
throws StorageException, URISyntaxException;
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2ceef0f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
index 41a4dbb..e600f9e 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java
@@ -425,10 +425,14 @@ class StorageInterfaceImpl extends StorageInterface {
@Override
public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options,
- OperationContext opContext)
+ OperationContext opContext, boolean overwriteDestination)
throws StorageException, URISyntaxException {
+ AccessCondition dstAccessCondition =
+ overwriteDestination
+ ? null
+ : AccessCondition.generateIfNotExistsCondition();
getBlob().startCopy(sourceBlob.getBlob().getQualifiedUri(),
- null, null, options, opContext);
+ null, dstAccessCondition, options, opContext);
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2ceef0f/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java
index f969968..702ad66 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java
@@ -18,8 +18,17 @@
package org.apache.hadoop.fs.azure;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
@@ -40,6 +49,106 @@ public class ITestNativeAzureFileSystemLive extends
return AzureBlobStorageTestAccount.create();
}
+ /**
+ * Worker for testMultipleRenameFileOperationsToSameDestination.
+ *
+ * Each worker waits on a shared latch, creates its own source file and then
+ * tries to rename it onto a destination path shared with every other
+ * worker. A rename that returns true increments
+ * {@code successfulRenameCount}; a rename that returns false is the
+ * expected outcome for the losing workers and is not recorded.
+ *
+ * The first failure (an IOException, or an interrupt while waiting on the
+ * latch) is stored in {@code unexpectedError} so that the test thread can
+ * rethrow it: an assertion raised on this worker thread would only
+ * terminate the worker and would not fail the test.
+ */
+ private static class RenameThread implements Runnable {
+
+ private final FileSystem fs;
+ private final CountDownLatch latch;
+ private final int threadNumber;
+ private final Path src;
+ private final Path dst;
+ private final AtomicInteger successfulRenameCount;
+ private final AtomicReference<IOException> unexpectedError;
+
+ RenameThread(FileSystem fs,
+ CountDownLatch latch,
+ int threadNumber,
+ Path src,
+ Path dst,
+ AtomicInteger successfulRenameCount,
+ AtomicReference<IOException> unexpectedError) {
+ this.fs = fs;
+ this.latch = latch;
+ this.threadNumber = threadNumber;
+ this.src = src;
+ this.dst = dst;
+ this.successfulRenameCount = successfulRenameCount;
+ this.unexpectedError = unexpectedError;
+ }
+
+ @Override
+ public void run() {
+ try {
+ // Block until the test thread releases all workers together, so the
+ // renames race against each other.
+ latch.await(Long.MAX_VALUE, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ // Keep the interrupt visible and report it instead of silently
+ // carrying on out of step with the other workers.
+ Thread.currentThread().interrupt();
+ unexpectedError.compareAndSet(null, new IOException(
+ "Rename thread " + threadNumber + " interrupted while waiting on latch",
+ e));
+ return;
+ }
+ try {
+ try (OutputStream output = fs.create(src)) {
+ output.write(("Source file number " + threadNumber).getBytes());
+ }
+
+ // Only one rename onto the shared destination should return true;
+ // the others are expected to return false.
+ if (fs.rename(src, dst)) {
+ LOG.info("rename succeeded for thread " + threadNumber);
+ successfulRenameCount.incrementAndGet();
+ }
+ } catch (IOException e) {
+ // Record the first failure for the test thread to rethrow; throwing
+ // an assertion here would only kill this worker thread.
+ LOG.error("rename thread " + threadNumber + " failed", e);
+ unexpectedError.compareAndSet(null, e);
+ }
+ }
+ }
+
+ /**
+ * Tests the rename file operation to ensure that when there are multiple
+ * attempts to rename a file to the same destination, only one rename
+ * operation is successful (HADOOP-15086).
+ *
+ * Several RenameThread workers each create their own source file and then
+ * race to rename it onto the same destination path. Exactly one rename must
+ * return true, no worker may hit an IOException, and the destination must
+ * exist afterwards.
+ */
+ @Test
+ public void testMultipleRenameFileOperationsToSameDestination()
+ throws IOException, InterruptedException {
+ final int threadCount = 10;
+ // Generous upper bound per worker, so a hung rename fails the test
+ // instead of blocking it forever.
+ final long joinTimeoutMillis = TimeUnit.MINUTES.toMillis(5);
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicInteger successfulRenameCount = new AtomicInteger(0);
+ final AtomicReference<IOException> unexpectedError = new AtomicReference<IOException>();
+ final Path dest = path("dest");
+
+ // One worker per source file, all renaming to the same target path.
+ List<Thread> threads = new ArrayList<>(threadCount);
+ for (int threadNumber = 0; threadNumber < threadCount; threadNumber++) {
+ Path src = path("test" + threadNumber);
+ threads.add(new Thread(
+ new RenameThread(fs, latch, threadNumber, src, dest,
+ successfulRenameCount, unexpectedError),
+ "rename-thread-" + threadNumber));
+ }
+
+ for (Thread thread : threads) {
+ thread.start();
+ }
+
+ // Give the workers time to reach the latch so they are released
+ // together. This only maximises contention; correctness does not depend
+ // on it, since await returns at once if the latch is already open.
+ Thread.sleep(2000);
+
+ // Release all workers at once.
+ latch.countDown();
+
+ // Wait for every worker; an interrupt of the test thread propagates.
+ for (Thread thread : threads) {
+ thread.join(joinTimeoutMillis);
+ if (thread.isAlive()) {
+ throw new AssertionError("Thread " + thread.getName()
+ + " did not finish within " + joinTimeoutMillis + " ms");
+ }
+ }
+
+ IOException error = unexpectedError.get();
+ if (error != null) {
+ throw error;
+ }
+ assertEquals("Number of successful renames to " + dest,
+ 1, successfulRenameCount.get());
+ ContractTestUtils.assertPathExists(fs,
+ "destination of the winning rename", dest);
+ LOG.info("Success, only one rename operation succeeded!");
+ }
+
@Test
public void testLazyRenamePendingCanOverwriteExistingFile()
throws Exception {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2ceef0f/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
index e0ae7b4..d5f6437 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java
@@ -425,7 +425,14 @@ public class MockStorageInterface extends StorageInterface {
@Override
public void startCopyFromBlob(CloudBlobWrapper sourceBlob, BlobRequestOptions options,
- OperationContext opContext) throws StorageException, URISyntaxException {
+ OperationContext opContext, boolean overwriteDestination) throws StorageException, URISyntaxException {
+ if (!overwriteDestination && backingStore.exists(convertUriToDecodedString(uri))) {
+ throw new StorageException("BlobAlreadyExists",
+ "The blob already exists.",
+ HttpURLConnection.HTTP_CONFLICT,
+ null,
+ null);
+ }
backingStore.copy(convertUriToDecodedString(sourceBlob.getUri()), convertUriToDecodedString(uri));
//TODO: set the backingStore.properties.CopyState and
// update azureNativeFileSystemStore.waitForCopyToComplete
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org