You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by bi...@apache.org on 2021/04/12 23:48:29 UTC
[hadoop] branch trunk updated: HADOOP-16948. Support infinite lease
dirs. (#1925)
This is an automated email from the ASF dual-hosted git repository.
billie pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new c1fde4f HADOOP-16948. Support infinite lease dirs. (#1925)
c1fde4f is described below
commit c1fde4fe94f268c6d5515b421ac47345dca8163d
Author: billierinaldi <bi...@apache.org>
AuthorDate: Mon Apr 12 19:47:59 2021 -0400
HADOOP-16948. Support infinite lease dirs. (#1925)
* HADOOP-16948. Support single writer dirs.
* HADOOP-16948. Fix findbugs and checkstyle problems.
* HADOOP-16948. Fix remaining checkstyle problems.
* HADOOP-16948. Add DurationInfo, retry policy for acquiring lease, and javadocs
* HADOOP-16948. Convert ABFS client to use an executor for lease ops
* HADOOP-16948. Fix ABFS lease test for non-HNS
* HADOOP-16948. Fix checkstyle and javadoc
* HADOOP-16948. Address review comments
* HADOOP-16948. Use daemon threads for ABFS lease ops
* HADOOP-16948. Make lease duration configurable
* HADOOP-16948. Add error messages to test assertions
* HADOOP-16948. Remove extra isSingleWriterKey call
* HADOOP-16948. Use only infinite lease duration due to cost of renewal ops
* HADOOP-16948. Remove acquire/renew/release lease methods
* HADOOP-16948. Rename single writer dirs to infinite lease dirs
* HADOOP-16948. Fix checkstyle
* HADOOP-16948. Wait for acquire lease future
* HADOOP-16948. Add unit test for acquire lease failure
---
.../hadoop/fs/azurebfs/AbfsConfiguration.java | 35 +++
.../hadoop/fs/azurebfs/AzureBlobFileSystem.java | 21 ++
.../fs/azurebfs/AzureBlobFileSystemStore.java | 90 +++++-
.../fs/azurebfs/constants/AbfsHttpConstants.java | 5 +
.../fs/azurebfs/constants/ConfigurationKeys.java | 9 +
.../constants/FileSystemConfigurations.java | 7 +
.../constants/HttpHeaderConfigurations.java | 5 +
.../ConfigurationValidationAnnotations.java | 16 +
.../exceptions/AzureBlobFileSystemException.java | 4 +
.../services/AppendRequestParameters.java | 8 +-
.../IntegerConfigurationBasicValidator.java | 13 +-
.../hadoop/fs/azurebfs/services/AbfsClient.java | 131 +++++++-
.../hadoop/fs/azurebfs/services/AbfsErrors.java | 53 ++++
.../hadoop/fs/azurebfs/services/AbfsLease.java | 188 ++++++++++++
.../fs/azurebfs/services/AbfsOutputStream.java | 37 ++-
.../azurebfs/services/AbfsOutputStreamContext.java | 18 ++
.../fs/azurebfs/services/AbfsRestOperation.java | 1 +
.../azurebfs/services/AbfsRestOperationType.java | 3 +-
.../hadoop-azure/src/site/markdown/abfs.md | 16 +
.../fs/azurebfs/ITestAzureBlobFileSystemLease.java | 336 +++++++++++++++++++++
.../diagnostics/TestConfigurationValidators.java | 29 +-
.../fs/azurebfs/services/TestAbfsOutputStream.java | 49 +--
22 files changed, 1032 insertions(+), 42 deletions(-)
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index f36cc7d..0a8224a 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.AuthConfigurations;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.IntegerConfigurationValidatorAnnotation;
+import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.IntegerWithOutlierConfigurationValidatorAnnotation;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.LongConfigurationValidatorAnnotation;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.StringConfigurationValidatorAnnotation;
import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.Base64StringConfigurationValidatorAnnotation;
@@ -208,6 +209,15 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES)
private String azureAppendBlobDirs;
+ @StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_INFINITE_LEASE_KEY,
+ DefaultValue = DEFAULT_FS_AZURE_INFINITE_LEASE_DIRECTORIES)
+ private String azureInfiniteLeaseDirs;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_LEASE_THREADS,
+ MinValue = MIN_LEASE_THREADS,
+ DefaultValue = DEFAULT_LEASE_THREADS)
+ private int numLeaseThreads;
+
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION,
DefaultValue = DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION)
private boolean createRemoteFileSystemDuringInitialization;
@@ -296,6 +306,8 @@ public class AbfsConfiguration{
field.setAccessible(true);
if (field.isAnnotationPresent(IntegerConfigurationValidatorAnnotation.class)) {
field.set(this, validateInt(field));
+ } else if (field.isAnnotationPresent(IntegerWithOutlierConfigurationValidatorAnnotation.class)) {
+ field.set(this, validateIntWithOutlier(field));
} else if (field.isAnnotationPresent(LongConfigurationValidatorAnnotation.class)) {
field.set(this, validateLong(field));
} else if (field.isAnnotationPresent(StringConfigurationValidatorAnnotation.class)) {
@@ -634,6 +646,14 @@ public class AbfsConfiguration{
return this.azureAppendBlobDirs;
}
+ public String getAzureInfiniteLeaseDirs() {
+ return this.azureInfiniteLeaseDirs;
+ }
+
+ public int getNumLeaseThreads() {
+ return this.numLeaseThreads;
+ }
+
public boolean getCreateRemoteFileSystemDuringInitialization() {
// we do not support creating the filesystem when AuthType is SAS
return this.createRemoteFileSystemDuringInitialization
@@ -843,6 +863,21 @@ public class AbfsConfiguration{
validator.ThrowIfInvalid()).validate(value);
}
+ int validateIntWithOutlier(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
+ IntegerWithOutlierConfigurationValidatorAnnotation validator =
+ field.getAnnotation(IntegerWithOutlierConfigurationValidatorAnnotation.class);
+ String value = get(validator.ConfigurationKey());
+
+ // validate
+ return new IntegerConfigurationBasicValidator(
+ validator.OutlierValue(),
+ validator.MinValue(),
+ validator.MaxValue(),
+ validator.DefaultValue(),
+ validator.ConfigurationKey(),
+ validator.ThrowIfInvalid()).validate(value);
+ }
+
long validateLong(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
LongConfigurationValidatorAnnotation validator = field.getAnnotation(LongConfigurationValidatorAnnotation.class);
String value = rawConfig.get(validator.ConfigurationKey());
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index d8a2ed7..30108ed 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -87,6 +87,7 @@ import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.functional.RemoteIterators;
+import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.LambdaUtils;
import org.apache.hadoop.util.Progressable;
@@ -506,6 +507,26 @@ public class AzureBlobFileSystem extends FileSystem {
}
/**
+ * Break the current lease on an ABFS file if it exists. A lease that is broken cannot be
+ * renewed. A new lease may be obtained on the file immediately.
+ *
+ * @param f file name
+ * @throws IOException on any exception while breaking the lease
+ */
+ public void breakLease(final Path f) throws IOException {
+ LOG.debug("AzureBlobFileSystem.breakLease path: {}", f);
+
+ Path qualifiedPath = makeQualified(f);
+
+ try (DurationInfo ignored = new DurationInfo(LOG, false, "Break lease for %s",
+ qualifiedPath)) {
+ abfsStore.breakLease(qualifiedPath);
+ } catch(AzureBlobFileSystemException ex) {
+ checkException(f, ex);
+ }
+ }
+
+ /**
* Qualify a path to one which uses this FileSystem and, if relative,
* made absolute.
* @param path to qualify.
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 75419c2..fa7e12b 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -39,6 +39,7 @@ import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
@@ -48,10 +49,14 @@ import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.WeakHashMap;
+import java.util.concurrent.ExecutionException;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -100,6 +105,7 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsPermission;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;
+import org.apache.hadoop.fs.azurebfs.services.AbfsLease;
import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials;
import org.apache.hadoop.fs.azurebfs.services.AbfsPerfTracker;
import org.apache.hadoop.fs.azurebfs.services.AbfsPerfInfo;
@@ -145,8 +151,11 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
private static final String XMS_PROPERTIES_ENCODING = "ISO-8859-1";
private static final int GET_SET_AGGREGATE_COUNT = 2;
+ private final Map<AbfsLease, Object> leaseRefs;
+
private final AbfsConfiguration abfsConfiguration;
private final Set<String> azureAtomicRenameDirSet;
+ private Set<String> azureInfiniteLeaseDirSet;
private Trilean isNamespaceEnabled;
private final AuthType authType;
private final UserGroupInformation userGroupInformation;
@@ -167,6 +176,8 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
final String fileSystemName = authorityParts[0];
final String accountName = authorityParts[1];
+ leaseRefs = Collections.synchronizedMap(new WeakHashMap<>());
+
try {
this.abfsConfiguration = new AbfsConfiguration(configuration, accountName);
} catch (IllegalAccessException exception) {
@@ -195,6 +206,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(
abfsConfiguration.getAzureAtomicRenameDirs().split(AbfsHttpConstants.COMMA)));
+ updateInfiniteLeaseDirs();
this.authType = abfsConfiguration.getAuthType(accountName);
boolean usingOauth = (authType == AuthType.OAuth);
boolean useHttps = (usingOauth || abfsConfiguration.isHttpsAlwaysUsed()) ? true : isSecureScheme;
@@ -246,7 +258,24 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
@Override
public void close() throws IOException {
- IOUtils.cleanupWithLogger(LOG, client);
+ List<ListenableFuture<?>> futures = new ArrayList<>();
+ for (AbfsLease lease : leaseRefs.keySet()) {
+ if (lease == null) {
+ continue;
+ }
+ ListenableFuture<?> future = client.submit(() -> lease.free());
+ futures.add(future);
+ }
+ try {
+ Futures.allAsList(futures).get();
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted freeing leases", e);
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException e) {
+ LOG.error("Error freeing leases", e);
+ } finally {
+ IOUtils.cleanupWithLogger(LOG, client);
+ }
}
byte[] encodeAttribute(String value) throws UnsupportedEncodingException {
@@ -496,12 +525,14 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
}
perfInfo.registerResult(op.getResult()).registerSuccess(true);
+ AbfsLease lease = maybeCreateLease(relativePath);
+
return new AbfsOutputStream(
client,
statistics,
relativePath,
0,
- populateAbfsOutputStreamContext(isAppendBlob));
+ populateAbfsOutputStreamContext(isAppendBlob, lease));
}
}
@@ -573,7 +604,8 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
return op;
}
- private AbfsOutputStreamContext populateAbfsOutputStreamContext(boolean isAppendBlob) {
+ private AbfsOutputStreamContext populateAbfsOutputStreamContext(boolean isAppendBlob,
+ AbfsLease lease) {
int bufferSize = abfsConfiguration.getWriteBufferSize();
if (isAppendBlob && bufferSize > FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE) {
bufferSize = FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE;
@@ -587,6 +619,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
.withAppendBlob(isAppendBlob)
.withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteMaxConcurrentRequestCount())
.withMaxWriteRequestsToQueue(abfsConfiguration.getMaxWriteRequestsToQueue())
+ .withLease(lease)
.build();
}
@@ -705,15 +738,29 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
isAppendBlob = true;
}
+ AbfsLease lease = maybeCreateLease(relativePath);
+
return new AbfsOutputStream(
client,
statistics,
relativePath,
offset,
- populateAbfsOutputStreamContext(isAppendBlob));
+ populateAbfsOutputStreamContext(isAppendBlob, lease));
}
}
+ /**
+ * Break any current lease on an ABFS file.
+ *
+ * @param path file name
+ * @throws AzureBlobFileSystemException on any exception while breaking the lease
+ */
+ public void breakLease(final Path path) throws AzureBlobFileSystemException {
+ LOG.debug("lease path: {}", path);
+
+ client.breakLease(getRelativePath(path));
+ }
+
public void rename(final Path source, final Path destination) throws
AzureBlobFileSystemException {
final Instant startAggregate = abfsPerfTracker.getLatencyInstant();
@@ -1347,6 +1394,13 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
return isKeyForDirectorySet(key, azureAtomicRenameDirSet);
}
+ public boolean isInfiniteLeaseKey(String key) {
+ if (azureInfiniteLeaseDirSet.isEmpty()) {
+ return false;
+ }
+ return isKeyForDirectorySet(key, azureInfiniteLeaseDirSet);
+ }
+
/**
* A one-off operation to initialize AbfsClient for AzureBlobFileSystem
* Operations.
@@ -1636,4 +1690,32 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
this.isNamespaceEnabled = isNamespaceEnabled;
}
+ private void updateInfiniteLeaseDirs() {
+ this.azureInfiniteLeaseDirSet = new HashSet<>(Arrays.asList(
+ abfsConfiguration.getAzureInfiniteLeaseDirs().split(AbfsHttpConstants.COMMA)));
+ // remove the empty string, since isKeyForDirectorySet returns true for empty strings
+ // and we don't want to default to enabling infinite lease dirs
+ this.azureInfiniteLeaseDirSet.remove("");
+ }
+
+ private AbfsLease maybeCreateLease(String relativePath)
+ throws AzureBlobFileSystemException {
+ boolean enableInfiniteLease = isInfiniteLeaseKey(relativePath);
+ if (!enableInfiniteLease) {
+ return null;
+ }
+ AbfsLease lease = new AbfsLease(client, relativePath);
+ leaseRefs.put(lease, null);
+ return lease;
+ }
+
+ @VisibleForTesting
+ boolean areLeasesFreed() {
+ for (AbfsLease lease : leaseRefs.keySet()) {
+ if (lease != null && !lease.isFreed()) {
+ return false;
+ }
+ }
+ return true;
+ }
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
index 184657e..5cf7ec5 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
@@ -39,6 +39,11 @@ public final class AbfsHttpConstants {
public static final String GET_ACCESS_CONTROL = "getAccessControl";
public static final String CHECK_ACCESS = "checkAccess";
public static final String GET_STATUS = "getStatus";
+ public static final String ACQUIRE_LEASE_ACTION = "acquire";
+ public static final String BREAK_LEASE_ACTION = "break";
+ public static final String RELEASE_LEASE_ACTION = "release";
+ public static final String RENEW_LEASE_ACTION = "renew";
+ public static final String DEFAULT_LEASE_BREAK_PERIOD = "0";
public static final String DEFAULT_TIMEOUT = "90";
public static final String APPEND_BLOB_TYPE = "appendblob";
public static final String TOKEN_VERSION = "2";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 02b143c..4fe1d1c 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -87,6 +87,15 @@ public final class ConfigurationKeys {
/** Provides a config to provide comma separated path prefixes on which Appendblob based files are created
* Default is empty. **/
public static final String FS_AZURE_APPEND_BLOB_KEY = "fs.azure.appendblob.directories";
+ /** Provides a config to provide comma separated path prefixes which support infinite leases.
+ * Files under these paths will be leased when created or opened for writing and the lease will
+ * be released when the file is closed. The lease may be broken with the breakLease method on
+ * AzureBlobFileSystem. Default is empty.
+ * **/
+ public static final String FS_AZURE_INFINITE_LEASE_KEY = "fs.azure.infinite-lease.directories";
+ /** Provides a number of threads to use for lease operations for infinite lease directories.
+ * Must be set to a minimum of 1 if infinite lease directories are to be used. Default is 0. **/
+ public static final String FS_AZURE_LEASE_THREADS = "fs.azure.lease.threads";
public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth";
public static final String FS_AZURE_ALWAYS_READ_BUFFER_SIZE = "fs.azure.read.alwaysReadBufferSize";
public static final String FS_AZURE_READ_AHEAD_BLOCK_SIZE = "fs.azure.read.readahead.blocksize";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index d90f525..040b18a 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -78,6 +78,13 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE = true;
public static final boolean DEFAULT_FS_AZURE_ENABLE_MKDIR_OVERWRITE = true;
public static final String DEFAULT_FS_AZURE_APPEND_BLOB_DIRECTORIES = "";
+ public static final String DEFAULT_FS_AZURE_INFINITE_LEASE_DIRECTORIES = "";
+ public static final int DEFAULT_LEASE_THREADS = 0;
+ public static final int MIN_LEASE_THREADS = 0;
+ public static final int DEFAULT_LEASE_DURATION = -1;
+ public static final int INFINITE_LEASE_DURATION = -1;
+ public static final int MIN_LEASE_DURATION = 15;
+ public static final int MAX_LEASE_DURATION = 60;
public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java
index 27ddcee..2325538 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java
@@ -60,6 +60,11 @@ public final class HttpHeaderConfigurations {
public static final String X_MS_UMASK = "x-ms-umask";
public static final String X_MS_NAMESPACE_ENABLED = "x-ms-namespace-enabled";
public static final String X_MS_ABFS_CLIENT_LATENCY = "x-ms-abfs-client-latency";
+ public static final String X_MS_LEASE_ACTION = "x-ms-lease-action";
+ public static final String X_MS_LEASE_DURATION = "x-ms-lease-duration";
+ public static final String X_MS_LEASE_ID = "x-ms-lease-id";
+ public static final String X_MS_PROPOSED_LEASE_ID = "x-ms-proposed-lease-id";
+ public static final String X_MS_LEASE_BREAK_PERIOD = "x-ms-lease-break-period";
private HttpHeaderConfigurations() {}
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/annotations/ConfigurationValidationAnnotations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/annotations/ConfigurationValidationAnnotations.java
index 82c571a..9fbe5a2 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/annotations/ConfigurationValidationAnnotations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/annotations/ConfigurationValidationAnnotations.java
@@ -46,6 +46,22 @@ public class ConfigurationValidationAnnotations {
boolean ThrowIfInvalid() default false;
}
+ @Target({ ElementType.FIELD })
+ @Retention(RetentionPolicy.RUNTIME)
+ public @interface IntegerWithOutlierConfigurationValidatorAnnotation {
+ String ConfigurationKey();
+
+ int MaxValue() default Integer.MAX_VALUE;
+
+ int MinValue() default Integer.MIN_VALUE;
+
+ int OutlierValue() default Integer.MIN_VALUE;
+
+ int DefaultValue();
+
+ boolean ThrowIfInvalid() default false;
+ }
+
/**
* Describes the requirements when validating the annotated long field.
*/
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AzureBlobFileSystemException.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AzureBlobFileSystemException.java
index 9b1bead..d829c5a 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AzureBlobFileSystemException.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AzureBlobFileSystemException.java
@@ -37,6 +37,10 @@ public abstract class AzureBlobFileSystemException extends IOException {
super(message, innerException);
}
+ public AzureBlobFileSystemException(final String message, final Throwable innerThrowable) {
+ super(message, innerThrowable);
+ }
+
@Override
public String toString() {
if (this.getMessage() == null && this.getCause() == null) {
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java
index fb4d29f..7369bfa 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AppendRequestParameters.java
@@ -33,17 +33,20 @@ public class AppendRequestParameters {
private final int length;
private final Mode mode;
private final boolean isAppendBlob;
+ private final String leaseId;
public AppendRequestParameters(final long position,
final int offset,
final int length,
final Mode mode,
- final boolean isAppendBlob) {
+ final boolean isAppendBlob,
+ final String leaseId) {
this.position = position;
this.offset = offset;
this.length = length;
this.mode = mode;
this.isAppendBlob = isAppendBlob;
+ this.leaseId = leaseId;
}
public long getPosition() {
@@ -66,4 +69,7 @@ public class AppendRequestParameters {
return this.isAppendBlob;
}
+ public String getLeaseId() {
+ return this.leaseId;
+ }
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/diagnostics/IntegerConfigurationBasicValidator.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/diagnostics/IntegerConfigurationBasicValidator.java
index 26c7d2f..9d4beb7 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/diagnostics/IntegerConfigurationBasicValidator.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/diagnostics/IntegerConfigurationBasicValidator.java
@@ -31,11 +31,18 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationVa
public class IntegerConfigurationBasicValidator extends ConfigurationBasicValidator<Integer> implements ConfigurationValidator {
private final int min;
private final int max;
+ private final int outlier;
public IntegerConfigurationBasicValidator(final int min, final int max, final int defaultVal, final String configKey, final boolean throwIfInvalid) {
+ this(min, min, max, defaultVal, configKey, throwIfInvalid);
+ }
+
+ public IntegerConfigurationBasicValidator(final int outlier, final int min, final int max,
+ final int defaultVal, final String configKey, final boolean throwIfInvalid) {
super(configKey, defaultVal, throwIfInvalid);
this.min = min;
this.max = max;
+ this.outlier = outlier;
}
public Integer validate(final String configValue) throws InvalidConfigurationValueException {
@@ -47,10 +54,14 @@ public class IntegerConfigurationBasicValidator extends ConfigurationBasicValida
try {
result = Integer.parseInt(configValue);
// throw an exception if a 'within bounds' value is missing
- if (getThrowIfInvalid() && (result < this.min || result > this.max)) {
+ if (getThrowIfInvalid() && (result != outlier) && (result < this.min || result > this.max)) {
throw new InvalidConfigurationValueException(getConfigKey());
}
+ if (result == outlier) {
+ return result;
+ }
+
// set the value to the nearest bound if it's out of bounds
if (result < this.min) {
return this.min;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index 92b24f0..7c8a211 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -29,16 +29,27 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
-import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
-import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
-import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
-import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.FutureCallback;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableScheduledFuture;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.SASTokenProviderException;
@@ -49,6 +60,8 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_DELETE_CONSIDERED_IDEMPOTENT;
@@ -76,6 +89,8 @@ public class AbfsClient implements Closeable {
private SASTokenProvider sasTokenProvider;
private final AbfsCounters abfsCounters;
+ private final ListeningScheduledExecutorService executorService;
+
private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final AbfsClientContext abfsClientContext) {
@@ -106,6 +121,11 @@ public class AbfsClient implements Closeable {
this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName);
this.abfsPerfTracker = abfsClientContext.getAbfsPerfTracker();
this.abfsCounters = abfsClientContext.getAbfsCounters();
+
+ ThreadFactory tf =
+ new ThreadFactoryBuilder().setNameFormat("AbfsClient Lease Ops").setDaemon(true).build();
+ this.executorService = MoreExecutors.listeningDecorator(
+ HadoopExecutors.newScheduledThreadPool(this.abfsConfiguration.getNumLeaseThreads(), tf));
}
public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
@@ -129,6 +149,7 @@ public class AbfsClient implements Closeable {
if (tokenProvider instanceof Closeable) {
IOUtils.cleanupWithLogger(LOG, (Closeable) tokenProvider);
}
+ HadoopExecutors.shutdown(executorService, LOG, 0, TimeUnit.SECONDS);
}
public String getFileSystem() {
@@ -317,6 +338,83 @@ public class AbfsClient implements Closeable {
return op;
}
+ public AbfsRestOperation acquireLease(final String path, int duration) throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, ACQUIRE_LEASE_ACTION));
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_DURATION, Integer.toString(duration)));
+ requestHeaders.add(new AbfsHttpHeader(X_MS_PROPOSED_LEASE_ID, UUID.randomUUID().toString()));
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = new AbfsRestOperation(
+ AbfsRestOperationType.LeasePath,
+ this,
+ HTTP_METHOD_POST,
+ url,
+ requestHeaders);
+ op.execute();
+ return op;
+ }
+
+ public AbfsRestOperation renewLease(final String path, final String leaseId) throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, RENEW_LEASE_ACTION));
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId));
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = new AbfsRestOperation(
+ AbfsRestOperationType.LeasePath,
+ this,
+ HTTP_METHOD_POST,
+ url,
+ requestHeaders);
+ op.execute();
+ return op;
+ }
+
+ public AbfsRestOperation releaseLease(final String path, final String leaseId) throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, RELEASE_LEASE_ACTION));
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId));
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = new AbfsRestOperation(
+ AbfsRestOperationType.LeasePath,
+ this,
+ HTTP_METHOD_POST,
+ url,
+ requestHeaders);
+ op.execute();
+ return op;
+ }
+
+ public AbfsRestOperation breakLease(final String path) throws AzureBlobFileSystemException {
+ final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ACTION, BREAK_LEASE_ACTION));
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_BREAK_PERIOD, DEFAULT_LEASE_BREAK_PERIOD));
+
+ final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+
+ final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
+ final AbfsRestOperation op = new AbfsRestOperation(
+ AbfsRestOperationType.LeasePath,
+ this,
+ HTTP_METHOD_POST,
+ url,
+ requestHeaders);
+ op.execute();
+ return op;
+ }
+
public AbfsRestOperation renamePath(String source, final String destination, final String continuation)
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
@@ -416,6 +514,9 @@ public class AbfsClient implements Closeable {
// PUT and specify the real method in the X-Http-Method-Override header.
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
HTTP_METHOD_PATCH));
+ if (reqParams.getLeaseId() != null) {
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, reqParams.getLeaseId()));
+ }
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION);
@@ -492,13 +593,16 @@ public class AbfsClient implements Closeable {
}
public AbfsRestOperation flush(final String path, final long position, boolean retainUncommittedData,
- boolean isClose, final String cachedSasToken)
+ boolean isClose, final String cachedSasToken, final String leaseId)
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
// JDK7 does not support PATCH, so to workaround the issue we will use
// PUT and specify the real method in the X-Http-Method-Override header.
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
HTTP_METHOD_PATCH));
+ if (leaseId != null) {
+ requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId));
+ }
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, FLUSH_ACTION);
@@ -1003,4 +1107,21 @@ public class AbfsClient implements Closeable {
protected AbfsCounters getAbfsCounters() {
return abfsCounters;
}
+
/**
 * @return the configured size of the lease operations thread pool
 */
public int getNumLeaseThreads() {
  return abfsConfiguration.getNumLeaseThreads();
}
+
/**
 * Schedule a callable on the lease operations executor after the given delay.
 *
 * @param callable task to run
 * @param delay delay before execution
 * @param timeUnit unit of the delay
 * @param <V> result type of the callable
 * @return listenable future for the scheduled task
 */
public <V> ListenableScheduledFuture<V> schedule(Callable<V> callable, long delay,
    TimeUnit timeUnit) {
  return executorService.schedule(callable, delay, timeUnit);
}
+
/**
 * Run a task asynchronously on the lease operations executor.
 *
 * @param runnable task to run
 * @return listenable future for the submitted task
 */
public ListenableFuture<?> submit(Runnable runnable) {
  return executorService.submit(runnable);
}
+
/**
 * Attach a completion callback to the given future; the callback is executed
 * on the lease operations executor.
 *
 * @param future future to observe
 * @param callback invoked on success or failure of the future
 * @param <V> result type of the future
 */
public <V> void addCallback(ListenableFuture<V> future, FutureCallback<V> callback) {
  Futures.addCallback(future, callback, executorService);
}
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java
new file mode 100644
index 0000000..e15795e
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_LEASE_THREADS;
+
/**
 * ABFS error constants.
 *
 * Message prefixes raised by ABFS lease handling, and substrings matched
 * against service error responses in lease-related code and tests.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public final class AbfsErrors {
  /** Write attempted on a stream whose lease has already been freed. */
  public static final String ERR_WRITE_WITHOUT_LEASE = "Attempted to write to file without lease";
  /** Service reports the supplied lease has expired. */
  public static final String ERR_LEASE_EXPIRED = "A lease ID was specified, but the lease for the"
      + " resource has expired";
  /** Service reports the resource is leased and no lease ID was sent. */
  public static final String ERR_NO_LEASE_ID_SPECIFIED = "There is currently a lease on the "
      + "resource and no lease ID was specified in the request";
  /** Concurrent create detected on an infinite-lease (single-writer) path. */
  public static final String ERR_PARALLEL_ACCESS_DETECTED = "Parallel access to the create path "
      + "detected. Failing request to honor single writer semantics";
  /** Prefix used by LeaseException when lease acquisition fails. */
  public static final String ERR_ACQUIRING_LEASE = "Unable to acquire lease";
  /** Service reports a lease already exists on the resource. */
  public static final String ERR_LEASE_ALREADY_PRESENT = "There is already a lease present";
  /** Service reports no lease currently exists on the resource. */
  public static final String ERR_LEASE_NOT_PRESENT = "There is currently no lease on the resource";
  /** Lease operation sent without the required lease ID. */
  public static final String ERR_LEASE_ID_NOT_PRESENT = "The lease ID is not present with the "
      + "specified lease operation";
  /** Supplied lease ID does not match the resource's current lease. */
  public static final String ERR_LEASE_DID_NOT_MATCH = "The lease ID specified did not match the "
      + "lease ID for the resource with the specified lease operation";
  /** Lease was explicitly broken and cannot be renewed. */
  public static final String ERR_LEASE_BROKEN = "The lease ID matched, but the lease has been "
      + "broken explicitly and cannot be renewed";
  /** A lease operation was started while a prior one is still in flight. */
  public static final String ERR_LEASE_FUTURE_EXISTS = "There is already an existing lease "
      + "operation";
  /** Lease requested but fs.azure.lease.threads is 0 (no worker threads). */
  public static final String ERR_NO_LEASE_THREADS = "Lease desired but no lease threads "
      + "configured, set " + FS_AZURE_LEASE_THREADS;

  // Utility class: not instantiable.
  private AbfsErrors() {}
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLease.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLease.java
new file mode 100644
index 0000000..97a8b02
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLease.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.FutureCallback;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableScheduledFuture;
+import org.apache.hadoop.thirdparty.org.checkerframework.checker.nullness.qual.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.INFINITE_LEASE_DURATION;
+import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_ACQUIRING_LEASE;
+import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_LEASE_FUTURE_EXISTS;
+import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_NO_LEASE_THREADS;
+
+/**
+ * AbfsLease manages an Azure blob lease. It acquires an infinite lease on instantiation and
+ * releases the lease when free() is called. Use it to prevent writes to the blob by other
+ * processes that don't have the lease.
+ *
+ * Creating a new Lease object blocks the caller until the Azure blob lease is acquired. It will
+ * retry a fixed number of times before failing if there is a problem acquiring the lease.
+ *
+ * Call free() to release the Lease. If the holder process dies, AzureBlobFileSystem breakLease
+ * will need to be called before another client will be able to write to the file.
+ */
+public final class AbfsLease {
+ private static final Logger LOG = LoggerFactory.getLogger(AbfsLease.class);
+
+ // Number of retries for acquiring lease
+ static final int DEFAULT_LEASE_ACQUIRE_MAX_RETRIES = 7;
+ // Retry interval for acquiring lease in secs
+ static final int DEFAULT_LEASE_ACQUIRE_RETRY_INTERVAL = 10;
+
+ private final AbfsClient client;
+ private final String path;
+
+ // Lease status variables
+ private volatile boolean leaseFreed;
+ private volatile String leaseID = null;
+ private volatile Throwable exception = null;
+ private volatile int acquireRetryCount = 0;
+ private volatile ListenableScheduledFuture<AbfsRestOperation> future = null;
+
+ public static class LeaseException extends AzureBlobFileSystemException {
+ public LeaseException(Throwable t) {
+ super(ERR_ACQUIRING_LEASE + ": " + t, t);
+ }
+
+ public LeaseException(String s) {
+ super(s);
+ }
+ }
+
+ public AbfsLease(AbfsClient client, String path) throws AzureBlobFileSystemException {
+ this(client, path, DEFAULT_LEASE_ACQUIRE_MAX_RETRIES, DEFAULT_LEASE_ACQUIRE_RETRY_INTERVAL);
+ }
+
+ @VisibleForTesting
+ public AbfsLease(AbfsClient client, String path, int acquireMaxRetries,
+ int acquireRetryInterval) throws AzureBlobFileSystemException {
+ this.leaseFreed = false;
+ this.client = client;
+ this.path = path;
+
+ if (client.getNumLeaseThreads() < 1) {
+ throw new LeaseException(ERR_NO_LEASE_THREADS);
+ }
+
+ // Try to get the lease a specified number of times, else throw an error
+ RetryPolicy retryPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
+ acquireMaxRetries, acquireRetryInterval, TimeUnit.SECONDS);
+ acquireLease(retryPolicy, 0, acquireRetryInterval, 0);
+
+ while (leaseID == null && exception == null) {
+ try {
+ future.get();
+ } catch (Exception e) {
+ LOG.debug("Got exception waiting for acquire lease future. Checking if lease ID or "
+ + "exception have been set", e);
+ }
+ }
+ if (exception != null) {
+ LOG.error("Failed to acquire lease on {}", path);
+ throw new LeaseException(exception);
+ }
+
+ LOG.debug("Acquired lease {} on {}", leaseID, path);
+ }
+
+ private void acquireLease(RetryPolicy retryPolicy, int numRetries, int retryInterval, long delay)
+ throws LeaseException {
+ LOG.debug("Attempting to acquire lease on {}, retry {}", path, numRetries);
+ if (future != null && !future.isDone()) {
+ throw new LeaseException(ERR_LEASE_FUTURE_EXISTS);
+ }
+ future = client.schedule(() -> client.acquireLease(path, INFINITE_LEASE_DURATION),
+ delay, TimeUnit.SECONDS);
+ client.addCallback(future, new FutureCallback<AbfsRestOperation>() {
+ @Override
+ public void onSuccess(@Nullable AbfsRestOperation op) {
+ leaseID = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_LEASE_ID);
+ LOG.debug("Acquired lease {} on {}", leaseID, path);
+ }
+
+ @Override
+ public void onFailure(Throwable throwable) {
+ try {
+ if (RetryPolicy.RetryAction.RetryDecision.RETRY
+ == retryPolicy.shouldRetry(null, numRetries, 0, true).action) {
+ LOG.debug("Failed to acquire lease on {}, retrying: {}", path, throwable);
+ acquireRetryCount++;
+ acquireLease(retryPolicy, numRetries + 1, retryInterval, retryInterval);
+ } else {
+ exception = throwable;
+ }
+ } catch (Exception e) {
+ exception = throwable;
+ }
+ }
+ });
+ }
+
+ /**
+ * Cancel future and free the lease. If an exception occurs while releasing the lease, the error
+ * will be logged. If the lease cannot be released, AzureBlobFileSystem breakLease will need to
+ * be called before another client will be able to write to the file.
+ */
+ public void free() {
+ if (leaseFreed) {
+ return;
+ }
+ try {
+ LOG.debug("Freeing lease: path {}, lease id {}", path, leaseID);
+ if (future != null && !future.isDone()) {
+ future.cancel(true);
+ }
+ client.releaseLease(path, leaseID);
+ } catch (IOException e) {
+ LOG.warn("Exception when trying to release lease {} on {}. Lease will need to be broken: {}",
+ leaseID, path, e.getMessage());
+ } finally {
+ // Even if releasing the lease fails (e.g. because the file was deleted),
+ // make sure to record that we freed the lease
+ leaseFreed = true;
+ LOG.debug("Freed lease {} on {}", leaseID, path);
+ }
+ }
+
+ public boolean isFreed() {
+ return leaseFreed;
+ }
+
+ public String getLeaseID() {
+ return leaseID;
+ }
+
+ @VisibleForTesting
+ public int getAcquireRetryCount() {
+ return acquireRetryCount;
+ }
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index 2d02019..80b35ee 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
@@ -53,6 +54,7 @@ import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
+import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_WRITE_WITHOUT_LEASE;
import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable;
import static org.apache.hadoop.io.IOUtils.wrapException;
import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.APPEND_MODE;
@@ -92,6 +94,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
// SAS tokens can be re-used until they expire
private CachedSASToken cachedSasToken;
+ private AbfsLease lease;
+ private String leaseId;
+
/**
* Queue storing buffers with the size of the Azure block ready for
* reuse. The pool allows reusing the blocks instead of allocating new
@@ -142,6 +147,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
}
this.maxRequestsThatCanBeQueued = abfsOutputStreamContext
.getMaxWriteRequestsToQueue();
+
+ this.lease = abfsOutputStreamContext.getLease();
+ this.leaseId = abfsOutputStreamContext.getLeaseId();
+
this.threadExecutor
= new ThreadPoolExecutor(maxConcurrentRequestCount,
maxConcurrentRequestCount,
@@ -203,6 +212,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
throw new IndexOutOfBoundsException();
}
+ if (hasLease() && isLeaseFreed()) {
+ throw new PathIOException(path, ERR_WRITE_WITHOUT_LEASE);
+ }
+
int currentOffset = off;
int writableBytes = bufferSize - bufferIndex;
int numberOfBytesToWrite = length;
@@ -306,6 +319,10 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
// See HADOOP-16785
throw wrapException(path, e.getMessage(), e);
} finally {
+ if (hasLease()) {
+ lease.free();
+ lease = null;
+ }
lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
buffer = null;
bufferIndex = 0;
@@ -372,7 +389,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
"writeCurrentBufferToService", "append")) {
AppendRequestParameters reqParams = new AppendRequestParameters(offset, 0,
- bytesLength, APPEND_MODE, true);
+ bytesLength, APPEND_MODE, true, leaseId);
AbfsRestOperation op = client.append(path, bytes, reqParams, cachedSasToken.get());
cachedSasToken.update(op.getSasToken());
if (outputStreamStatistics != null) {
@@ -448,7 +465,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
mode = FLUSH_MODE;
}
AppendRequestParameters reqParams = new AppendRequestParameters(
- offset, 0, bytesLength, mode, false);
+ offset, 0, bytesLength, mode, false, leaseId);
AbfsRestOperation op = client.append(path, bytes, reqParams,
cachedSasToken.get());
cachedSasToken.update(op.getSasToken());
@@ -517,7 +534,8 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
AbfsPerfTracker tracker = client.getAbfsPerfTracker();
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
"flushWrittenBytesToServiceInternal", "flush")) {
- AbfsRestOperation op = client.flush(path, offset, retainUncommitedData, isClose, cachedSasToken.get());
+ AbfsRestOperation op = client.flush(path, offset, retainUncommitedData, isClose,
+ cachedSasToken.get(), leaseId);
cachedSasToken.update(op.getSasToken());
perfInfo.registerResult(op.getResult()).registerSuccess(true);
} catch (AzureBlobFileSystemException ex) {
@@ -637,6 +655,19 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
return ioStatistics;
}
+ @VisibleForTesting
+ public boolean isLeaseFreed() {
+ if (lease == null) {
+ return true;
+ }
+ return lease.isFreed();
+ }
+
/**
 * @return true if an AbfsLease is attached to this output stream
 */
@VisibleForTesting
public boolean hasLease() {
  return lease != null;
}
+
/**
* Appending AbfsOutputStream statistics to base toString().
*
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
index 925cd4f..48f6f54 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
@@ -39,6 +39,8 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
private int maxWriteRequestsToQueue;
+ private AbfsLease lease;
+
public AbfsOutputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
super(sasTokenRenewPeriodForStreamsInSeconds);
}
@@ -94,6 +96,11 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
return this;
}
/**
 * Attach the lease (may be null when the path is not under an infinite-lease
 * directory) to be used by the output stream.
 *
 * @param lease lease to attach, or null
 * @return this context, for builder-style chaining
 */
public AbfsOutputStreamContext withLease(final AbfsLease lease) {
  this.lease = lease;
  return this;
}
+
public int getWriteBufferSize() {
return writeBufferSize;
}
@@ -125,4 +132,15 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
public boolean isEnableSmallWriteOptimization() {
return this.enableSmallWriteOptimization;
}
+
/**
 * @return the attached lease, or null when none was set
 */
public AbfsLease getLease() {
  return this.lease;
}
+
+ public String getLeaseId() {
+ if (this.lease == null) {
+ return null;
+ }
+ return this.lease.getLeaseID();
+ }
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
index 584b71f..b046cbc 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
@@ -131,6 +131,7 @@ public class AbfsRestOperation {
this.url = url;
this.requestHeaders = requestHeaders;
this.hasRequestBody = (AbfsHttpConstants.HTTP_METHOD_PUT.equals(method)
+ || AbfsHttpConstants.HTTP_METHOD_POST.equals(method)
|| AbfsHttpConstants.HTTP_METHOD_PATCH.equals(method));
this.sasToken = sasToken;
this.abfsCounters = client.getAbfsCounters();
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperationType.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperationType.java
index d303186..830297f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperationType.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperationType.java
@@ -40,5 +40,6 @@ public enum AbfsRestOperationType {
Flush,
ReadFile,
DeletePath,
- CheckAccess
+ CheckAccess,
+ LeasePath,
}
diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
index 33d4a0f..6be5952 100644
--- a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
+++ b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
@@ -887,6 +887,22 @@ enabled for your Azure Storage account."
The directories can be specified as comma separated values. By default the value
is "/hbase"
+### <a name="infiniteleaseoptions"></a> Infinite Lease Options
+`fs.azure.infinite-lease.directories`: Directories for infinite lease support
+can be specified comma separated in this config. By default, multiple
+clients will be able to write to the same file simultaneously. When writing
+to files contained within the directories specified in this config, the
+client will obtain a lease on the file that will prevent any other clients
+from writing to the file. When the output stream is closed, the lease will be
+released. To revoke a client's write access for a file, the
+AzureBlobFileSystem breakLease method may be called. If the client dies
+before the file can be closed and the lease released, breakLease will need to
+be called before another client will be able to write to the file.
+
+`fs.azure.lease.threads`: This is the size of the thread pool that will be
+used for lease operations for infinite lease directories. By default the value
+is 0, so it must be set to at least 1 to support infinite lease directories.
+
### <a name="perfoptions"></a> Perf Options
#### <a name="abfstracklatencyoptions"></a> 1. HTTP Request Tracking Options
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java
new file mode 100644
index 0000000..9857da8
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+import java.util.concurrent.RejectedExecutionException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.AbfsLease;
+import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
+
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_INFINITE_LEASE_KEY;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_LEASE_THREADS;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT;
+import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_ACQUIRING_LEASE;
+import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_LEASE_EXPIRED;
+import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_LEASE_NOT_PRESENT;
+import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_NO_LEASE_ID_SPECIFIED;
+import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_NO_LEASE_THREADS;
+import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_PARALLEL_ACCESS_DETECTED;
+
+/**
+ * Test lease operations.
+ */
+public class ITestAzureBlobFileSystemLease extends AbstractAbfsIntegrationTest {
+ private static final int TEST_EXECUTION_TIMEOUT = 30 * 1000;
+ private static final int LONG_TEST_EXECUTION_TIMEOUT = 90 * 1000;
+ private static final String TEST_FILE = "testfile";
+ private final boolean isHNSEnabled;
+
/**
 * Records whether the test account has hierarchical namespace (HNS) enabled,
 * which determines the error message expected on conflicting creates.
 */
public ITestAzureBlobFileSystemLease() throws Exception {
  super();

  this.isHNSEnabled = getConfiguration()
      .getBoolean(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT, false);
}
+
/**
 * Create a filesystem instance (FS cache disabled so the config takes effect)
 * with the given infinite-lease directory and lease thread pool size.
 */
private AzureBlobFileSystem getCustomFileSystem(Path infiniteLeaseDirs, int numLeaseThreads) throws Exception {
  Configuration conf = getRawConfiguration();
  conf.setBoolean(String.format("fs.%s.impl.disable.cache", getAbfsScheme()), true);
  conf.set(FS_AZURE_INFINITE_LEASE_KEY, infiniteLeaseDirs.toUri().getPath());
  conf.setInt(FS_AZURE_LEASE_THREADS, numLeaseThreads);
  return getFileSystem(conf);
}
+
/** Streams created outside infinite-lease directories must not hold a lease. */
@Test(timeout = TEST_EXECUTION_TIMEOUT)
public void testNoInfiniteLease() throws IOException {
  final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
  final AzureBlobFileSystem fs = getFileSystem();
  fs.mkdirs(testFilePath.getParent());
  try (FSDataOutputStream out = fs.create(testFilePath)) {
    Assert.assertFalse("Output stream should not have lease",
        ((AbfsOutputStream) out.getWrappedStream()).hasLease());
  }
  Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
}
+
/** Creating a file in an infinite-lease dir with zero lease threads must fail. */
@Test(timeout = TEST_EXECUTION_TIMEOUT)
public void testNoLeaseThreads() throws Exception {
  final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
  final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 0);
  fs.mkdirs(testFilePath.getParent());
  LambdaTestUtils.intercept(IOException.class, ERR_NO_LEASE_THREADS, () -> {
    try (FSDataOutputStream out = fs.create(testFilePath)) {
    }
    return "No failure when lease requested with 0 lease threads";
  });
}
+
/** A single writer acquires a lease on create and releases it on close. */
@Test(timeout = TEST_EXECUTION_TIMEOUT)
public void testOneWriter() throws Exception {
  final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
  final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
  fs.mkdirs(testFilePath.getParent());

  FSDataOutputStream out = fs.create(testFilePath);
  Assert.assertTrue("Output stream should have lease",
      ((AbfsOutputStream) out.getWrappedStream()).hasLease());
  out.close();
  Assert.assertFalse("Output stream should not have lease",
      ((AbfsOutputStream) out.getWrappedStream()).hasLease());
  Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
}
+
/** Infinite-lease config applies recursively to files in subdirectories. */
@Test(timeout = TEST_EXECUTION_TIMEOUT)
public void testSubDir() throws Exception {
  final Path testFilePath = new Path(new Path(path(methodName.getMethodName()), "subdir"),
      TEST_FILE);
  final AzureBlobFileSystem fs =
      getCustomFileSystem(testFilePath.getParent().getParent(), 1);
  fs.mkdirs(testFilePath.getParent().getParent());

  FSDataOutputStream out = fs.create(testFilePath);
  Assert.assertTrue("Output stream should have lease",
      ((AbfsOutputStream) out.getWrappedStream()).hasLease());
  out.close();
  Assert.assertFalse("Output stream should not have lease",
      ((AbfsOutputStream) out.getWrappedStream()).hasLease());
  Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
}
+
/**
 * A second create on an already-leased file must fail. The expected error text
 * differs between HNS and non-HNS accounts.
 */
@Test(timeout = TEST_EXECUTION_TIMEOUT)
public void testTwoCreate() throws Exception {
  final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
  final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
  fs.mkdirs(testFilePath.getParent());

  try (FSDataOutputStream out = fs.create(testFilePath)) {
    LambdaTestUtils.intercept(IOException.class, isHNSEnabled ? ERR_PARALLEL_ACCESS_DETECTED
        : ERR_NO_LEASE_ID_SPECIFIED, () -> {
      try (FSDataOutputStream out2 = fs.create(testFilePath)) {
      }
      return "Expected second create on infinite lease dir to fail";
    });
  }
  Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
}
+
/**
 * Open a create stream and a concurrent append stream on the same file. When
 * expectException is true (infinite-lease dir), the second writer must fail to
 * acquire the lease; otherwise both writers must succeed.
 */
private void twoWriters(AzureBlobFileSystem fs, Path testFilePath, boolean expectException) throws Exception {
  try (FSDataOutputStream out = fs.create(testFilePath)) {
    try (FSDataOutputStream out2 = fs.append(testFilePath)) {
      out2.writeInt(2);
      out2.hsync();
    } catch (IOException e) {
      if (expectException) {
        GenericTestUtils.assertExceptionContains(ERR_ACQUIRING_LEASE, e);
      } else {
        throw e;
      }
    }
    out.writeInt(1);
    out.hsync();
  }

  Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
}
+
+ // Without infinite-lease configuration, two concurrent writers
+ // (create + append) on the same file are both allowed.
+ @Test(timeout = TEST_EXECUTION_TIMEOUT)
+ public void testTwoWritersCreateAppendNoInfiniteLease() throws Exception {
+ final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
+ final AzureBlobFileSystem fs = getFileSystem();
+ fs.mkdirs(testFilePath.getParent());
+
+ twoWriters(fs, testFilePath, false);
+ }
+
+ // With infinite lease enabled on the parent dir, the second writer's append
+ // must fail to acquire the lease (expectException = true in twoWriters).
+ @Test(timeout = LONG_TEST_EXECUTION_TIMEOUT)
+ public void testTwoWritersCreateAppendWithInfiniteLeaseEnabled() throws Exception {
+ final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
+ final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
+ fs.mkdirs(testFilePath.getParent());
+
+ twoWriters(fs, testFilePath, true);
+ }
+
+ // Verifies the output stream holds a lease while open and that closing the
+ // stream releases it, leaving no leases tracked by the store.
+ @Test(timeout = TEST_EXECUTION_TIMEOUT)
+ public void testLeaseFreedOnClose() throws Exception {
+ final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
+ final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
+ fs.mkdirs(testFilePath.getParent());
+
+ FSDataOutputStream out;
+ out = fs.create(testFilePath);
+ out.write(0);
+ Assert.assertTrue("Output stream should have lease",
+ ((AbfsOutputStream) out.getWrappedStream()).hasLease());
+ out.close();
+ Assert.assertFalse("Output stream should not have lease after close",
+ ((AbfsOutputStream) out.getWrappedStream()).hasLease());
+ Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
+ }
+
+ // After the lease on a leased file is broken out-of-band via fs.breakLease(),
+ // further write/hsync and close on the original stream must fail with
+ // ERR_LEASE_EXPIRED, the stream's lease must end up freed, and a fresh
+ // append by a new writer must succeed.
+ @Test(timeout = TEST_EXECUTION_TIMEOUT)
+ public void testWriteAfterBreakLease() throws Exception {
+ final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
+ final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
+ fs.mkdirs(testFilePath.getParent());
+
+ FSDataOutputStream out;
+ out = fs.create(testFilePath);
+ out.write(0);
+ out.hsync();
+
+ fs.breakLease(testFilePath);
+
+ LambdaTestUtils.intercept(IOException.class, ERR_LEASE_EXPIRED, () -> {
+ out.write(1);
+ out.hsync();
+ return "Expected exception on write after lease break but got " + out;
+ });
+
+ LambdaTestUtils.intercept(IOException.class, ERR_LEASE_EXPIRED, () -> {
+ out.close();
+ return "Expected exception on close after lease break but got " + out;
+ });
+
+ // Even though close() threw, the stream must have released its lease state.
+ Assert.assertTrue("Output stream lease should be freed",
+ ((AbfsOutputStream) out.getWrappedStream()).isLeaseFreed());
+
+ // A new writer can now acquire the (broken) lease and append successfully.
+ try (FSDataOutputStream out2 = fs.append(testFilePath)) {
+ out2.write(2);
+ out2.hsync();
+ }
+
+ Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
+ }
+
+ // Variant of the break-lease test with unflushed data: close() after
+ // breakLease() must throw ERR_LEASE_EXPIRED, yet both the stream and the
+ // store must still report their leases as freed.
+ @Test(timeout = LONG_TEST_EXECUTION_TIMEOUT)
+ public void testLeaseFreedAfterBreak() throws Exception {
+ final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
+ final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
+ fs.mkdirs(testFilePath.getParent());
+
+ FSDataOutputStream out = fs.create(testFilePath);
+ out.write(0);
+
+ fs.breakLease(testFilePath);
+
+ LambdaTestUtils.intercept(IOException.class, ERR_LEASE_EXPIRED, () -> {
+ out.close();
+ return "Expected exception on close after lease break but got " + out;
+ });
+
+ Assert.assertTrue("Output stream lease should be freed",
+ ((AbfsOutputStream) out.getWrappedStream()).isLeaseFreed());
+
+ Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
+ }
+
+ // Both create() and a subsequent append() on a file in an infinite-lease dir
+ // must each hold a lease while open and release it on close, leaving the
+ // store with no leases after each writer finishes.
+ @Test(timeout = TEST_EXECUTION_TIMEOUT)
+ public void testInfiniteLease() throws Exception {
+ final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
+ final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
+ fs.mkdirs(testFilePath.getParent());
+
+ try (FSDataOutputStream out = fs.create(testFilePath)) {
+ Assert.assertTrue("Output stream should have lease",
+ ((AbfsOutputStream) out.getWrappedStream()).hasLease());
+ out.write(0);
+ }
+ Assert.assertTrue(fs.getAbfsStore().areLeasesFreed());
+
+ try (FSDataOutputStream out = fs.append(testFilePath)) {
+ Assert.assertTrue("Output stream should have lease",
+ ((AbfsOutputStream) out.getWrappedStream()).hasLease());
+ out.write(1);
+ }
+ Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
+ }
+
+ // Closing the filesystem while a leased stream is open must free all store
+ // leases; afterwards, closing the orphaned stream fails (message depends on
+ // HNS vs non-HNS) and any new append is rejected because the lease executor
+ // has been shut down (RejectedExecutionException).
+ @Test(timeout = TEST_EXECUTION_TIMEOUT)
+ public void testFileSystemClose() throws Exception {
+ final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
+ final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
+ fs.mkdirs(testFilePath.getParent());
+
+ FSDataOutputStream out = fs.create(testFilePath);
+ out.write(0);
+ Assert.assertFalse("Store leases should exist", fs.getAbfsStore().areLeasesFreed());
+ fs.close();
+ Assert.assertTrue("Store leases were not freed", fs.getAbfsStore().areLeasesFreed());
+
+ LambdaTestUtils.intercept(IOException.class, isHNSEnabled ? ERR_LEASE_NOT_PRESENT
+ : ERR_LEASE_EXPIRED, () -> {
+ out.close();
+ return "Expected exception on close after closed FS but got " + out;
+ });
+
+ LambdaTestUtils.intercept(RejectedExecutionException.class, () -> {
+ try (FSDataOutputStream out2 = fs.append(testFilePath)) {
+ }
+ return "Expected exception on new append after closed FS";
+ });
+ }
+
+ // Unit-level check of AbfsLease acquisition retries: a plain acquire succeeds
+ // with zero retries; two stubbed failures followed by a real call succeed with
+ // a retry count of 2; persistent failure (with maxRetries=5, retryInterval=1)
+ // ultimately surfaces as an AzureBlobFileSystemException.
+ @Test(timeout = TEST_EXECUTION_TIMEOUT)
+ public void testAcquireRetry() throws Exception {
+ final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE);
+ final AzureBlobFileSystem fs = getCustomFileSystem(testFilePath.getParent(), 1);
+ fs.mkdirs(testFilePath.getParent());
+ fs.createNewFile(testFilePath);
+
+ AbfsLease lease = new AbfsLease(fs.getAbfsClient(), testFilePath.toUri().getPath());
+ Assert.assertNotNull("Did not successfully lease file", lease.getLeaseID());
+ lease.free();
+ Assert.assertEquals("Unexpected acquire retry count", 0, lease.getAcquireRetryCount());
+
+ AbfsClient mockClient = spy(fs.getAbfsClient());
+
+ // First two acquire attempts fail, third delegates to the real client.
+ doThrow(new AbfsLease.LeaseException("failed to acquire 1"))
+ .doThrow(new AbfsLease.LeaseException("failed to acquire 2"))
+ .doCallRealMethod()
+ .when(mockClient).acquireLease(anyString(), anyInt());
+
+ lease = new AbfsLease(mockClient, testFilePath.toUri().getPath(), 5, 1);
+ Assert.assertNotNull("Acquire lease should have retried", lease.getLeaseID());
+ lease.free();
+ Assert.assertEquals("Unexpected acquire retry count", 2, lease.getAcquireRetryCount());
+
+ // Every attempt fails: constructing the lease must throw after retries are exhausted.
+ doThrow(new AbfsLease.LeaseException("failed to acquire"))
+ .when(mockClient).acquireLease(anyString(), anyInt());
+
+ LambdaTestUtils.intercept(AzureBlobFileSystemException.class, () -> {
+ new AbfsLease(mockClient, testFilePath.toUri().getPath(), 5, 1);
+ });
+ }
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/diagnostics/TestConfigurationValidators.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/diagnostics/TestConfigurationValidators.java
index f02eadc..6a02435 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/diagnostics/TestConfigurationValidators.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/diagnostics/TestConfigurationValidators.java
@@ -24,11 +24,14 @@ import org.junit.Test;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
import org.apache.hadoop.fs.azurebfs.utils.Base64;
-import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
-import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_BUFFER_SIZE;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_LEASE_DURATION;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE;
-
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.INFINITE_LEASE_DURATION;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_BUFFER_SIZE;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_LEASE_DURATION;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_LEASE_DURATION;
/**
* Test configuration validators.
@@ -59,6 +62,26 @@ public class TestConfigurationValidators extends Assert {
}
@Test
+ // Validator configured with an allowed outlier value (INFINITE_LEASE_DURATION, -1)
+ // outside the [MIN, MAX] range: the outlier passes, null yields the default,
+ // and the range bounds validate normally. (throwIfInvalid = false)
+ public void testIntegerWithOutlierConfigValidator() throws Exception {
+ IntegerConfigurationBasicValidator integerConfigurationValidator = new IntegerConfigurationBasicValidator(
+ INFINITE_LEASE_DURATION, MIN_LEASE_DURATION, MAX_LEASE_DURATION, DEFAULT_LEASE_DURATION, FAKE_KEY,
+ false);
+
+ assertEquals(INFINITE_LEASE_DURATION, (int) integerConfigurationValidator.validate("-1"));
+ assertEquals(DEFAULT_LEASE_DURATION, (int) integerConfigurationValidator.validate(null));
+ assertEquals(MIN_LEASE_DURATION, (int) integerConfigurationValidator.validate("15"));
+ assertEquals(MAX_LEASE_DURATION, (int) integerConfigurationValidator.validate("60"));
+ }
+
+ // With throwIfInvalid = true, a value below MIN that is not the outlier
+ // ("14" < MIN_LEASE_DURATION of 15) must raise InvalidConfigurationValueException.
+ @Test(expected = InvalidConfigurationValueException.class)
+ public void testIntegerWithOutlierConfigValidatorThrowsIfMissingValidValue() throws Exception {
+ IntegerConfigurationBasicValidator integerConfigurationValidator = new IntegerConfigurationBasicValidator(
+ INFINITE_LEASE_DURATION, MIN_LEASE_DURATION, MAX_LEASE_DURATION, DEFAULT_LEASE_DURATION, FAKE_KEY,
+ true);
+ integerConfigurationValidator.validate("14");
+ }
+
+ @Test
public void testLongConfigValidator() throws Exception {
LongConfigurationBasicValidator longConfigurationValidator = new LongConfigurationBasicValidator(
MIN_BUFFER_SIZE, MAX_BUFFER_SIZE, DEFAULT_WRITE_BUFFER_SIZE, FAKE_KEY, false);
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
index 1e6b8ef..f4243bc 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
import org.apache.hadoop.conf.Configuration;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.ArgumentMatchers.refEq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -86,7 +87,7 @@ public final class TestAbfsOutputStream {
AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
- when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
+ when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op);
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
@@ -104,9 +105,9 @@ public final class TestAbfsOutputStream {
out.hsync();
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
- 0, 0, WRITE_SIZE, APPEND_MODE, false);
+ 0, 0, WRITE_SIZE, APPEND_MODE, false, null);
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
- WRITE_SIZE, 0, 2 * WRITE_SIZE, APPEND_MODE, false);
+ WRITE_SIZE, 0, 2 * WRITE_SIZE, APPEND_MODE, false, null);
verify(client, times(1)).append(
eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
@@ -133,7 +134,7 @@ public final class TestAbfsOutputStream {
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
- when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
+ when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op);
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
@@ -146,9 +147,9 @@ public final class TestAbfsOutputStream {
out.close();
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
- 0, 0, BUFFER_SIZE, APPEND_MODE, false);
+ 0, 0, BUFFER_SIZE, APPEND_MODE, false, null);
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
- BUFFER_SIZE, 0, 5*WRITE_SIZE-BUFFER_SIZE, APPEND_MODE, false);
+ BUFFER_SIZE, 0, 5*WRITE_SIZE-BUFFER_SIZE, APPEND_MODE, false, null);
verify(client, times(1)).append(
eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
@@ -165,7 +166,7 @@ public final class TestAbfsOutputStream {
ArgumentCaptor<String> acFlushSASToken = ArgumentCaptor.forClass(String.class);
verify(client, times(1)).flush(acFlushPath.capture(), acFlushPosition.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(),
- acFlushSASToken.capture());
+ acFlushSASToken.capture(), isNull());
assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushPath.getAllValues());
assertThat(Arrays.asList(Long.valueOf(5*WRITE_SIZE))).describedAs("position").isEqualTo(acFlushPosition.getAllValues());
assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues());
@@ -189,7 +190,7 @@ public final class TestAbfsOutputStream {
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
- when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
+ when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op);
when(op.getSasToken()).thenReturn("testToken");
when(op.getResult()).thenReturn(httpOp);
@@ -204,9 +205,9 @@ public final class TestAbfsOutputStream {
out.close();
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
- 0, 0, BUFFER_SIZE, APPEND_MODE, false);
+ 0, 0, BUFFER_SIZE, APPEND_MODE, false, null);
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
- BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false);
+ BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null);
verify(client, times(1)).append(
eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
@@ -223,7 +224,7 @@ public final class TestAbfsOutputStream {
ArgumentCaptor<String> acFlushSASToken = ArgumentCaptor.forClass(String.class);
verify(client, times(1)).flush(acFlushPath.capture(), acFlushPosition.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(),
- acFlushSASToken.capture());
+ acFlushSASToken.capture(), isNull());
assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushPath.getAllValues());
assertThat(Arrays.asList(Long.valueOf(2*BUFFER_SIZE))).describedAs("position").isEqualTo(acFlushPosition.getAllValues());
assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues());
@@ -247,7 +248,7 @@ public final class TestAbfsOutputStream {
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
- when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
+ when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op);
when(op.getSasToken()).thenReturn("testToken");
when(op.getResult()).thenReturn(httpOp);
@@ -262,9 +263,9 @@ public final class TestAbfsOutputStream {
Thread.sleep(1000);
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
- 0, 0, BUFFER_SIZE, APPEND_MODE, false);
+ 0, 0, BUFFER_SIZE, APPEND_MODE, false, null);
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
- BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false);
+ BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null);
verify(client, times(1)).append(
eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
@@ -291,7 +292,7 @@ public final class TestAbfsOutputStream {
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
- when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
+ when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op);
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, true));
@@ -304,9 +305,9 @@ public final class TestAbfsOutputStream {
Thread.sleep(1000);
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
- 0, 0, BUFFER_SIZE, APPEND_MODE, true);
+ 0, 0, BUFFER_SIZE, APPEND_MODE, true, null);
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
- BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, true);
+ BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, true, null);
verify(client, times(1)).append(
eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
@@ -334,7 +335,7 @@ public final class TestAbfsOutputStream {
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
- when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
+ when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op);
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
@@ -347,9 +348,9 @@ public final class TestAbfsOutputStream {
out.hflush();
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
- 0, 0, BUFFER_SIZE, APPEND_MODE, false);
+ 0, 0, BUFFER_SIZE, APPEND_MODE, false, null);
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
- BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false);
+ BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null);
verify(client, times(1)).append(
eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
@@ -366,7 +367,7 @@ public final class TestAbfsOutputStream {
ArgumentCaptor<String> acFlushSASToken = ArgumentCaptor.forClass(String.class);
verify(client, times(1)).flush(acFlushPath.capture(), acFlushPosition.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(),
- acFlushSASToken.capture());
+ acFlushSASToken.capture(), isNull());
assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushPath.getAllValues());
assertThat(Arrays.asList(Long.valueOf(2*BUFFER_SIZE))).describedAs("position").isEqualTo(acFlushPosition.getAllValues());
assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues());
@@ -388,7 +389,7 @@ public final class TestAbfsOutputStream {
AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any())).thenReturn(op);
- when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any())).thenReturn(op);
+ when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull())).thenReturn(op);
AbfsOutputStream out = new AbfsOutputStream(client, null, PATH, 0,
populateAbfsOutputStreamContext(BUFFER_SIZE, true, false, false));
@@ -403,9 +404,9 @@ public final class TestAbfsOutputStream {
Thread.sleep(1000);
AppendRequestParameters firstReqParameters = new AppendRequestParameters(
- 0, 0, BUFFER_SIZE, APPEND_MODE, false);
+ 0, 0, BUFFER_SIZE, APPEND_MODE, false, null);
AppendRequestParameters secondReqParameters = new AppendRequestParameters(
- BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false);
+ BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null);
verify(client, times(1)).append(
eq(PATH), any(byte[].class), refEq(firstReqParameters), any());
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org