You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2021/04/27 10:16:39 UTC
[hadoop] branch trunk updated: HADOOP-17536. ABFS: Supporting
customer provided encryption key (#2707)
This is an automated email from the ASF dual-hosted git repository.
stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new f54e764 HADOOP-17536. ABFS: Supporting customer provided encryption key (#2707)
f54e764 is described below
commit f54e7646cf5fa4bd8498cab824e4693a5d1a9717
Author: bilaharith <52...@users.noreply.github.com>
AuthorDate: Tue Apr 27 15:45:52 2021 +0530
HADOOP-17536. ABFS: Supporting customer provided encryption key (#2707)
Contributed by bilahari t h
---
.../hadoop/fs/azurebfs/AbfsConfiguration.java | 5 +
.../fs/azurebfs/constants/ConfigurationKeys.java | 2 +
.../constants/FileSystemConfigurations.java | 2 +
.../constants/HttpHeaderConfigurations.java | 5 +
.../hadoop/fs/azurebfs/services/AbfsClient.java | 66 +-
.../fs/azurebfs/ITestCustomerProvidedKey.java | 936 +++++++++++++++++++++
.../azurebfs/constants/TestConfigurationKeys.java | 4 +
.../fs/azurebfs/services/TestAbfsClient.java | 6 +-
8 files changed, 1019 insertions(+), 7 deletions(-)
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 0a8224a..1c4a09b 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -948,6 +948,11 @@ public class AbfsConfiguration{
return this.enableAbfsListIterator;
}
+ public String getClientProvidedEncryptionKey() {
+ String accSpecEncKey = accountConf(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY);
+ return rawConfig.get(accSpecEncKey, null);
+ }
+
@VisibleForTesting
void setReadBufferSize(int bufferSize) {
this.readBufferSize = bufferSize;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 4fe1d1c..2dbb2b9 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -143,6 +143,8 @@ public final class ConfigurationKeys {
public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script";
/** Setting this true will make the driver use it's own RemoteIterator implementation */
public static final String FS_AZURE_ENABLE_ABFS_LIST_ITERATOR = "fs.azure.enable.abfslistiterator";
+ /** Server side encryption key */
+ public static final String FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY = "fs.azure.client-provided-encryption-key";
/** End point of ABFS account: {@value}. */
public static final String AZURE_ABFS_ENDPOINT = "fs.azure.abfs.endpoint";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index 040b18a..dc4caa9 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -68,6 +68,8 @@ public final class FileSystemConfigurations {
public static final String AZURE_BLOCK_LOCATION_HOST_DEFAULT = "localhost";
public static final int DEFAULT_AZURE_LIST_MAX_RESULTS = 5000;
+ public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM = "AES256";
+
public static final int MAX_CONCURRENT_READ_THREADS = 12;
public static final int MAX_CONCURRENT_WRITE_THREADS = 8;
public static final boolean DEFAULT_READ_TOLERATE_CONCURRENT_APPEND = false;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java
index 2325538..d4065ac 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java
@@ -60,6 +60,11 @@ public final class HttpHeaderConfigurations {
public static final String X_MS_UMASK = "x-ms-umask";
public static final String X_MS_NAMESPACE_ENABLED = "x-ms-namespace-enabled";
public static final String X_MS_ABFS_CLIENT_LATENCY = "x-ms-abfs-client-latency";
+ public static final String X_MS_ENCRYPTION_KEY = "x-ms-encryption-key";
+ public static final String X_MS_ENCRYPTION_KEY_SHA256 = "x-ms-encryption-key-sha256";
+ public static final String X_MS_ENCRYPTION_ALGORITHM = "x-ms-encryption-algorithm";
+ public static final String X_MS_REQUEST_SERVER_ENCRYPTED = "x-ms-request-server-encrypted";
+ public static final String X_MS_SERVER_ENCRYPTED = "x-ms-server-encrypted";
public static final String X_MS_LEASE_ACTION = "x-ms-lease-action";
public static final String X_MS_LEASE_DURATION = "x-ms-lease-duration";
public static final String X_MS_LEASE_ID = "x-ms-lease-id";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index 7c8a211..c5c218d 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -25,8 +25,12 @@ import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
import java.time.Instant;
import java.util.ArrayList;
+import java.util.Base64;
import java.util.List;
import java.util.Locale;
import java.util.UUID;
@@ -65,6 +69,7 @@ import org.apache.hadoop.util.concurrent.HadoopExecutors;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_DELETE_CONSIDERED_IDEMPOTENT;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SERVER_SIDE_ENCRYPTION_ALGORITHM;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*;
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*;
@@ -74,6 +79,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*;
*/
public class AbfsClient implements Closeable {
public static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class);
+
private final URL baseUrl;
private final SharedKeyCredentials sharedKeyCredentials;
private final String xMsVersion = "2019-12-12";
@@ -82,6 +88,8 @@ public class AbfsClient implements Closeable {
private final AbfsConfiguration abfsConfiguration;
private final String userAgent;
private final AbfsPerfTracker abfsPerfTracker;
+ private final String clientProvidedEncryptionKey;
+ private final String clientProvidedEncryptionKeySHA;
private final String accountName;
private final AuthType authType;
@@ -93,7 +101,8 @@ public class AbfsClient implements Closeable {
private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
- final AbfsClientContext abfsClientContext) {
+ final AbfsClientContext abfsClientContext)
+ throws IOException {
this.baseUrl = baseUrl;
this.sharedKeyCredentials = sharedKeyCredentials;
String baseUrlString = baseUrl.toString();
@@ -103,6 +112,17 @@ public class AbfsClient implements Closeable {
this.accountName = abfsConfiguration.getAccountName().substring(0, abfsConfiguration.getAccountName().indexOf(AbfsHttpConstants.DOT));
this.authType = abfsConfiguration.getAuthType(accountName);
+ String encryptionKey = this.abfsConfiguration
+ .getClientProvidedEncryptionKey();
+ if (encryptionKey != null) {
+ this.clientProvidedEncryptionKey = getBase64EncodedString(encryptionKey);
+ this.clientProvidedEncryptionKeySHA = getBase64EncodedString(
+ getSHA256Hash(encryptionKey));
+ } else {
+ this.clientProvidedEncryptionKey = null;
+ this.clientProvidedEncryptionKeySHA = null;
+ }
+
String sslProviderName = null;
if (this.baseUrl.toString().startsWith(HTTPS_SCHEME)) {
@@ -131,7 +151,8 @@ public class AbfsClient implements Closeable {
public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final AccessTokenProvider tokenProvider,
- final AbfsClientContext abfsClientContext) {
+ final AbfsClientContext abfsClientContext)
+ throws IOException {
this(baseUrl, sharedKeyCredentials, abfsConfiguration, abfsClientContext);
this.tokenProvider = tokenProvider;
}
@@ -139,11 +160,29 @@ public class AbfsClient implements Closeable {
public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final SASTokenProvider sasTokenProvider,
- final AbfsClientContext abfsClientContext) {
+ final AbfsClientContext abfsClientContext)
+ throws IOException {
this(baseUrl, sharedKeyCredentials, abfsConfiguration, abfsClientContext);
this.sasTokenProvider = sasTokenProvider;
}
+ private byte[] getSHA256Hash(String key) throws IOException {
+ try {
+ final MessageDigest digester = MessageDigest.getInstance("SHA-256");
+ return digester.digest(key.getBytes(StandardCharsets.UTF_8));
+ } catch (NoSuchAlgorithmException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private String getBase64EncodedString(String key) {
+ return getBase64EncodedString(key.getBytes(StandardCharsets.UTF_8));
+ }
+
+ private String getBase64EncodedString(byte[] bytes) {
+ return Base64.getEncoder().encodeToString(bytes);
+ }
+
@Override
public void close() throws IOException {
if (tokenProvider instanceof Closeable) {
@@ -180,6 +219,18 @@ public class AbfsClient implements Closeable {
return requestHeaders;
}
+ private void addCustomerProvidedKeyHeaders(
+ final List<AbfsHttpHeader> requestHeaders) {
+ if (clientProvidedEncryptionKey != null) {
+ requestHeaders.add(
+ new AbfsHttpHeader(X_MS_ENCRYPTION_KEY, clientProvidedEncryptionKey));
+ requestHeaders.add(new AbfsHttpHeader(X_MS_ENCRYPTION_KEY_SHA256,
+ clientProvidedEncryptionKeySHA));
+ requestHeaders.add(new AbfsHttpHeader(X_MS_ENCRYPTION_ALGORITHM,
+ SERVER_SIDE_ENCRYPTION_ALGORITHM));
+ }
+ }
+
AbfsUriQueryBuilder createDefaultUriQueryBuilder() {
final AbfsUriQueryBuilder abfsUriQueryBuilder = new AbfsUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_TIMEOUT, DEFAULT_TIMEOUT);
@@ -289,6 +340,9 @@ public class AbfsClient implements Closeable {
final String permission, final String umask,
final boolean isAppendBlob, final String eTag) throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ if (isFile) {
+ addCustomerProvidedKeyHeaders(requestHeaders);
+ }
if (!overwrite) {
requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, AbfsHttpConstants.STAR));
}
@@ -510,6 +564,7 @@ public class AbfsClient implements Closeable {
AppendRequestParameters reqParams, final String cachedSasToken)
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ addCustomerProvidedKeyHeaders(requestHeaders);
// JDK7 does not support PATCH, so to workaround the issue we will use
// PUT and specify the real method in the X-Http-Method-Override header.
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
@@ -596,6 +651,7 @@ public class AbfsClient implements Closeable {
boolean isClose, final String cachedSasToken, final String leaseId)
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ addCustomerProvidedKeyHeaders(requestHeaders);
// JDK7 does not support PATCH, so to workaround the issue we will use
// PUT and specify the real method in the X-Http-Method-Override header.
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
@@ -627,6 +683,7 @@ public class AbfsClient implements Closeable {
public AbfsRestOperation setPathProperties(final String path, final String properties)
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ addCustomerProvidedKeyHeaders(requestHeaders);
// JDK7 does not support PATCH, so to workaround the issue we will use
// PUT and specify the real method in the X-Http-Method-Override header.
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
@@ -660,6 +717,8 @@ public class AbfsClient implements Closeable {
// only traversal (execute) permission is required.
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.GET_STATUS);
operation = SASTokenProvider.GET_STATUS_OPERATION;
+ } else {
+ addCustomerProvidedKeyHeaders(requestHeaders);
}
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN, String.valueOf(abfsConfiguration.isUpnUsed()));
appendSASTokenToQuery(path, operation, abfsUriQueryBuilder);
@@ -678,6 +737,7 @@ public class AbfsClient implements Closeable {
public AbfsRestOperation read(final String path, final long position, final byte[] buffer, final int bufferOffset,
final int bufferLength, final String eTag, String cachedSasToken) throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+ addCustomerProvidedKeyHeaders(requestHeaders);
requestHeaders.add(new AbfsHttpHeader(RANGE,
String.format("bytes=%d-%d", position, position + bufferLength - 1)));
requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag));
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestCustomerProvidedKey.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestCustomerProvidedKey.java
new file mode 100644
index 0000000..9229905
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestCustomerProvidedKey.java
@@ -0,0 +1,936 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 ("License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.EnumSet;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
+import org.assertj.core.api.Assertions;
+import org.junit.Assume;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.XAttrSetFlag;
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode;
+import org.apache.hadoop.fs.azurebfs.services.AbfsAclHelper;
+import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.AbfsHttpHeader;
+import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
+import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
+import org.apache.hadoop.fs.azurebfs.services.AuthType;
+import org.apache.hadoop.fs.azurebfs.utils.Base64;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.test.LambdaTestUtils;
+
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_ALGORITHM;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_KEY;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_KEY_SHA256;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_REQUEST_SERVER_ENCRYPTED;
+import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_SERVER_ENCRYPTED;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ABFS_ACCOUNT_NAME;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ACCOUNT_KEY;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_CPK_ENABLED;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_CPK_ENABLED_SECONDARY_ACCOUNT;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_CPK_ENABLED_SECONDARY_ACCOUNT_KEY;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_CONFIGURATION_FILE_NAME;
+import static org.apache.hadoop.fs.azurebfs.utils.AclTestHelpers.aclEntry;
+import static org.apache.hadoop.fs.permission.AclEntryScope.ACCESS;
+import static org.apache.hadoop.fs.permission.AclEntryType.USER;
+import static org.apache.hadoop.fs.permission.FsAction.ALL;
+
+public class ITestCustomerProvidedKey extends AbstractAbfsIntegrationTest {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(ITestCustomerProvidedKey.class);
+
+ private static final String XMS_PROPERTIES_ENCODING = "ISO-8859-1";
+ private static final int INT_512 = 512;
+ private static final int INT_50 = 50;
+ private static final int ENCRYPTION_KEY_LEN = 32;
+ private static final int FILE_SIZE = 10 * ONE_MB;
+ private static final int FILE_SIZE_FOR_COPY_BETWEEN_ACCOUNTS = 24 * ONE_MB;
+
+ public ITestCustomerProvidedKey() throws Exception {
+ boolean isCPKTestsEnabled = getConfiguration()
+ .getBoolean(FS_AZURE_TEST_CPK_ENABLED, false);
+ Assume.assumeTrue(isCPKTestsEnabled);
+ }
+
+ @Test
+ public void testReadWithCPK() throws Exception {
+ final AzureBlobFileSystem fs = getAbfs(true);
+ String fileName = "/" + methodName.getMethodName();
+ createFileAndGetContent(fs, fileName, FILE_SIZE);
+
+ AbfsClient abfsClient = fs.getAbfsClient();
+ int length = FILE_SIZE;
+ byte[] buffer = new byte[length];
+ final AbfsRestOperation op = abfsClient.getPathStatus(fileName, false);
+ final String eTag = op.getResult()
+ .getResponseHeader(HttpHeaderConfigurations.ETAG);
+ AbfsRestOperation abfsRestOperation = abfsClient
+ .read(fileName, 0, buffer, 0, length, eTag, null);
+ assertCPKHeaders(abfsRestOperation, true);
+ assertResponseHeader(abfsRestOperation, true, X_MS_ENCRYPTION_KEY_SHA256,
+ getCPKSha(fs));
+ assertResponseHeader(abfsRestOperation, true, X_MS_SERVER_ENCRYPTED,
+ "true");
+ assertResponseHeader(abfsRestOperation, false,
+ X_MS_REQUEST_SERVER_ENCRYPTED, "");
+
+ // Trying to read with different CPK headers
+ Configuration conf = fs.getConf();
+ String accountName = conf.get(FS_AZURE_ABFS_ACCOUNT_NAME);
+ conf.set(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName,
+ "different-1234567890123456789012");
+ try (AzureBlobFileSystem fs2 = (AzureBlobFileSystem) FileSystem.newInstance(conf);
+ FSDataInputStream iStream = fs2.open(new Path(fileName))) {
+ int len = 8 * ONE_MB;
+ byte[] b = new byte[len];
+ LambdaTestUtils.intercept(IOException.class, () -> {
+ iStream.read(b, 0, len);
+ });
+ }
+
+ // Trying to read with no CPK headers
+ conf.unset(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName);
+ try (AzureBlobFileSystem fs3 = (AzureBlobFileSystem) FileSystem
+ .get(conf); FSDataInputStream iStream = fs3.open(new Path(fileName))) {
+ int len = 8 * ONE_MB;
+ byte[] b = new byte[len];
+ LambdaTestUtils.intercept(IOException.class, () -> {
+ iStream.read(b, 0, len);
+ });
+ }
+ }
+
+ @Test
+ public void testReadWithoutCPK() throws Exception {
+ final AzureBlobFileSystem fs = getAbfs(false);
+ String fileName = "/" + methodName.getMethodName();
+ createFileAndGetContent(fs, fileName, FILE_SIZE);
+
+ AbfsClient abfsClient = fs.getAbfsClient();
+ int length = INT_512;
+ byte[] buffer = new byte[length * 4];
+ final AbfsRestOperation op = abfsClient.getPathStatus(fileName, false);
+ final String eTag = op.getResult()
+ .getResponseHeader(HttpHeaderConfigurations.ETAG);
+ AbfsRestOperation abfsRestOperation = abfsClient
+ .read(fileName, 0, buffer, 0, length, eTag, null);
+ assertCPKHeaders(abfsRestOperation, false);
+ assertResponseHeader(abfsRestOperation, false, X_MS_ENCRYPTION_KEY_SHA256,
+ getCPKSha(fs));
+ assertResponseHeader(abfsRestOperation, true, X_MS_SERVER_ENCRYPTED,
+ "true");
+ assertResponseHeader(abfsRestOperation, false,
+ X_MS_REQUEST_SERVER_ENCRYPTED, "");
+
+ // Trying to read with CPK headers
+ Configuration conf = fs.getConf();
+ String accountName = conf.get(FS_AZURE_ABFS_ACCOUNT_NAME);
+ conf.set(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName,
+ "12345678901234567890123456789012");
+
+ try (AzureBlobFileSystem fs2 = (AzureBlobFileSystem) FileSystem.newInstance(conf);
+ AbfsClient abfsClient2 = fs2.getAbfsClient()) {
+ LambdaTestUtils.intercept(IOException.class, () -> {
+ abfsClient2.read(fileName, 0, buffer, 0, length, eTag, null);
+ });
+ }
+ }
+
+ @Test
+ public void testAppendWithCPK() throws Exception {
+ final AzureBlobFileSystem fs = getAbfs(true);
+ final String fileName = "/" + methodName.getMethodName();
+ createFileAndGetContent(fs, fileName, FILE_SIZE);
+
+ // Trying to append with correct CPK headers
+ AppendRequestParameters appendRequestParameters =
+ new AppendRequestParameters(
+ 0, 0, 5, Mode.APPEND_MODE, false, null);
+ byte[] buffer = getRandomBytesArray(5);
+ AbfsClient abfsClient = fs.getAbfsClient();
+ AbfsRestOperation abfsRestOperation = abfsClient
+ .append(fileName, buffer, appendRequestParameters, null);
+ assertCPKHeaders(abfsRestOperation, true);
+ assertResponseHeader(abfsRestOperation, true, X_MS_ENCRYPTION_KEY_SHA256,
+ getCPKSha(fs));
+ assertResponseHeader(abfsRestOperation, false, X_MS_SERVER_ENCRYPTED, "");
+ assertResponseHeader(abfsRestOperation, true, X_MS_REQUEST_SERVER_ENCRYPTED,
+ "true");
+
+ // Trying to append with different CPK headers
+ Configuration conf = fs.getConf();
+ String accountName = conf.get(FS_AZURE_ABFS_ACCOUNT_NAME);
+ conf.set(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName,
+ "different-1234567890123456789012");
+ try (AzureBlobFileSystem fs2 = (AzureBlobFileSystem) FileSystem.newInstance(conf);
+ AbfsClient abfsClient2 = fs2.getAbfsClient()) {
+ LambdaTestUtils.intercept(IOException.class, () -> {
+ abfsClient2.append(fileName, buffer, appendRequestParameters, null);
+ });
+ }
+
+ // Trying to append with no CPK headers
+ conf.unset(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName);
+ try (AzureBlobFileSystem fs3 = (AzureBlobFileSystem) FileSystem
+ .get(conf); AbfsClient abfsClient3 = fs3.getAbfsClient()) {
+ LambdaTestUtils.intercept(IOException.class, () -> {
+ abfsClient3.append(fileName, buffer, appendRequestParameters, null);
+ });
+ }
+ }
+
+ @Test
+ public void testAppendWithoutCPK() throws Exception {
+ final AzureBlobFileSystem fs = getAbfs(false);
+ final String fileName = "/" + methodName.getMethodName();
+ createFileAndGetContent(fs, fileName, FILE_SIZE);
+
+ // Trying to append without CPK headers
+ AppendRequestParameters appendRequestParameters =
+ new AppendRequestParameters(
+ 0, 0, 5, Mode.APPEND_MODE, false, null);
+ byte[] buffer = getRandomBytesArray(5);
+ AbfsClient abfsClient = fs.getAbfsClient();
+ AbfsRestOperation abfsRestOperation = abfsClient
+ .append(fileName, buffer, appendRequestParameters, null);
+ assertCPKHeaders(abfsRestOperation, false);
+ assertResponseHeader(abfsRestOperation, false, X_MS_ENCRYPTION_KEY_SHA256,
+ "");
+ assertResponseHeader(abfsRestOperation, false, X_MS_SERVER_ENCRYPTED, "");
+ assertResponseHeader(abfsRestOperation, true, X_MS_REQUEST_SERVER_ENCRYPTED,
+ "true");
+
+ // Trying to append with CPK headers
+ Configuration conf = fs.getConf();
+ String accountName = conf.get(FS_AZURE_ABFS_ACCOUNT_NAME);
+ conf.set(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName,
+ "12345678901234567890123456789012");
+ try (AzureBlobFileSystem fs2 = (AzureBlobFileSystem) FileSystem.newInstance(conf);
+ AbfsClient abfsClient2 = fs2.getAbfsClient()) {
+ LambdaTestUtils.intercept(IOException.class, () -> {
+ abfsClient2.append(fileName, buffer, appendRequestParameters, null);
+ });
+ }
+ }
+
+ @Test
+ public void testSetGetXAttr() throws Exception {
+ final AzureBlobFileSystem fs = getAbfs(true);
+ String fileName = methodName.getMethodName();
+ createFileAndGetContent(fs, fileName, FILE_SIZE);
+
+ String valSent = "testValue";
+ String attrName = "testXAttr";
+
+ // set get and verify
+ fs.setXAttr(new Path(fileName), attrName,
+ valSent.getBytes(StandardCharsets.UTF_8),
+ EnumSet.of(XAttrSetFlag.CREATE));
+ byte[] valBytes = fs.getXAttr(new Path(fileName), attrName);
+ String valRecieved = new String(valBytes);
+ assertEquals(valSent, valRecieved);
+
+ // set new value get and verify
+ valSent = "new value";
+ fs.setXAttr(new Path(fileName), attrName,
+ valSent.getBytes(StandardCharsets.UTF_8),
+ EnumSet.of(XAttrSetFlag.REPLACE));
+ valBytes = fs.getXAttr(new Path(fileName), attrName);
+ valRecieved = new String(valBytes);
+ assertEquals(valSent, valRecieved);
+
+ // Read without CPK header
+ LambdaTestUtils.intercept(IOException.class, () -> {
+ getAbfs(false).getXAttr(new Path(fileName), attrName);
+ });
+
+ // Wrong CPK
+ LambdaTestUtils.intercept(IOException.class, () -> {
+ getSameFSWithWrongCPK(fs).getXAttr(new Path(fileName), attrName);
+ });
+ }
+
+ @Test
+ public void testCopyBetweenAccounts() throws Exception {
+ String accountName = getRawConfiguration()
+ .get(FS_AZURE_TEST_CPK_ENABLED_SECONDARY_ACCOUNT);
+ String accountKey = getRawConfiguration()
+ .get(FS_AZURE_TEST_CPK_ENABLED_SECONDARY_ACCOUNT_KEY);
+ Assume.assumeTrue(accountName != null && !accountName.isEmpty());
+ Assume.assumeTrue(accountKey != null && !accountKey.isEmpty());
+ String fileSystemName = "cpkfs";
+
+ // Create fs1 and a file with CPK
+ AzureBlobFileSystem fs1 = getAbfs(true);
+ int fileSize = FILE_SIZE_FOR_COPY_BETWEEN_ACCOUNTS;
+ byte[] fileContent = getRandomBytesArray(fileSize);
+ Path testFilePath = createFileWithContent(fs1, "fs1-file.txt", fileContent);
+
+ // Create fs2 with different CPK
+ Configuration conf = new Configuration();
+ conf.addResource(TEST_CONFIGURATION_FILE_NAME);
+ conf.setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, true);
+ conf.unset(FS_AZURE_ABFS_ACCOUNT_NAME);
+ conf.set(FS_AZURE_ABFS_ACCOUNT_NAME, accountName);
+ conf.set(FS_AZURE_ACCOUNT_KEY + "." + accountName, accountKey);
+ conf.set(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName,
+ "123456789012345678901234567890ab");
+ conf.set("fs.defaultFS", "abfs://" + fileSystemName + "@" + accountName);
+ AzureBlobFileSystem fs2 = (AzureBlobFileSystem) FileSystem.newInstance(conf);
+
+ // Read from fs1 and write to fs2, fs1 and fs2 are having different CPK
+ Path fs2DestFilePath = new Path("fs2-dest-file.txt");
+ FSDataOutputStream ops = fs2.create(fs2DestFilePath);
+ try (FSDataInputStream iStream = fs1.open(testFilePath)) {
+ long totalBytesRead = 0;
+ do {
+ int length = 8 * ONE_MB;
+ byte[] buffer = new byte[length];
+ int bytesRead = iStream.read(buffer, 0, length);
+ totalBytesRead += bytesRead;
+ ops.write(buffer);
+ } while (totalBytesRead < fileContent.length);
+ ops.close();
+ }
+
+ // Trying to read fs2DestFilePath with different CPK headers
+ conf.unset(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName);
+ conf.set(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName,
+ "different-1234567890123456789012");
+ try (AzureBlobFileSystem fs3 = (AzureBlobFileSystem) FileSystem
+ .get(conf); FSDataInputStream iStream = fs3.open(fs2DestFilePath)) {
+ int length = 8 * ONE_MB;
+ byte[] buffer = new byte[length];
+ LambdaTestUtils.intercept(IOException.class, () -> {
+ iStream.read(buffer, 0, length);
+ });
+ }
+
+ // Trying to read fs2DestFilePath with no CPK headers
+ conf.unset(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName);
+ try (AzureBlobFileSystem fs4 = (AzureBlobFileSystem) FileSystem
+ .get(conf); FSDataInputStream iStream = fs4.open(fs2DestFilePath)) {
+ int length = 8 * ONE_MB;
+ byte[] buffer = new byte[length];
+ LambdaTestUtils.intercept(IOException.class, () -> {
+ iStream.read(buffer, 0, length);
+ });
+ }
+
+ // Read fs2DestFilePath and verify the content with the initial random
+ // bytes created and wrote into the source file at fs1
+ try (FSDataInputStream iStream = fs2.open(fs2DestFilePath)) {
+ long totalBytesRead = 0;
+ int pos = 0;
+ do {
+ int length = 8 * ONE_MB;
+ byte[] buffer = new byte[length];
+ int bytesRead = iStream.read(buffer, 0, length);
+ totalBytesRead += bytesRead;
+ for (int i = 0; i < bytesRead; i++) {
+ assertEquals(fileContent[pos + i], buffer[i]);
+ }
+ pos = pos + bytesRead;
+ } while (totalBytesRead < fileContent.length);
+ }
+ }
+
+ @Test
+ public void testListPathWithCPK() throws Exception {
+ testListPath(true);
+ }
+
+ @Test
+ public void testListPathWithoutCPK() throws Exception {
+ testListPath(false);
+ }
+
+ private void testListPath(final boolean isWithCPK) throws Exception {
+ final AzureBlobFileSystem fs = getAbfs(isWithCPK);
+ String testDirName = "/" + methodName.getMethodName();
+ final Path testPath = new Path(testDirName);
+ fs.mkdirs(testPath);
+ createFileAndGetContent(fs, testDirName + "/aaa", FILE_SIZE);
+ createFileAndGetContent(fs, testDirName + "/bbb", FILE_SIZE);
+ AbfsClient abfsClient = fs.getAbfsClient();
+ AbfsRestOperation abfsRestOperation = abfsClient
+ .listPath(testDirName, false, INT_50, null);
+ assertListstatus(fs, abfsRestOperation, testPath);
+
+ // Trying with different CPK headers
+ Configuration conf = fs.getConf();
+ String accountName = conf.get(FS_AZURE_ABFS_ACCOUNT_NAME);
+ conf.set(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName,
+ "different-1234567890123456789012");
+ AzureBlobFileSystem fs2 = (AzureBlobFileSystem) FileSystem.newInstance(conf);
+ AbfsClient abfsClient2 = fs2.getAbfsClient();
+ abfsRestOperation = abfsClient2.listPath(testDirName, false, INT_50, null);
+ assertListstatus(fs, abfsRestOperation, testPath);
+
+ if (isWithCPK) {
+ // Trying with no CPK headers
+ conf.unset(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName);
+ AzureBlobFileSystem fs3 = (AzureBlobFileSystem) FileSystem.get(conf);
+ AbfsClient abfsClient3 = fs3.getAbfsClient();
+ abfsRestOperation = abfsClient3
+ .listPath(testDirName, false, INT_50, null);
+ assertListstatus(fs, abfsRestOperation, testPath);
+ }
+ }
+
+ private void assertListstatus(AzureBlobFileSystem fs,
+ AbfsRestOperation abfsRestOperation, Path testPath) throws IOException {
+ assertCPKHeaders(abfsRestOperation, false);
+ assertNoCPKResponseHeadersPresent(abfsRestOperation);
+
+ FileStatus[] listStatuses = fs.listStatus(testPath);
+ Assertions.assertThat(listStatuses.length)
+ .describedAs("listStatuses should have 2 entries").isEqualTo(2);
+
+ listStatuses = getSameFSWithWrongCPK(fs).listStatus(testPath);
+ Assertions.assertThat(listStatuses.length)
+ .describedAs("listStatuses should have 2 entries").isEqualTo(2);
+ }
+
+ @Test
+ public void testCreatePathWithCPK() throws Exception {
+ testCreatePath(true);
+ }
+
+ @Test
+ public void testCreatePathWithoutCPK() throws Exception {
+ testCreatePath(false);
+ }
+
+ private void testCreatePath(final boolean isWithCPK) throws Exception {
+ final AzureBlobFileSystem fs = getAbfs(isWithCPK);
+ final String testFileName = "/" + methodName.getMethodName();
+ createFileAndGetContent(fs, testFileName, FILE_SIZE);
+
+ AbfsClient abfsClient = fs.getAbfsClient();
+ FsPermission permission = new FsPermission(FsAction.EXECUTE,
+ FsAction.EXECUTE, FsAction.EXECUTE);
+ FsPermission umask = new FsPermission(FsAction.NONE, FsAction.NONE,
+ FsAction.NONE);
+ boolean isNamespaceEnabled = fs.getIsNamespaceEnabled();
+ AbfsRestOperation abfsRestOperation = abfsClient
+ .createPath(testFileName, true, true,
+ isNamespaceEnabled ? getOctalNotation(permission) : null,
+ isNamespaceEnabled ? getOctalNotation(umask) : null, false, null);
+ assertCPKHeaders(abfsRestOperation, isWithCPK);
+ assertResponseHeader(abfsRestOperation, isWithCPK,
+ X_MS_ENCRYPTION_KEY_SHA256, getCPKSha(fs));
+ assertResponseHeader(abfsRestOperation, false, X_MS_SERVER_ENCRYPTED, "");
+ assertResponseHeader(abfsRestOperation, true, X_MS_REQUEST_SERVER_ENCRYPTED,
+ "true");
+
+ FileStatus[] listStatuses = fs.listStatus(new Path(testFileName));
+ Assertions.assertThat(listStatuses.length)
+ .describedAs("listStatuses should have 1 entry").isEqualTo(1);
+
+ listStatuses = getSameFSWithWrongCPK(fs).listStatus(new Path(testFileName));
+ Assertions.assertThat(listStatuses.length)
+ .describedAs("listStatuses should have 1 entry").isEqualTo(1);
+ }
+
+ @Test
+ public void testRenamePathWithCPK() throws Exception {
+ testRenamePath(true);
+ }
+
+ @Test
+ public void testRenamePathWithoutCPK() throws Exception {
+ testRenamePath(false);
+ }
+
+ private void testRenamePath(final boolean isWithCPK) throws Exception {
+ final AzureBlobFileSystem fs = getAbfs(isWithCPK);
+ final String testFileName = "/" + methodName.getMethodName();
+ createFileAndGetContent(fs, testFileName, FILE_SIZE);
+
+ FileStatus fileStatusBeforeRename = fs
+ .getFileStatus(new Path(testFileName));
+
+ String newName = "/newName";
+ AbfsClient abfsClient = fs.getAbfsClient();
+ AbfsRestOperation abfsRestOperation = abfsClient
+ .renamePath(testFileName, newName, null);
+ assertCPKHeaders(abfsRestOperation, false);
+ assertNoCPKResponseHeadersPresent(abfsRestOperation);
+
+ LambdaTestUtils.intercept(FileNotFoundException.class,
+ (() -> fs.getFileStatus(new Path(testFileName))));
+
+ FileStatus fileStatusAfterRename = fs.getFileStatus(new Path(newName));
+ Assertions.assertThat(fileStatusAfterRename.getLen())
+ .describedAs("File size has to be same before and after rename")
+ .isEqualTo(fileStatusBeforeRename.getLen());
+ }
+
+ @Test
+ public void testFlushWithCPK() throws Exception {
+ testFlush(true);
+ }
+
+ @Test
+ public void testFlushWithoutCPK() throws Exception {
+ testFlush(false);
+ }
+
+ private void testFlush(final boolean isWithCPK) throws Exception {
+ final AzureBlobFileSystem fs = getAbfs(isWithCPK);
+ final String testFileName = "/" + methodName.getMethodName();
+ fs.create(new Path(testFileName));
+ AbfsClient abfsClient = fs.getAbfsClient();
+ String expectedCPKSha = getCPKSha(fs);
+
+ byte[] fileContent = getRandomBytesArray(FILE_SIZE);
+ Path testFilePath = new Path(testFileName + "1");
+ FSDataOutputStream oStream = fs.create(testFilePath);
+ oStream.write(fileContent);
+
+ // Trying to read with different CPK headers
+ Configuration conf = fs.getConf();
+ String accountName = conf.get(FS_AZURE_ABFS_ACCOUNT_NAME);
+ conf.set(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName,
+ "different-1234567890123456789012");
+ try (AzureBlobFileSystem fs2 = (AzureBlobFileSystem) FileSystem.newInstance(conf);
+ AbfsClient abfsClient2 = fs2.getAbfsClient()) {
+ LambdaTestUtils.intercept(IOException.class, () -> {
+ abfsClient2.flush(testFileName, 0, false, false, null, null);
+ });
+ }
+
+ // Trying to read with no CPK headers
+ if (isWithCPK) {
+ conf.unset(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName);
+ try (AzureBlobFileSystem fs3 = (AzureBlobFileSystem) FileSystem
+ .get(conf); AbfsClient abfsClient3 = fs3.getAbfsClient()) {
+ LambdaTestUtils.intercept(IOException.class, () -> {
+ abfsClient3.flush(testFileName, 0, false, false, null, null);
+ });
+ }
+ }
+
+ // With correct CPK
+ AbfsRestOperation abfsRestOperation = abfsClient
+ .flush(testFileName, 0, false, false, null, null);
+ assertCPKHeaders(abfsRestOperation, isWithCPK);
+ assertResponseHeader(abfsRestOperation, isWithCPK,
+ X_MS_ENCRYPTION_KEY_SHA256, expectedCPKSha);
+ assertResponseHeader(abfsRestOperation, false, X_MS_SERVER_ENCRYPTED, "");
+ assertResponseHeader(abfsRestOperation, true, X_MS_REQUEST_SERVER_ENCRYPTED,
+ isWithCPK + "");
+ }
+
+ @Test
+ public void testSetPathPropertiesWithCPK() throws Exception {
+ testSetPathProperties(true);
+ }
+
+ @Test
+ public void testSetPathPropertiesWithoutCPK() throws Exception {
+ testSetPathProperties(false);
+ }
+
+ private void testSetPathProperties(final boolean isWithCPK) throws Exception {
+ final AzureBlobFileSystem fs = getAbfs(isWithCPK);
+ final String testFileName = "/" + methodName.getMethodName();
+ createFileAndGetContent(fs, testFileName, FILE_SIZE);
+
+ AbfsClient abfsClient = fs.getAbfsClient();
+ final Hashtable<String, String> properties = new Hashtable<>();
+ properties.put("key", "val");
+ AbfsRestOperation abfsRestOperation = abfsClient
+ .setPathProperties(testFileName,
+ convertXmsPropertiesToCommaSeparatedString(properties));
+ assertCPKHeaders(abfsRestOperation, isWithCPK);
+ assertResponseHeader(abfsRestOperation, isWithCPK,
+ X_MS_ENCRYPTION_KEY_SHA256, getCPKSha(fs));
+ assertResponseHeader(abfsRestOperation, false, X_MS_SERVER_ENCRYPTED, "");
+ assertResponseHeader(abfsRestOperation, true, X_MS_REQUEST_SERVER_ENCRYPTED,
+ "true");
+ }
+
+ @Test
+ public void testGetPathStatusFileWithCPK() throws Exception {
+ testGetPathStatusFile(true);
+ }
+
+ @Test
+ public void testGetPathStatusFileWithoutCPK() throws Exception {
+ testGetPathStatusFile(false);
+ }
+
+ private void testGetPathStatusFile(final boolean isWithCPK) throws Exception {
+ final AzureBlobFileSystem fs = getAbfs(isWithCPK);
+ final String testFileName = "/" + methodName.getMethodName();
+ createFileAndGetContent(fs, testFileName, FILE_SIZE);
+
+ AbfsClient abfsClient = fs.getAbfsClient();
+ AbfsRestOperation abfsRestOperation = abfsClient
+ .getPathStatus(testFileName, false);
+ assertCPKHeaders(abfsRestOperation, false);
+ assertResponseHeader(abfsRestOperation, isWithCPK,
+ X_MS_ENCRYPTION_KEY_SHA256, getCPKSha(fs));
+ assertResponseHeader(abfsRestOperation, true, X_MS_SERVER_ENCRYPTED,
+ "true");
+ assertResponseHeader(abfsRestOperation, false,
+ X_MS_REQUEST_SERVER_ENCRYPTED, "");
+
+ abfsRestOperation = abfsClient.getPathStatus(testFileName, true);
+ assertCPKHeaders(abfsRestOperation, isWithCPK);
+ assertResponseHeader(abfsRestOperation, isWithCPK,
+ X_MS_ENCRYPTION_KEY_SHA256, getCPKSha(fs));
+ assertResponseHeader(abfsRestOperation, true, X_MS_SERVER_ENCRYPTED,
+ "true");
+ assertResponseHeader(abfsRestOperation, false,
+ X_MS_REQUEST_SERVER_ENCRYPTED, "");
+ }
+
+ @Test
+ public void testDeletePathWithCPK() throws Exception {
+ testDeletePath(false);
+ }
+
+ @Test
+ public void testDeletePathWithoutCPK() throws Exception {
+ testDeletePath(false);
+ }
+
+ private void testDeletePath(final boolean isWithCPK) throws Exception {
+ final AzureBlobFileSystem fs = getAbfs(isWithCPK);
+ final String testFileName = "/" + methodName.getMethodName();
+ createFileAndGetContent(fs, testFileName, FILE_SIZE);
+
+ FileStatus[] listStatuses = fs.listStatus(new Path(testFileName));
+ Assertions.assertThat(listStatuses.length)
+ .describedAs("listStatuses should have 1 entry").isEqualTo(1);
+
+ AbfsClient abfsClient = fs.getAbfsClient();
+ AbfsRestOperation abfsRestOperation = abfsClient
+ .deletePath(testFileName, false, null);
+ assertCPKHeaders(abfsRestOperation, false);
+ assertNoCPKResponseHeadersPresent(abfsRestOperation);
+
+ Assertions.assertThatThrownBy(() -> fs.listStatus(new Path(testFileName)))
+ .isInstanceOf(FileNotFoundException.class);
+ }
+
+ @Test
+ public void testSetPermissionWithCPK() throws Exception {
+ testSetPermission(true);
+ }
+
+ @Test
+ public void testSetPermissionWithoutCPK() throws Exception {
+ testSetPermission(false);
+ }
+
+ private void testSetPermission(final boolean isWithCPK) throws Exception {
+ final AzureBlobFileSystem fs = getAbfs(isWithCPK);
+ final String testFileName = "/" + methodName.getMethodName();
+ Assume.assumeTrue(fs.getIsNamespaceEnabled());
+ createFileAndGetContent(fs, testFileName, FILE_SIZE);
+ AbfsClient abfsClient = fs.getAbfsClient();
+ FsPermission permission = new FsPermission(FsAction.EXECUTE,
+ FsAction.EXECUTE, FsAction.EXECUTE);
+ AbfsRestOperation abfsRestOperation = abfsClient
+ .setPermission(testFileName, permission.toString());
+ assertCPKHeaders(abfsRestOperation, false);
+ assertNoCPKResponseHeadersPresent(abfsRestOperation);
+ }
+
+ @Test
+ public void testSetAclWithCPK() throws Exception {
+ testSetAcl(true);
+ }
+
+ @Test
+ public void testSetAclWithoutCPK() throws Exception {
+ testSetAcl(false);
+ }
+
+ private void testSetAcl(final boolean isWithCPK) throws Exception {
+ final AzureBlobFileSystem fs = getAbfs(isWithCPK);
+ final String testFileName = "/" + methodName.getMethodName();
+ Assume.assumeTrue(fs.getIsNamespaceEnabled());
+ createFileAndGetContent(fs, testFileName, FILE_SIZE);
+ AbfsClient abfsClient = fs.getAbfsClient();
+
+ List<AclEntry> aclSpec = Lists.newArrayList(aclEntry(ACCESS, USER, ALL));
+ final Map<String, String> aclEntries = AbfsAclHelper
+ .deserializeAclSpec(AclEntry.aclSpecToString(aclSpec));
+
+ AbfsRestOperation abfsRestOperation = abfsClient
+ .setAcl(testFileName, AbfsAclHelper.serializeAclSpec(aclEntries));
+ assertCPKHeaders(abfsRestOperation, false);
+ assertNoCPKResponseHeadersPresent(abfsRestOperation);
+ }
+
+ @Test
+ public void testGetAclWithCPK() throws Exception {
+ testGetAcl(true);
+ }
+
+ @Test
+ public void testGetAclWithoutCPK() throws Exception {
+ testGetAcl(false);
+ }
+
+ private void testGetAcl(final boolean isWithCPK) throws Exception {
+ final AzureBlobFileSystem fs = getAbfs(isWithCPK);
+ final String testFileName = "/" + methodName.getMethodName();
+ Assume.assumeTrue(fs.getIsNamespaceEnabled());
+ createFileAndGetContent(fs, testFileName, FILE_SIZE);
+ AbfsClient abfsClient = fs.getAbfsClient();
+ AbfsRestOperation abfsRestOperation = abfsClient.getAclStatus(testFileName);
+ assertCPKHeaders(abfsRestOperation, false);
+ assertNoCPKResponseHeadersPresent(abfsRestOperation);
+ }
+
+ @Test
+ public void testCheckAccessWithCPK() throws Exception {
+ testCheckAccess(true);
+ }
+
+ @Test
+ public void testCheckAccessWithoutCPK() throws Exception {
+ testCheckAccess(false);
+ }
+
+ private void testCheckAccess(final boolean isWithCPK) throws Exception {
+ boolean isHNSEnabled = getConfiguration()
+ .getBoolean(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT, false);
+ Assume.assumeTrue(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT + " is false",
+ isHNSEnabled);
+ Assume.assumeTrue("AuthType has to be OAuth",
+ getAuthType() == AuthType.OAuth);
+
+ final AzureBlobFileSystem fs = getAbfs(isWithCPK);
+ final String testFileName = "/" + methodName.getMethodName();
+ fs.create(new Path(testFileName));
+ AbfsClient abfsClient = fs.getAbfsClient();
+ AbfsRestOperation abfsRestOperation = abfsClient
+ .checkAccess(testFileName, "rwx");
+ assertCPKHeaders(abfsRestOperation, false);
+ assertNoCPKResponseHeadersPresent(abfsRestOperation);
+ }
+
+ private byte[] createFileAndGetContent(AzureBlobFileSystem fs,
+ String fileName, int fileSize) throws IOException {
+ byte[] fileContent = getRandomBytesArray(fileSize);
+ Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+ ContractTestUtils.verifyFileContents(fs, testFilePath, fileContent);
+ return fileContent;
+ }
+
+ private void assertCPKHeaders(AbfsRestOperation abfsRestOperation,
+ boolean isCPKHeaderExpected) {
+ assertHeader(abfsRestOperation, X_MS_ENCRYPTION_KEY, isCPKHeaderExpected);
+ assertHeader(abfsRestOperation, X_MS_ENCRYPTION_KEY_SHA256,
+ isCPKHeaderExpected);
+ assertHeader(abfsRestOperation, X_MS_ENCRYPTION_ALGORITHM,
+ isCPKHeaderExpected);
+ }
+
+ private void assertNoCPKResponseHeadersPresent(
+ AbfsRestOperation abfsRestOperation) {
+ assertResponseHeader(abfsRestOperation, false, X_MS_SERVER_ENCRYPTED, "");
+ assertResponseHeader(abfsRestOperation, false,
+ X_MS_REQUEST_SERVER_ENCRYPTED, "");
+ assertResponseHeader(abfsRestOperation, false, X_MS_ENCRYPTION_KEY_SHA256,
+ "");
+ }
+
+ private void assertResponseHeader(AbfsRestOperation abfsRestOperation,
+ boolean isHeaderExpected, String headerName, String expectedValue) {
+ final AbfsHttpOperation result = abfsRestOperation.getResult();
+ final String value = result.getResponseHeader(headerName);
+ if (isHeaderExpected) {
+ Assertions.assertThat(value).isEqualTo(expectedValue);
+ } else {
+ Assertions.assertThat(value).isNull();
+ }
+ }
+
+ private void assertHeader(AbfsRestOperation abfsRestOperation,
+ String headerName, boolean isCPKHeaderExpected) {
+ assertTrue(abfsRestOperation != null);
+ Optional<AbfsHttpHeader> header = abfsRestOperation.getRequestHeaders()
+ .stream().filter(abfsHttpHeader -> abfsHttpHeader.getName()
+ .equalsIgnoreCase(headerName)).findFirst();
+ String desc;
+ if (isCPKHeaderExpected) {
+ desc =
+ "CPK header " + headerName + " is expected, but the same is absent.";
+ } else {
+ desc = "CPK header " + headerName
+ + " is not expected, but the same is present.";
+ }
+ Assertions.assertThat(header.isPresent()).describedAs(desc)
+ .isEqualTo(isCPKHeaderExpected);
+ }
+
+ private byte[] getSHA256Hash(String key) throws IOException {
+ try {
+ final MessageDigest digester = MessageDigest.getInstance("SHA-256");
+ return digester.digest(key.getBytes(StandardCharsets.UTF_8));
+ } catch (NoSuchAlgorithmException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private String getCPKSha(final AzureBlobFileSystem abfs) throws IOException {
+ Configuration conf = abfs.getConf();
+ String accountName = conf.get(FS_AZURE_ABFS_ACCOUNT_NAME);
+ String encryptionKey = conf
+ .get(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName);
+ if (encryptionKey == null || encryptionKey.isEmpty()) {
+ return "";
+ }
+ return getBase64EncodedString(getSHA256Hash(encryptionKey));
+ }
+
+ private String getBase64EncodedString(byte[] bytes) {
+ return java.util.Base64.getEncoder().encodeToString(bytes);
+ }
+
+ private Path createFileWithContent(FileSystem fs, String fileName,
+ byte[] fileContent) throws IOException {
+ Path testFilePath = new Path(fileName);
+ try (FSDataOutputStream oStream = fs.create(testFilePath)) {
+ oStream.write(fileContent);
+ oStream.flush();
+ }
+ return testFilePath;
+ }
+
+ private String convertXmsPropertiesToCommaSeparatedString(
+ final Hashtable<String, String> properties)
+ throws CharacterCodingException {
+ StringBuilder commaSeparatedProperties = new StringBuilder();
+ final CharsetEncoder encoder = Charset.forName(XMS_PROPERTIES_ENCODING)
+ .newEncoder();
+ for (Map.Entry<String, String> propertyEntry : properties.entrySet()) {
+ String key = propertyEntry.getKey();
+ String value = propertyEntry.getValue();
+ Boolean canEncodeValue = encoder.canEncode(value);
+ if (!canEncodeValue) {
+ throw new CharacterCodingException();
+ }
+ String encodedPropertyValue = Base64
+ .encode(encoder.encode(CharBuffer.wrap(value)).array());
+ commaSeparatedProperties.append(key).append(AbfsHttpConstants.EQUAL)
+ .append(encodedPropertyValue);
+ commaSeparatedProperties.append(AbfsHttpConstants.COMMA);
+ }
+ if (commaSeparatedProperties.length() != 0) {
+ commaSeparatedProperties
+ .deleteCharAt(commaSeparatedProperties.length() - 1);
+ }
+ return commaSeparatedProperties.toString();
+ }
+
+ private String getOctalNotation(FsPermission fsPermission) {
+ Preconditions.checkNotNull(fsPermission, "fsPermission");
+ return String
+ .format(AbfsHttpConstants.PERMISSION_FORMAT, fsPermission.toOctal());
+ }
+
+ private byte[] getRandomBytesArray(int length) {
+ final byte[] b = new byte[length];
+ new Random().nextBytes(b);
+ return b;
+ }
+
+ private AzureBlobFileSystem getAbfs(boolean withCPK) throws IOException {
+ return getAbfs(withCPK, "12345678901234567890123456789012");
+ }
+
+ private AzureBlobFileSystem getAbfs(boolean withCPK, String cpk)
+ throws IOException {
+ Configuration conf = getRawConfiguration();
+ if (withCPK) {
+ conf.set(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + getAccountName(),
+ cpk);
+ } else {
+ conf.unset(
+ FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + getAccountName());
+ }
+ return (AzureBlobFileSystem) FileSystem.newInstance(conf);
+ }
+
+ private AzureBlobFileSystem getSameFSWithWrongCPK(
+ final AzureBlobFileSystem fs) throws IOException {
+ AbfsConfiguration abfsConf = fs.getAbfsStore().getAbfsConfiguration();
+ Configuration conf = abfsConf.getRawConfiguration();
+ String accountName = conf.get(FS_AZURE_ABFS_ACCOUNT_NAME);
+ String cpk = conf
+ .get(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName);
+ if (cpk == null || cpk.isEmpty()) {
+ cpk = "01234567890123456789012345678912";
+ }
+ cpk = "different-" + cpk;
+ String differentCpk = cpk.substring(0, ENCRYPTION_KEY_LEN - 1);
+ conf.set(FS_AZURE_CLIENT_PROVIDED_ENCRYPTION_KEY + "." + accountName,
+ differentCpk);
+ conf.set("fs.defaultFS",
+ "abfs://" + getFileSystemName() + "@" + accountName);
+ AzureBlobFileSystem sameFSWithDifferentCPK =
+ (AzureBlobFileSystem) FileSystem.newInstance(conf);
+ return sameFSWithDifferentCPK;
+ }
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java
index 72ea766..565eb38 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java
@@ -28,6 +28,7 @@ public final class TestConfigurationKeys {
public static final String FS_AZURE_CONTRACT_TEST_URI = "fs.contract.test.fs.abfs";
public static final String FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT = "fs.azure.test.namespace.enabled";
public static final String FS_AZURE_TEST_APPENDBLOB_ENABLED = "fs.azure.test.appendblob.enabled";
+ public static final String FS_AZURE_TEST_CPK_ENABLED = "fs.azure.test.cpk.enabled";
public static final String FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_ID = "fs.azure.account.oauth2.contributor.client.id";
public static final String FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_SECRET = "fs.azure.account.oauth2.contributor.client.secret";
@@ -54,6 +55,9 @@ public final class TestConfigurationKeys {
public static final String FS_AZURE_TEST_APP_SECRET = "fs.azure.test.app.secret";
+ public static final String FS_AZURE_TEST_CPK_ENABLED_SECONDARY_ACCOUNT = "fs.azure.test.cpk-enabled-secondary-account";
+ public static final String FS_AZURE_TEST_CPK_ENABLED_SECONDARY_ACCOUNT_KEY = "fs.azure.test.cpk-enabled-secondary-account.key";
+
public static final String TEST_CONFIGURATION_FILE_NAME = "azure-test.xml";
public static final String TEST_CONTAINER_PREFIX = "abfs-testcontainer-";
public static final int TEST_TIMEOUT = 15 * 60 * 1000;
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
index 4facc10..a725bf3 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
import java.lang.reflect.Field;
-import java.net.MalformedURLException;
import java.net.URL;
import java.util.List;
import java.util.regex.Pattern;
@@ -103,7 +102,7 @@ public final class TestAbfsClient {
}
private String getUserAgentString(AbfsConfiguration config,
- boolean includeSSLProvider) throws MalformedURLException {
+ boolean includeSSLProvider) throws IOException {
AbfsClientContext abfsClientContext = new AbfsClientContextBuilder().build();
AbfsClient client = new AbfsClient(new URL("https://azure.com"), null,
config, (AccessTokenProvider) null, abfsClientContext);
@@ -250,8 +249,7 @@ public final class TestAbfsClient {
public static AbfsClient createTestClientFromCurrentContext(
AbfsClient baseAbfsClientInstance,
- AbfsConfiguration abfsConfig)
- throws AzureBlobFileSystemException {
+ AbfsConfiguration abfsConfig) throws IOException {
AuthType currentAuthType = abfsConfig.getAuthType(
abfsConfig.getAccountName());
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org