Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/07/30 19:31:07 UTC

[GitHub] [nifi] greyp9 commented on a change in pull request #5242: NIFI-6616 added GCP Sensitive Property Provider

greyp9 commented on a change in pull request #5242:
URL: https://github.com/apache/nifi/pull/5242#discussion_r680104530



##########
File path: nifi-registry/nifi-registry-core/nifi-registry-resources/src/main/resources/conf/bootstrap-gcp.conf
##########
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# GCP KMS is required to be configured for GCP KMS Sensitive Property Provider

Review comment:
       Suggest:
   > These GCP KMS settings must all be configured in order to use the GCP KMS Sensitive Property Provider
   
   I see that this line is a copy/paste from `bootstrap-aws.conf`. I'd suggest a similar wording update there, although the `toolkit-guide.adoc` documentation suggests that only one AWS setting is required. I'll defer to @exceptionfactory.
   

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/bootstrap-gcp.conf
##########
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# GCP KMS is required to be configured for GCP KMS Sensitive Property Provider

Review comment:
       suggest...
   > These GCP KMS settings must all be configured in order to use the GCP KMS Sensitive Property Provider
   
   

##########
File path: nifi-docs/src/main/asciidoc/toolkit-guide.adoc
##########
@@ -524,6 +524,19 @@ This protection scheme uses AWS Key Management Service (https://aws.amazon.com/k
 |`aws.secret.access.key`|The secret access key used to access AWS KMS.|_none_
 |===
 
+==== GCP_KMS
+This protection scheme uses GCP Key Management Service (https://cloud.google.com/security-key-management) for encryption and decryption. GCP KMS configuration properties are to be stored in the `bootstrap-gcp.conf` file, as referenced in the `bootstrap.conf` of NiFi or NiFi Registry. Credentials must be configured as per the following link: https://cloud.google.com/kms/docs/reference/libraries
+
+===== Required properties
+[options="header,footer"]
+|===
+|Property Name|Description|Default
+|`gcp.kms.project`|The project containing the key that the GCP KMS client uses for encryption and decryption.|_none_
+|`gcp.kms.location`|The location of the project containing the key that the GCP KMS client uses for encryption and decryption.|_none_

Review comment:
       Suggest changing `The location of the project` to `The geographic region of the project`.

##########
File path: nifi-docs/src/main/asciidoc/toolkit-guide.adoc
##########
@@ -456,7 +456,7 @@ The following are available options when targeting NiFi Registry using the `--ni
  * `-v`,`--verbose`                              Sets verbose mode (default false)
  * `-p`,`--password <password>`                  Protect the files using a password-derived key. If an argument is not provided to this flag, interactive mode will be triggered to prompt the user to enter the password.
  * `-k`,`--key <keyhex>`                         Protect the files using a raw hexadecimal key. If an argument is not provided to this flag, interactive mode will be triggered to prompt the user to enter the key.
- * `-S`,`--protectionScheme <protectionScheme>`  Selects the protection scheme for encrypted properties.  Valid values are: [AES_GCM, HASHICORP_VAULT_TRANSIT, AWS_KMS]  (default is AES_GCM)
+ * `-S`,`--protectionScheme <protectionScheme>`  Selects the protection scheme for encrypted properties.  Valid values are: [AES_GCM, HASHICORP_VAULT_TRANSIT, AWS_KMS, GCP_KMS]  (default is AES_GCM)

Review comment:
       Since the valid values correspond to sections in this document, it might be helpful to render each value as an anchor hyperlink to its section.
   
   See "Example 15. Inline anchors" in the AsciiDoc syntax quick reference:
   https://docs.asciidoctor.org/asciidoc/latest/syntax-quick-reference/

##########
File path: nifi-commons/nifi-sensitive-property-provider/src/main/java/org/apache/nifi/properties/GCPSensitivePropertyProvider.java
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.properties;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.properties.BootstrapProperties.BootstrapPropertyKey;
+
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.kms.v1.CryptoKey;
+import com.google.cloud.kms.v1.CryptoKeyName;
+import com.google.cloud.kms.v1.CryptoKeyVersion;
+import com.google.cloud.kms.v1.DecryptResponse;
+import com.google.cloud.kms.v1.EncryptResponse;
+import com.google.cloud.kms.v1.KeyManagementServiceClient;
+import com.google.protobuf.ByteString;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Base64;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
+import java.util.Objects;
+
+public class GCPSensitivePropertyProvider extends AbstractSensitivePropertyProvider {
+    private static final Logger logger = LoggerFactory.getLogger(GCPSensitivePropertyProvider.class);
+
+    private static final String GCP_PREFIX = "gcp";
+    private static final String PROJECT_ID_PROPS_NAME = "gcp.kms.project";
+    private static final String LOCATION_ID_PROPS_NAME = "gcp.kms.location";
+    private static final String KEYRING_ID_PROPS_NAME = "gcp.kms.keyring";
+    private static final String KEY_ID_PROPS_NAME = "gcp.kms.key";
+
+    private static final Charset PROPERTY_CHARSET = StandardCharsets.UTF_8;
+
+    private final BootstrapProperties gcpBootstrapProperties;
+    private KeyManagementServiceClient client;
+    private CryptoKeyName keyName;
+
+    GCPSensitivePropertyProvider(final BootstrapProperties bootstrapProperties) {
+        super(bootstrapProperties);
+        Objects.requireNonNull(bootstrapProperties, "The file bootstrap.conf provided to GCP SPP is null");
+        gcpBootstrapProperties = getGCPBootstrapProperties(bootstrapProperties);
+        loadRequiredGCPProperties(gcpBootstrapProperties);
+    }
+
+    /**
+     * Initializes the GCP KMS Client to be used for encrypt, decrypt and other interactions with GCP Cloud KMS.
+     * Note: This does not verify if credentials are valid.
+     */
+    private void initializeClient() {
+        try {
+            client = KeyManagementServiceClient.create();
+        } catch (final IOException e) {
+            final String msg = "Encountered an error initializing GCP Cloud KMS client";
+            throw new SensitivePropertyProtectionException(msg, e);
+        }
+    }
+
+    /**
+     * Validates the key details provided by the user.
+     */
+    private void validate() throws ApiException, SensitivePropertyProtectionException {
+        if (client == null) {
+            final String msg = "The GCP KMS client failed to open, cannot validate key";
+            throw new SensitivePropertyProtectionException(msg);
+        }
+        if (keyName == null) {
+            final String msg = "The GCP KMS key provided is not provided/complete";
+            throw new SensitivePropertyProtectionException(msg);
+        }
+        final CryptoKey key;
+        final CryptoKeyVersion keyVersion;
+        try {
+            key = client.getCryptoKey(keyName);
+            keyVersion = client.getCryptoKeyVersion(key.getPrimary().getName());
+        } catch (final ApiException e) {
+            throw new SensitivePropertyProtectionException("Encountered an error while fetching key details", e);
+        }
+
+        if (keyVersion.getState() != CryptoKeyVersion.CryptoKeyVersionState.ENABLED) {
+            throw new SensitivePropertyProtectionException("The key is not enabled");
+        }
+    }
+
+    /**
+     * Checks if we have the required key properties for GCP Cloud KMS and loads it into {@link #keyName}.
+     * Will load null if key is not present.
+     * Note: This function does not verify if the key is correctly formatted/valid.
+     * @param props the properties representing bootstrap-gcp.conf.
+     */
+    private void loadRequiredGCPProperties(final BootstrapProperties props) {
+        if (props != null) {
+            final String projectId = props.getProperty(PROJECT_ID_PROPS_NAME);
+            final String locationId = props.getProperty(LOCATION_ID_PROPS_NAME);
+            final String keyRingId = props.getProperty(KEYRING_ID_PROPS_NAME);
+            final String keyId = props.getProperty(KEY_ID_PROPS_NAME);
+            if (StringUtils.isNoneBlank(projectId, locationId, keyRingId, keyId)) {
+                keyName = CryptoKeyName.of(projectId, locationId, keyRingId, keyId);
+            }
+        }
+    }
+
+    /**
+     * Checks bootstrap.conf to check if BootstrapPropertyKey.GCP_KMS_SENSITIVE_PROPERTY_PROVIDER_CONF property is
+     * configured to the bootstrap-gcp.conf file. Also will load bootstrap-gcp.conf to {@link #gcpBootstrapProperties}.
+     * @param bootstrapProperties BootstrapProperties object corresponding to bootstrap.conf.
+     * @return BootstrapProperties object corresponding to bootstrap-gcp.conf, null otherwise.
+     */
+    private BootstrapProperties getGCPBootstrapProperties(final BootstrapProperties bootstrapProperties) {
+        final BootstrapProperties cloudBootstrapProperties;
+
+        // Load the bootstrap-gcp.conf file based on path specified in
+        // "nifi.bootstrap.protection.gcp.kms.conf" property of bootstrap.conf
+        final String filePath = bootstrapProperties.getProperty(BootstrapPropertyKey.GCP_KMS_SENSITIVE_PROPERTY_PROVIDER_CONF).orElse(null);
+        if (StringUtils.isBlank(filePath)) {
+            logger.warn("GCP KMS properties file path not configured in bootstrap properties");
+            return null;
+        }
+
+        try {
+            cloudBootstrapProperties = AbstractBootstrapPropertiesLoader.loadBootstrapProperties(
+                    Paths.get(filePath), GCP_PREFIX);
+        } catch (final IOException e) {
+            throw new SensitivePropertyProtectionException("Could not load " + filePath, e);
+        }
+
+        return cloudBootstrapProperties;
+    }
+
+    /**
+     * Checks bootstrap-gcp.conf for the required configurations for Google Cloud KMS encrypt/decrypt operations.
+     * @return true if bootstrap-gcp.conf contains the required properties for GCP KMS SPP, false otherwise.
+     */
+    private boolean hasRequiredGCPProperties() {
+        if (gcpBootstrapProperties == null) {
+            return false;
+        }
+
+        final String projectId = gcpBootstrapProperties.getProperty(PROJECT_ID_PROPS_NAME);
+        final String locationId = gcpBootstrapProperties.getProperty(LOCATION_ID_PROPS_NAME);
+        final String keyRingId = gcpBootstrapProperties.getProperty(KEYRING_ID_PROPS_NAME);
+        final String keyId = gcpBootstrapProperties.getProperty(KEY_ID_PROPS_NAME);
+
+        // Note: the following does not verify if the properties are valid properties, they only verify if
+        // the properties are configured in bootstrap-gcp.conf.
+        return StringUtils.isNoneBlank(projectId, locationId, keyRingId, keyId);
+    }
+
+    @Override
+    public boolean isSupported() {
+        return hasRequiredGCPProperties();
+    }
+
+    @Override
+    protected PropertyProtectionScheme getProtectionScheme() {
+        return PropertyProtectionScheme.GCP_KMS;
+    }
+
+    /**
+     * Returns the name of the underlying implementation.
+     *
+     * @return the name of this sensitive property provider.
+     */
+    @Override
+    public String getName() {
+        return PropertyProtectionScheme.GCP_KMS.getName();
+    }
+
+    /**
+     * Returns the key used to identify the provider implementation in {@code nifi.properties}.
+     *
+     * @return the key to persist in the sibling property.
+     */
+    @Override
+    public String getIdentifierKey() {
+        return PropertyProtectionScheme.GCP_KMS.getIdentifier();
+    }
+
+    /**
+     * Returns the ciphertext blob of this value encrypted using a key stored in GCP KMS.
+     * @return the ciphertext blob to persist in the {@code nifi.properties} file.
+     */
+    private byte[] encrypt(final byte[] input) throws IOException {
+        final EncryptResponse response = client.encrypt(keyName, ByteString.copyFrom(input));
+        return response.getCiphertext().toByteArray();
+    }
+
+    /**
+     * Returns the value corresponding to a ciphertext blob decrypted using a key stored in GCP KMS.
+     * @return the "unprotected" byte[] of this value, which could be used by the application.
+     */
+    private byte[] decrypt(final byte[] input) throws IOException {
+        final DecryptResponse response = client.decrypt(keyName, ByteString.copyFrom(input));
+        return response.getPlaintext().toByteArray();
+    }
+
+    /**
+     * Checks if the client is open and if not, initializes the client and validates the key required for GCP KMS.
+     */
+    private void checkAndInitializeClient() throws SensitivePropertyProtectionException {
+        if (client == null) {
+            try {
+                initializeClient();
+                validate();
+            } catch (final SensitivePropertyProtectionException e) {
+                throw new SensitivePropertyProtectionException("Error initializing the GCP KMS client", e);

Review comment:
       Would it make more sense to allow the original exception to be thrown out of the method?
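
To illustrate the difference, here is a minimal, self-contained sketch. `ProtectionException` and `ExceptionPropagationSketch` are hypothetical stand-ins, not the NiFi classes; the only assumption is that the exception is unchecked, which the `throw` statements in the diff suggest. Wrapping puts a generic message on top and moves the specific one into the cause, while letting the exception propagate keeps the specific message at the top level.

```java
// Hypothetical stand-in for an unchecked protection exception; not the NiFi class.
class ProtectionException extends RuntimeException {
    ProtectionException(final String message) {
        super(message);
    }

    ProtectionException(final String message, final Throwable cause) {
        super(message, cause);
    }
}

class ExceptionPropagationSketch {
    // Stand-in for a validation step that fails with a specific message.
    static void validate() {
        throw new ProtectionException("The key is not enabled");
    }

    // Shape under review: catch and re-wrap in the same exception type.
    static void initializeWrapped() {
        try {
            validate();
        } catch (final ProtectionException e) {
            throw new ProtectionException("Error initializing the GCP KMS client", e);
        }
    }

    // Alternative shape: let the original exception leave the method unchanged.
    static void initializeDirect() {
        validate();
    }

    // Runs the action and returns the top-level message of the failure, or null on success.
    static String messageOf(final Runnable action) {
        try {
            action.run();
            return null;
        } catch (final ProtectionException e) {
            return e.getMessage();
        }
    }

    public static void main(final String[] args) {
        // prints "Error initializing the GCP KMS client"
        System.out.println(messageOf(ExceptionPropagationSketch::initializeWrapped));
        // prints "The key is not enabled"
        System.out.println(messageOf(ExceptionPropagationSketch::initializeDirect));
    }
}
```

In both shapes the specific message is still reachable; the question is only whether a caller has to look at the cause to find it.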

##########
File path: nifi-commons/nifi-sensitive-property-provider/src/main/java/org/apache/nifi/properties/GCPSensitivePropertyProvider.java
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.properties;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.properties.BootstrapProperties.BootstrapPropertyKey;
+
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.kms.v1.CryptoKey;
+import com.google.cloud.kms.v1.CryptoKeyName;
+import com.google.cloud.kms.v1.CryptoKeyVersion;
+import com.google.cloud.kms.v1.DecryptResponse;
+import com.google.cloud.kms.v1.EncryptResponse;
+import com.google.cloud.kms.v1.KeyManagementServiceClient;
+import com.google.protobuf.ByteString;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Base64;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
+import java.util.Objects;
+
+public class GCPSensitivePropertyProvider extends AbstractSensitivePropertyProvider {
+    private static final Logger logger = LoggerFactory.getLogger(GCPSensitivePropertyProvider.class);
+
+    private static final String GCP_PREFIX = "gcp";
+    private static final String PROJECT_ID_PROPS_NAME = "gcp.kms.project";
+    private static final String LOCATION_ID_PROPS_NAME = "gcp.kms.location";
+    private static final String KEYRING_ID_PROPS_NAME = "gcp.kms.keyring";
+    private static final String KEY_ID_PROPS_NAME = "gcp.kms.key";
+
+    private static final Charset PROPERTY_CHARSET = StandardCharsets.UTF_8;
+
+    private final BootstrapProperties gcpBootstrapProperties;
+    private KeyManagementServiceClient client;
+    private CryptoKeyName keyName;
+
+    GCPSensitivePropertyProvider(final BootstrapProperties bootstrapProperties) {
+        super(bootstrapProperties);
+        Objects.requireNonNull(bootstrapProperties, "The file bootstrap.conf provided to GCP SPP is null");
+        gcpBootstrapProperties = getGCPBootstrapProperties(bootstrapProperties);
+        loadRequiredGCPProperties(gcpBootstrapProperties);
+    }
+
+    /**
+     * Initializes the GCP KMS Client to be used for encrypt, decrypt and other interactions with GCP Cloud KMS.
+     * Note: This does not verify if credentials are valid.
+     */
+    private void initializeClient() {
+        try {
+            client = KeyManagementServiceClient.create();
+        } catch (final IOException e) {
+            final String msg = "Encountered an error initializing GCP Cloud KMS client";
+            throw new SensitivePropertyProtectionException(msg, e);
+        }
+    }
+
+    /**
+     * Validates the key details provided by the user.
+     */
+    private void validate() throws ApiException, SensitivePropertyProtectionException {
+        if (client == null) {
+            final String msg = "The GCP KMS client failed to open, cannot validate key";
+            throw new SensitivePropertyProtectionException(msg);
+        }
+        if (keyName == null) {
+            final String msg = "The GCP KMS key provided is not provided/complete";
+            throw new SensitivePropertyProtectionException(msg);
+        }
+        final CryptoKey key;
+        final CryptoKeyVersion keyVersion;
+        try {
+            key = client.getCryptoKey(keyName);
+            keyVersion = client.getCryptoKeyVersion(key.getPrimary().getName());
+        } catch (final ApiException e) {
+            throw new SensitivePropertyProtectionException("Encountered an error while fetching key details", e);
+        }
+
+        if (keyVersion.getState() != CryptoKeyVersion.CryptoKeyVersionState.ENABLED) {
+            throw new SensitivePropertyProtectionException("The key is not enabled");
+        }
+    }
+
+    /**
+     * Checks if we have the required key properties for GCP Cloud KMS and loads it into {@link #keyName}.
+     * Will load null if key is not present.
+     * Note: This function does not verify if the key is correctly formatted/valid.
+     * @param props the properties representing bootstrap-gcp.conf.
+     */
+    private void loadRequiredGCPProperties(final BootstrapProperties props) {
+        if (props != null) {
+            final String projectId = props.getProperty(PROJECT_ID_PROPS_NAME);
+            final String locationId = props.getProperty(LOCATION_ID_PROPS_NAME);
+            final String keyRingId = props.getProperty(KEYRING_ID_PROPS_NAME);
+            final String keyId = props.getProperty(KEY_ID_PROPS_NAME);
+            if (StringUtils.isNoneBlank(projectId, locationId, keyRingId, keyId)) {
+                keyName = CryptoKeyName.of(projectId, locationId, keyRingId, keyId);
+            }
+        }
+    }
+
+    /**
+     * Checks bootstrap.conf to check if BootstrapPropertyKey.GCP_KMS_SENSITIVE_PROPERTY_PROVIDER_CONF property is
+     * configured to the bootstrap-gcp.conf file. Also will load bootstrap-gcp.conf to {@link #gcpBootstrapProperties}.
+     * @param bootstrapProperties BootstrapProperties object corresponding to bootstrap.conf.
+     * @return BootstrapProperties object corresponding to bootstrap-gcp.conf, null otherwise.
+     */
+    private BootstrapProperties getGCPBootstrapProperties(final BootstrapProperties bootstrapProperties) {
+        final BootstrapProperties cloudBootstrapProperties;
+
+        // Load the bootstrap-gcp.conf file based on path specified in
+        // "nifi.bootstrap.protection.gcp.kms.conf" property of bootstrap.conf
+        final String filePath = bootstrapProperties.getProperty(BootstrapPropertyKey.GCP_KMS_SENSITIVE_PROPERTY_PROVIDER_CONF).orElse(null);
+        if (StringUtils.isBlank(filePath)) {
+            logger.warn("GCP KMS properties file path not configured in bootstrap properties");
+            return null;
+        }
+
+        try {
+            cloudBootstrapProperties = AbstractBootstrapPropertiesLoader.loadBootstrapProperties(
+                    Paths.get(filePath), GCP_PREFIX);
+        } catch (final IOException e) {
+            throw new SensitivePropertyProtectionException("Could not load " + filePath, e);
+        }
+
+        return cloudBootstrapProperties;
+    }
+
+    /**
+     * Checks bootstrap-gcp.conf for the required configurations for Google Cloud KMS encrypt/decrypt operations.
+     * @return true if bootstrap-gcp.conf contains the required properties for GCP KMS SPP, false otherwise.
+     */
+    private boolean hasRequiredGCPProperties() {
+        if (gcpBootstrapProperties == null) {
+            return false;
+        }
+
+        final String projectId = gcpBootstrapProperties.getProperty(PROJECT_ID_PROPS_NAME);
+        final String locationId = gcpBootstrapProperties.getProperty(LOCATION_ID_PROPS_NAME);
+        final String keyRingId = gcpBootstrapProperties.getProperty(KEYRING_ID_PROPS_NAME);
+        final String keyId = gcpBootstrapProperties.getProperty(KEY_ID_PROPS_NAME);
+
+        // Note: the following does not verify if the properties are valid properties, they only verify if
+        // the properties are configured in bootstrap-gcp.conf.
+        return StringUtils.isNoneBlank(projectId, locationId, keyRingId, keyId);
+    }
+
+    @Override
+    public boolean isSupported() {
+        return hasRequiredGCPProperties();
+    }
+
+    @Override
+    protected PropertyProtectionScheme getProtectionScheme() {
+        return PropertyProtectionScheme.GCP_KMS;
+    }
+
+    /**
+     * Returns the name of the underlying implementation.
+     *
+     * @return the name of this sensitive property provider.
+     */
+    @Override
+    public String getName() {
+        return PropertyProtectionScheme.GCP_KMS.getName();
+    }
+
+    /**
+     * Returns the key used to identify the provider implementation in {@code nifi.properties}.
+     *
+     * @return the key to persist in the sibling property.
+     */
+    @Override
+    public String getIdentifierKey() {
+        return PropertyProtectionScheme.GCP_KMS.getIdentifier();
+    }
+
+    /**
+     * Returns the ciphertext blob of this value encrypted using a key stored in GCP KMS.
+     * @return the ciphertext blob to persist in the {@code nifi.properties} file.
+     */
+    private byte[] encrypt(final byte[] input) throws IOException {
+        final EncryptResponse response = client.encrypt(keyName, ByteString.copyFrom(input));
+        return response.getCiphertext().toByteArray();
+    }
+
+    /**
+     * Returns the value corresponding to a ciphertext blob decrypted using a key stored in GCP KMS.
+     * @return the "unprotected" byte[] of this value, which could be used by the application.
+     */
+    private byte[] decrypt(final byte[] input) throws IOException {
+        final DecryptResponse response = client.decrypt(keyName, ByteString.copyFrom(input));
+        return response.getPlaintext().toByteArray();
+    }
+
+    /**
+     * Checks if the client is open and if not, initializes the client and validates the key required for GCP KMS.
+     */
+    private void checkAndInitializeClient() throws SensitivePropertyProtectionException {
+        if (client == null) {
+            try {
+                initializeClient();
+                validate();
+            } catch (final SensitivePropertyProtectionException e) {
+                throw new SensitivePropertyProtectionException("Error initializing the GCP KMS client", e);
+            }
+        }
+    }
+
+    /**
+     * Returns the "protected" form of this value. This is a form which can safely be persisted in the {@code nifi.properties} file without compromising the value.
+     * An encryption-based provider would return a cipher text, while a remote-lookup provider could return a unique ID to retrieve the secured value.
+     *
+     * @param unprotectedValue the sensitive value.
+     * @param context The context of the value (ignored in this implementation)
+     * @return the value to persist in the {@code nifi.properties} file.
+     */
+    @Override
+    public String protect(final String unprotectedValue, final ProtectedPropertyContext context) throws SensitivePropertyProtectionException {
+        if (StringUtils.isBlank(unprotectedValue)) {
+            throw new IllegalArgumentException("Cannot encrypt a blank value");
+        }
+
+        checkAndInitializeClient();
+
+        try {
+            byte[] plainBytes = unprotectedValue.getBytes(PROPERTY_CHARSET);
+            byte[] cipherBytes = encrypt(plainBytes);
+            return Base64.getEncoder().encodeToString(cipherBytes);
+        } catch (final IOException | ApiException e) {
+            throw new SensitivePropertyProtectionException("Encrypt failed", e);
+        }
+    }
+
+    /**
+     * Returns the "unprotected" form of this value. This is the raw sensitive value which is used by the application logic.
+     * An encryption-based provider would decrypt a cipher text and return the plaintext, while a remote-lookup provider could retrieve the secured value.

Review comment:
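       > cipher text ... plaintext
   
   Inconsistent spelling: "cipher text" is written as two words while "plaintext" is written as one. Suggest using the same form for both, for example "ciphertext", which the other Javadoc comments in this class already use.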

##########
File path: nifi-commons/nifi-sensitive-property-provider/src/test/java/org/apache/nifi/properties/GCPSensitivePropertyProviderIT.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.properties;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.internal.util.io.IOUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Properties;
+
+/**
+ * To run this test, make sure to first configure sensitive credential information as in the following link
+ * https://cloud.google.com/kms/docs/reference/libraries#cloud-console
+ *
+ * Create a project, keyring and key in the web console.
+ *
+ * Take note of the project name, location, keyring name and key name.
+ *
+ * Then, set the system properties as follows:
+ * -Dgcp.kms.project="project"
+ * -Dgcp.kms.location="location"
+ * -Dgcp.kms.keyring="key ring name"
+ * -Dgcp.kms.key="key name"
+ * when running the integration tests
+ */
+
+public class GCPSensitivePropertyProviderIT {
+    private static final String SAMPLE_PLAINTEXT = "GCPSensitivePropertyProviderIT SAMPLE-PLAINTEXT";
+    private static final String PROJECT_ID_PROPS_NAME = "gcp.kms.project";
+    private static final String LOCATION_ID_PROPS_NAME = "gcp.kms.location";
+    private static final String KEYRING_ID_PROPS_NAME = "gcp.kms.keyring";
+    private static final String KEY_ID_PROPS_NAME = "gcp.kms.key";
+    private static final String BOOTSTRAP_GCP_FILE_PROPS_NAME = "nifi.bootstrap.protection.gcp.kms.conf";
+
+    private static final String EMPTY_PROPERTY = "";
+
+    private static GCPSensitivePropertyProvider spp;
+
+    private static BootstrapProperties props;
+
+    private static Path mockBootstrapConf, mockGCPBootstrapConf;
+
+    private static final Logger logger = LoggerFactory.getLogger(GCPSensitivePropertyProviderIT.class);
+
+    private static void initializeBootstrapProperties() throws IOException{
+        mockBootstrapConf = Files.createTempFile("bootstrap", ".conf").toAbsolutePath();
+        mockGCPBootstrapConf = Files.createTempFile("bootstrap-gcp", ".conf").toAbsolutePath();
+        IOUtil.writeText(BOOTSTRAP_GCP_FILE_PROPS_NAME + "=" + mockGCPBootstrapConf.toAbsolutePath(), mockBootstrapConf.toFile());
+
+        final Properties bootstrapProperties = new Properties();
+        try (final InputStream inputStream = Files.newInputStream(mockBootstrapConf)) {
+            bootstrapProperties.load(inputStream);
+            props = new BootstrapProperties("nifi", bootstrapProperties, mockBootstrapConf);
+        }
+
+        String projectId = System.getProperty(PROJECT_ID_PROPS_NAME, EMPTY_PROPERTY);
+        String locationId = System.getProperty(LOCATION_ID_PROPS_NAME, EMPTY_PROPERTY);
+        String keyringId = System.getProperty(KEYRING_ID_PROPS_NAME, EMPTY_PROPERTY);
+        String keyId = System.getProperty(KEY_ID_PROPS_NAME, EMPTY_PROPERTY);
+
+        StringBuilder bootstrapConfText = new StringBuilder();
+        bootstrapConfText.append(PROJECT_ID_PROPS_NAME + "=" + projectId);
+        bootstrapConfText.append("\n" + LOCATION_ID_PROPS_NAME + "=" + locationId);

Review comment:
       Would it be better to use `System.getProperty("line.separator")` here?
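
A minimal sketch of what that could look like, using `System.lineSeparator()`, which returns the same platform separator without the property lookup. The class name `LineSeparatorSketch`, its helper methods, and the sample values `my-project` and `us-east1` are hypothetical and only for illustration. Since `java.util.Properties.load` accepts `\n`, `\r`, and `\r\n` as line terminators, the generated text parses the same way on any platform.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class LineSeparatorSketch {
    // Builds two property lines joined by the platform line separator.
    static String buildConf(final String projectId, final String locationId) {
        final String newline = System.lineSeparator();
        return new StringBuilder()
                .append("gcp.kms.project=").append(projectId)
                .append(newline)
                .append("gcp.kms.location=").append(locationId)
                .toString();
    }

    // Parses the text as a properties file and returns the value for the given key.
    static String valueOf(final String confText, final String key) {
        final Properties properties = new Properties();
        try (StringReader reader = new StringReader(confText)) {
            properties.load(reader);
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
        return properties.getProperty(key);
    }

    public static void main(final String[] args) {
        // Sample values are placeholders, not real GCP identifiers.
        final String conf = buildConf("my-project", "us-east1");
        System.out.println(valueOf(conf, "gcp.kms.project"));
        System.out.println(valueOf(conf, "gcp.kms.location"));
    }
}
```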

##########
File path: nifi-commons/nifi-sensitive-property-provider/src/main/java/org/apache/nifi/properties/GCPSensitivePropertyProvider.java
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.properties;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.properties.BootstrapProperties.BootstrapPropertyKey;
+
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.kms.v1.CryptoKey;
+import com.google.cloud.kms.v1.CryptoKeyName;
+import com.google.cloud.kms.v1.CryptoKeyVersion;
+import com.google.cloud.kms.v1.DecryptResponse;
+import com.google.cloud.kms.v1.EncryptResponse;
+import com.google.cloud.kms.v1.KeyManagementServiceClient;
+import com.google.protobuf.ByteString;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Base64;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
+import java.util.Objects;
+
+public class GCPSensitivePropertyProvider extends AbstractSensitivePropertyProvider {
+    private static final Logger logger = LoggerFactory.getLogger(GCPSensitivePropertyProvider.class);
+
+    private static final String GCP_PREFIX = "gcp";
+    private static final String PROJECT_ID_PROPS_NAME = "gcp.kms.project";
+    private static final String LOCATION_ID_PROPS_NAME = "gcp.kms.location";
+    private static final String KEYRING_ID_PROPS_NAME = "gcp.kms.keyring";
+    private static final String KEY_ID_PROPS_NAME = "gcp.kms.key";
+
+    private static final Charset PROPERTY_CHARSET = StandardCharsets.UTF_8;
+
+    private final BootstrapProperties gcpBootstrapProperties;
+    private KeyManagementServiceClient client;
+    private CryptoKeyName keyName;
+
+    GCPSensitivePropertyProvider(final BootstrapProperties bootstrapProperties) {
+        super(bootstrapProperties);
+        Objects.requireNonNull(bootstrapProperties, "The file bootstrap.conf provided to GCP SPP is null");
+        gcpBootstrapProperties = getGCPBootstrapProperties(bootstrapProperties);
+        loadRequiredGCPProperties(gcpBootstrapProperties);
+    }
+
+    /**
+     * Initializes the GCP KMS Client to be used for encrypt, decrypt and other interactions with GCP Cloud KMS.
+     * Note: This does not verify if credentials are valid.
+     */
+    private void initializeClient() {
+        try {
+            client = KeyManagementServiceClient.create();
+        } catch (final IOException e) {
+            final String msg = "Encountered an error initializing GCP Cloud KMS client";
+            throw new SensitivePropertyProtectionException(msg, e);
+        }
+    }
+
+    /**
+     * Validates the key details provided by the user.
+     */
+    private void validate() throws ApiException, SensitivePropertyProtectionException {
+        if (client == null) {
+            final String msg = "The GCP KMS client failed to open, cannot validate key";
+            throw new SensitivePropertyProtectionException(msg);
+        }
+        if (keyName == null) {
+            final String msg = "The GCP KMS key provided is not provided/complete";
+            throw new SensitivePropertyProtectionException(msg);
+        }
+        final CryptoKey key;
+        final CryptoKeyVersion keyVersion;
+        try {
+            key = client.getCryptoKey(keyName);
+            keyVersion = client.getCryptoKeyVersion(key.getPrimary().getName());
+        } catch (final ApiException e) {
+            throw new SensitivePropertyProtectionException("Encountered an error while fetching key details", e);
+        }
+
+        if (keyVersion.getState() != CryptoKeyVersion.CryptoKeyVersionState.ENABLED) {
+            throw new SensitivePropertyProtectionException("The key is not enabled");
+        }
+    }
+
+    /**
+     * Checks if we have the required key properties for GCP Cloud KMS and loads it into {@link #keyName}.
+     * Will load null if key is not present.
+     * Note: This function does not verify if the key is correctly formatted/valid.
+     * @param props the properties representing bootstrap-gcp.conf.
+     */
+    private void loadRequiredGCPProperties(final BootstrapProperties props) {
+        if (props != null) {
+            final String projectId = props.getProperty(PROJECT_ID_PROPS_NAME);
+            final String locationId = props.getProperty(LOCATION_ID_PROPS_NAME);
+            final String keyRingId = props.getProperty(KEYRING_ID_PROPS_NAME);
+            final String keyId = props.getProperty(KEY_ID_PROPS_NAME);
+            if (StringUtils.isNoneBlank(projectId, locationId, keyRingId, keyId)) {
+                keyName = CryptoKeyName.of(projectId, locationId, keyRingId, keyId);
+            }
+        }
+    }
+
+    /**
+     * Checks bootstrap.conf to check if BootstrapPropertyKey.GCP_KMS_SENSITIVE_PROPERTY_PROVIDER_CONF property is
+     * configured to the bootstrap-gcp.conf file. Also will load bootstrap-gcp.conf to {@link #gcpBootstrapProperties}.
+     * @param bootstrapProperties BootstrapProperties object corresponding to bootstrap.conf.
+     * @return BootstrapProperties object corresponding to bootstrap-gcp.conf, null otherwise.
+     */
+    private BootstrapProperties getGCPBootstrapProperties(final BootstrapProperties bootstrapProperties) {
+        final BootstrapProperties cloudBootstrapProperties;
+
+        // Load the bootstrap-gcp.conf file based on path specified in
+        // "nifi.bootstrap.protection.gcp.kms.conf" property of bootstrap.conf
+        final String filePath = bootstrapProperties.getProperty(BootstrapPropertyKey.GCP_KMS_SENSITIVE_PROPERTY_PROVIDER_CONF).orElse(null);
+        if (StringUtils.isBlank(filePath)) {
+            logger.warn("GCP KMS properties file path not configured in bootstrap properties");
+            return null;
+        }
+
+        try {
+            cloudBootstrapProperties = AbstractBootstrapPropertiesLoader.loadBootstrapProperties(
+                    Paths.get(filePath), GCP_PREFIX);
+        } catch (final IOException e) {
+            throw new SensitivePropertyProtectionException("Could not load " + filePath, e);
+        }
+
+        return cloudBootstrapProperties;
+    }
+
+    /**
+     * Checks bootstrap-gcp.conf for the required configurations for Google Cloud KMS encrypt/decrypt operations.
+     * @return true if bootstrap-gcp.conf contains the required properties for GCP KMS SPP, false otherwise.
+     */
+    private boolean hasRequiredGCPProperties() {
+        if (gcpBootstrapProperties == null) {
+            return false;
+        }
+
+        final String projectId = gcpBootstrapProperties.getProperty(PROJECT_ID_PROPS_NAME);
+        final String locationId = gcpBootstrapProperties.getProperty(LOCATION_ID_PROPS_NAME);
+        final String keyRingId = gcpBootstrapProperties.getProperty(KEYRING_ID_PROPS_NAME);
+        final String keyId = gcpBootstrapProperties.getProperty(KEY_ID_PROPS_NAME);
+
+        // Note: the following does not verify if the properties are valid properties, they only verify if
+        // the properties are configured in bootstrap-gcp.conf.
+        return StringUtils.isNoneBlank(projectId, locationId, keyRingId, keyId);
+    }
+
+    @Override
+    public boolean isSupported() {
+        return hasRequiredGCPProperties();
+    }
+
+    @Override
+    protected PropertyProtectionScheme getProtectionScheme() {
+        return PropertyProtectionScheme.GCP_KMS;
+    }
+
+    /**
+     * Returns the name of the underlying implementation.
+     *
+     * @return the name of this sensitive property provider.
+     */
+    @Override
+    public String getName() {
+        return PropertyProtectionScheme.GCP_KMS.getName();
+    }
+
+    /**
+     * Returns the key used to identify the provider implementation in {@code nifi.properties}.
+     *
+     * @return the key to persist in the sibling property.
+     */
+    @Override
+    public String getIdentifierKey() {
+        return PropertyProtectionScheme.GCP_KMS.getIdentifier();
+    }
+
+    /**
+     * Returns the ciphertext blob of this value encrypted using a key stored in GCP KMS.
+     * @return the ciphertext blob to persist in the {@code nifi.properties} file.
+     */
+    private byte[] encrypt(final byte[] input) throws IOException {
+        final EncryptResponse response = client.encrypt(keyName, ByteString.copyFrom(input));
+        return response.getCiphertext().toByteArray();
+    }
+
+    /**
+     * Returns the value corresponding to a ciphertext blob decrypted using a key stored in GCP KMS.
+     * @return the "unprotected" byte[] of this value, which could be used by the application.
+     */
+    private byte[] decrypt(final byte[] input) throws IOException {
+        final DecryptResponse response = client.decrypt(keyName, ByteString.copyFrom(input));
+        return response.getPlaintext().toByteArray();
+    }
+
+    /**
+     * Checks if the client is open and if not, initializes the client and validates the key required for GCP KMS.
+     */
+    private void checkAndInitializeClient() throws SensitivePropertyProtectionException {
+        if (client == null) {
+            try {
+                initializeClient();
+                validate();
+            } catch (final SensitivePropertyProtectionException e) {
+                throw new SensitivePropertyProtectionException("Error initializing the GCP KMS client", e);
+            }
+        }
+    }
+
+    /**
+     * Returns the "protected" form of this value. This is a form which can safely be persisted in the {@code nifi.properties} file without compromising the value.
+     * An encryption-based provider would return a cipher text, while a remote-lookup provider could return a unique ID to retrieve the secured value.
+     *
+     * @param unprotectedValue the sensitive value.
+     * @param context The context of the value (ignored in this implementation)
+     * @return the value to persist in the {@code nifi.properties} file.
+     */
+    @Override
+    public String protect(final String unprotectedValue, final ProtectedPropertyContext context) throws SensitivePropertyProtectionException {
+        if (StringUtils.isBlank(unprotectedValue)) {
+            throw new IllegalArgumentException("Cannot encrypt a blank value");
+        }
+
+        checkAndInitializeClient();
+
+        try {
+            byte[] plainBytes = unprotectedValue.getBytes(PROPERTY_CHARSET);
+            byte[] cipherBytes = encrypt(plainBytes);
+            return Base64.getEncoder().encodeToString(cipherBytes);
+        } catch (final IOException | ApiException e) {
+            throw new SensitivePropertyProtectionException("Encrypt failed", e);
+        }
+    }
+
+    /**
+     * Returns the "unprotected" form of this value. This is the raw sensitive value which is used by the application logic.
+     * An encryption-based provider would decrypt a cipher text and return the plaintext, while a remote-lookup provider could retrieve the secured value.
+     *
+     * @param protectedValue the protected value read from the {@code nifi.properties} file.
+     * @param context The context of the value (ignored in this implementation)
+     * @return the raw value to be used by the application.
+     */
+    @Override
+    public String unprotect(final String protectedValue, final ProtectedPropertyContext context) throws SensitivePropertyProtectionException {
+        if (StringUtils.isBlank(protectedValue)) {
+            throw new IllegalArgumentException("Cannot decrypt an empty/blank cipher");
+        }

Review comment:
       > Cannot decrypt an empty/blank ciphertext
   or
   > Cannot decrypt an empty/blank value
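   A small sketch of the guard with the second wording applied (illustrative only; `BlankCiphertextSketch`, `requireCiphertext`, and the local `isBlank` helper are hypothetical names, and the helper only approximates `StringUtils.isBlank` so that the example has no dependencies):

```java
public class BlankCiphertextSketch {

    public static final String MESSAGE = "Cannot decrypt an empty/blank value";

    // Approximation of StringUtils.isBlank: null, empty, or only characters trimmed by String.trim().
    public static boolean isBlank(final String value) {
        return value == null || value.trim().isEmpty();
    }

    // Rejects blank input before any decrypt call would be attempted.
    public static String requireCiphertext(final String protectedValue) {
        if (isBlank(protectedValue)) {
            throw new IllegalArgumentException(MESSAGE);
        }
        return protectedValue;
    }

    public static void main(final String[] args) {
        try {
            requireCiphertext("   ");
        } catch (final IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```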
   

##########
File path: nifi-commons/nifi-sensitive-property-provider/src/main/java/org/apache/nifi/properties/GCPSensitivePropertyProvider.java
##########
@@ -0,0 +1,288 @@
+package org.apache.nifi.properties;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.properties.BootstrapProperties.BootstrapPropertyKey;
+
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.kms.v1.CryptoKey;
+import com.google.cloud.kms.v1.CryptoKeyName;
+import com.google.cloud.kms.v1.CryptoKeyVersion;
+import com.google.cloud.kms.v1.DecryptResponse;
+import com.google.cloud.kms.v1.EncryptResponse;
+import com.google.cloud.kms.v1.KeyManagementServiceClient;
+import com.google.protobuf.ByteString;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Base64;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
+import java.util.Objects;
+
+public class GCPSensitivePropertyProvider extends AbstractSensitivePropertyProvider {
+    private static final Logger logger = LoggerFactory.getLogger(GCPSensitivePropertyProvider.class);
+
+    private static final String GCP_PREFIX = "gcp";
+    private static final String PROJECT_ID_PROPS_NAME = "gcp.kms.project";
+    private static final String LOCATION_ID_PROPS_NAME = "gcp.kms.location";
+    private static final String KEYRING_ID_PROPS_NAME = "gcp.kms.keyring";
+    private static final String KEY_ID_PROPS_NAME = "gcp.kms.key";
+
+    private static final Charset PROPERTY_CHARSET = StandardCharsets.UTF_8;
+
+    private final BootstrapProperties gcpBootstrapProperties;
+    private KeyManagementServiceClient client;
+    private CryptoKeyName keyName;
+
+    GCPSensitivePropertyProvider(final BootstrapProperties bootstrapProperties) {
+        super(bootstrapProperties);
+        Objects.requireNonNull(bootstrapProperties, "The file bootstrap.conf provided to GCP SPP is null");
+        gcpBootstrapProperties = getGCPBootstrapProperties(bootstrapProperties);
+        loadRequiredGCPProperties(gcpBootstrapProperties);
+    }
+
+    /**
+     * Initializes the GCP KMS Client to be used for encrypt, decrypt and other interactions with GCP Cloud KMS.
+     * Note: This does not verify if credentials are valid.
+     */
+    private void initializeClient() {
+        try {
+            client = KeyManagementServiceClient.create();
+        } catch (final IOException e) {
+            final String msg = "Encountered an error initializing GCP Cloud KMS client";
+            throw new SensitivePropertyProtectionException(msg, e);
+        }
+    }
+
+    /**
+     * Validates the key details provided by the user.
+     */
+    private void validate() throws ApiException, SensitivePropertyProtectionException {
+        if (client == null) {
+            final String msg = "The GCP KMS client failed to open, cannot validate key";
+            throw new SensitivePropertyProtectionException(msg);
+        }
+        if (keyName == null) {
+            final String msg = "The GCP KMS key provided is not provided/complete";
+            throw new SensitivePropertyProtectionException(msg);
+        }
+        final CryptoKey key;
+        final CryptoKeyVersion keyVersion;
+        try {
+            key = client.getCryptoKey(keyName);
+            keyVersion = client.getCryptoKeyVersion(key.getPrimary().getName());
+        } catch (final ApiException e) {
+            throw new SensitivePropertyProtectionException("Encountered an error while fetching key details", e);
+        }
+
+        if (keyVersion.getState() != CryptoKeyVersion.CryptoKeyVersionState.ENABLED) {
+            throw new SensitivePropertyProtectionException("The key is not enabled");
+        }
+    }
+
+    /**
+     * Checks if we have the required key properties for GCP Cloud KMS and loads it into {@link #keyName}.
+     * Will load null if key is not present.
+     * Note: This function does not verify if the key is correctly formatted/valid.
+     * @param props the properties representing bootstrap-gcp.conf.
+     */
+    private void loadRequiredGCPProperties(final BootstrapProperties props) {
+        if (props != null) {
+            final String projectId = props.getProperty(PROJECT_ID_PROPS_NAME);
+            final String locationId = props.getProperty(LOCATION_ID_PROPS_NAME);
+            final String keyRingId = props.getProperty(KEYRING_ID_PROPS_NAME);
+            final String keyId = props.getProperty(KEY_ID_PROPS_NAME);
+            if (StringUtils.isNoneBlank(projectId, locationId, keyRingId, keyId)) {
+                keyName = CryptoKeyName.of(projectId, locationId, keyRingId, keyId);
+            }
+        }
+    }
+
+    /**
+     * Checks bootstrap.conf to check if BootstrapPropertyKey.GCP_KMS_SENSITIVE_PROPERTY_PROVIDER_CONF property is
+     * configured to the bootstrap-gcp.conf file. Also will load bootstrap-gcp.conf to {@link #gcpBootstrapProperties}.
+     * @param bootstrapProperties BootstrapProperties object corresponding to bootstrap.conf.
+     * @return BootstrapProperties object corresponding to bootstrap-gcp.conf, null otherwise.
+     */
+    private BootstrapProperties getGCPBootstrapProperties(final BootstrapProperties bootstrapProperties) {
+        final BootstrapProperties cloudBootstrapProperties;
+
+        // Load the bootstrap-gcp.conf file based on path specified in
+        // "nifi.bootstrap.protection.gcp.kms.conf" property of bootstrap.conf
+        final String filePath = bootstrapProperties.getProperty(BootstrapPropertyKey.GCP_KMS_SENSITIVE_PROPERTY_PROVIDER_CONF).orElse(null);
+        if (StringUtils.isBlank(filePath)) {
+            logger.warn("GCP KMS properties file path not configured in bootstrap properties");
+            return null;
+        }
+
+        try {
+            cloudBootstrapProperties = AbstractBootstrapPropertiesLoader.loadBootstrapProperties(
+                    Paths.get(filePath), GCP_PREFIX);
+        } catch (final IOException e) {
+            throw new SensitivePropertyProtectionException("Could not load " + filePath, e);
+        }
+
+        return cloudBootstrapProperties;
+    }
+
+    /**
+     * Checks bootstrap-gcp.conf for the required configurations for Google Cloud KMS encrypt/decrypt operations.
+     * @return true if bootstrap-gcp.conf contains the required properties for GCP KMS SPP, false otherwise.
+     */
+    private boolean hasRequiredGCPProperties() {
+        if (gcpBootstrapProperties == null) {
+            return false;
+        }
+
+        final String projectId = gcpBootstrapProperties.getProperty(PROJECT_ID_PROPS_NAME);
+        final String locationId = gcpBootstrapProperties.getProperty(LOCATION_ID_PROPS_NAME);
+        final String keyRingId = gcpBootstrapProperties.getProperty(KEYRING_ID_PROPS_NAME);
+        final String keyId = gcpBootstrapProperties.getProperty(KEY_ID_PROPS_NAME);
+
+        // Note: the following does not verify if the properties are valid properties, they only verify if
+        // the properties are configured in bootstrap-gcp.conf.
+        return StringUtils.isNoneBlank(projectId, locationId, keyRingId, keyId);
+    }
+
+    @Override
+    public boolean isSupported() {
+        return hasRequiredGCPProperties();
+    }
+
+    @Override
+    protected PropertyProtectionScheme getProtectionScheme() {
+        return PropertyProtectionScheme.GCP_KMS;
+    }
+
+    /**
+     * Returns the name of the underlying implementation.
+     *
+     * @return the name of this sensitive property provider.
+     */
+    @Override
+    public String getName() {
+        return PropertyProtectionScheme.GCP_KMS.getName();
+    }
+
+    /**
+     * Returns the key used to identify the provider implementation in {@code nifi.properties}.
+     *
+     * @return the key to persist in the sibling property.
+     */
+    @Override
+    public String getIdentifierKey() {
+        return PropertyProtectionScheme.GCP_KMS.getIdentifier();
+    }
+
+    /**
+     * Returns the ciphertext blob of this value encrypted using a key stored in GCP KMS.
+     * @return the ciphertext blob to persist in the {@code nifi.properties} file.
+     */
+    private byte[] encrypt(final byte[] input) throws IOException {
+        final EncryptResponse response = client.encrypt(keyName, ByteString.copyFrom(input));
+        return response.getCiphertext().toByteArray();
+    }
+
+    /**
+     * Returns the value corresponding to a ciphertext blob decrypted using a key stored in GCP KMS.
+     * @return the "unprotected" byte[] of this value, which could be used by the application.
+     */
+    private byte[] decrypt(final byte[] input) throws IOException {
+        final DecryptResponse response = client.decrypt(keyName, ByteString.copyFrom(input));
+        return response.getPlaintext().toByteArray();
+    }
+
+    /**
+     * Checks if the client is open and if not, initializes the client and validates the key required for GCP KMS.
+     */
+    private void checkAndInitializeClient() throws SensitivePropertyProtectionException {
+        if (client == null) {
+            try {
+                initializeClient();
+                validate();
+            } catch (final SensitivePropertyProtectionException e) {
+                throw new SensitivePropertyProtectionException("Error initializing the GCP KMS client", e);
+            }
+        }
+    }
+
+    /**
+     * Returns the "protected" form of this value. This is a form which can safely be persisted in the {@code nifi.properties} file without compromising the value.
+     * An encryption-based provider would return a cipher text, while a remote-lookup provider could return a unique ID to retrieve the secured value.

Review comment:
       > The GCP implementation uses its native KMS to generate an encryption key, which is used to protect the sensitive value.
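   To make the protect/unprotect flow that this Javadoc describes concrete, here is a self-contained sketch of the same layering: UTF-8 bytes, then encryption, then Base64 for storage in a properties file. It is not the PR's implementation. A locally generated AES-GCM key stands in for the Cloud KMS key so the example runs without credentials, and the class name `ProtectRoundTripSketch` is hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Base64;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

public class ProtectRoundTripSketch {
    private static final int IV_LENGTH = 12;
    private static final int TAG_BITS = 128;

    // Assumption: a local key replaces the remote KMS key for illustration.
    private final SecretKey key;
    private final SecureRandom random = new SecureRandom();

    public ProtectRoundTripSketch() {
        try {
            final KeyGenerator generator = KeyGenerator.getInstance("AES");
            generator.init(128);
            key = generator.generateKey();
        } catch (final GeneralSecurityException e) {
            throw new IllegalStateException("Could not create the stand-in key", e);
        }
    }

    // Plaintext string -> UTF-8 bytes -> ciphertext (IV prepended) -> Base64 string.
    public String protect(final String unprotectedValue) {
        try {
            final byte[] iv = new byte[IV_LENGTH];
            random.nextBytes(iv);
            final Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
            final byte[] sealed = cipher.doFinal(unprotectedValue.getBytes(StandardCharsets.UTF_8));
            final byte[] blob = ByteBuffer.allocate(iv.length + sealed.length).put(iv).put(sealed).array();
            return Base64.getEncoder().encodeToString(blob);
        } catch (final GeneralSecurityException e) {
            throw new IllegalStateException("Encrypt failed", e);
        }
    }

    // Base64 string -> ciphertext -> UTF-8 bytes -> plaintext string.
    public String unprotect(final String protectedValue) {
        try {
            final byte[] blob = Base64.getDecoder().decode(protectedValue);
            final Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.DECRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, blob, 0, IV_LENGTH));
            final byte[] plain = cipher.doFinal(blob, IV_LENGTH, blob.length - IV_LENGTH);
            return new String(plain, StandardCharsets.UTF_8);
        } catch (final GeneralSecurityException e) {
            throw new IllegalStateException("Decrypt failed", e);
        }
    }

    public static void main(final String[] args) {
        final ProtectRoundTripSketch sketch = new ProtectRoundTripSketch();
        final String stored = sketch.protect("example-password");
        System.out.println(sketch.unprotect(stored).equals("example-password"));
    }
}
```

   In the provider under review, the encrypt and decrypt steps are delegated to `KeyManagementServiceClient` with the configured `CryptoKeyName`; only the byte and Base64 handling around them is mirrored here.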

##########
File path: nifi-commons/nifi-sensitive-property-provider/pom.xml
##########
@@ -23,7 +23,19 @@
     <artifactId>nifi-sensitive-property-provider</artifactId>
     <properties>
         <aws.sdk.version>2.17.1</aws.sdk.version>
+        <gcp.sdk.version>20.6.0</gcp.sdk.version>
     </properties>
+    <dependencyManagement>
+        <dependencies>
+            <dependency>

Review comment:
       What's the benefit of declaring this dependency, rather than just the library dependency on lines 79-82?

##########
File path: nifi-commons/nifi-sensitive-property-provider/src/main/java/org/apache/nifi/properties/GCPSensitivePropertyProvider.java
##########
@@ -0,0 +1,288 @@
+package org.apache.nifi.properties;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.properties.BootstrapProperties.BootstrapPropertyKey;
+
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.kms.v1.CryptoKey;
+import com.google.cloud.kms.v1.CryptoKeyName;
+import com.google.cloud.kms.v1.CryptoKeyVersion;
+import com.google.cloud.kms.v1.DecryptResponse;
+import com.google.cloud.kms.v1.EncryptResponse;
+import com.google.cloud.kms.v1.KeyManagementServiceClient;
+import com.google.protobuf.ByteString;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Base64;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
+import java.util.Objects;
+
+public class GCPSensitivePropertyProvider extends AbstractSensitivePropertyProvider {
+    private static final Logger logger = LoggerFactory.getLogger(GCPSensitivePropertyProvider.class);
+
+    private static final String GCP_PREFIX = "gcp";
+    private static final String PROJECT_ID_PROPS_NAME = "gcp.kms.project";
+    private static final String LOCATION_ID_PROPS_NAME = "gcp.kms.location";
+    private static final String KEYRING_ID_PROPS_NAME = "gcp.kms.keyring";
+    private static final String KEY_ID_PROPS_NAME = "gcp.kms.key";
+
+    private static final Charset PROPERTY_CHARSET = StandardCharsets.UTF_8;
+
+    private final BootstrapProperties gcpBootstrapProperties;
+    private KeyManagementServiceClient client;
+    private CryptoKeyName keyName;
+
+    GCPSensitivePropertyProvider(final BootstrapProperties bootstrapProperties) {
+        super(bootstrapProperties);
+        Objects.requireNonNull(bootstrapProperties, "The file bootstrap.conf provided to GCP SPP is null");
+        gcpBootstrapProperties = getGCPBootstrapProperties(bootstrapProperties);
+        loadRequiredGCPProperties(gcpBootstrapProperties);
+    }
+
+    /**
+     * Initializes the GCP KMS Client to be used for encrypt, decrypt and other interactions with GCP Cloud KMS.
+     * Note: This does not verify if credentials are valid.
+     */
+    private void initializeClient() {
+        try {
+            client = KeyManagementServiceClient.create();
+        } catch (final IOException e) {
+            final String msg = "Encountered an error initializing GCP Cloud KMS client";
+            throw new SensitivePropertyProtectionException(msg, e);
+        }
+    }
+
+    /**
+     * Validates the key details provided by the user.
+     */
+    private void validate() throws ApiException, SensitivePropertyProtectionException {
+        if (client == null) {
+            final String msg = "The GCP KMS client failed to open, cannot validate key";
+            throw new SensitivePropertyProtectionException(msg);
+        }
+        if (keyName == null) {
+            final String msg = "The GCP KMS key provided is not provided/complete";
+            throw new SensitivePropertyProtectionException(msg);
+        }
+        final CryptoKey key;
+        final CryptoKeyVersion keyVersion;
+        try {
+            key = client.getCryptoKey(keyName);
+            keyVersion = client.getCryptoKeyVersion(key.getPrimary().getName());
+        } catch (final ApiException e) {
+            throw new SensitivePropertyProtectionException("Encountered an error while fetching key details", e);
+        }
+
+        if (keyVersion.getState() != CryptoKeyVersion.CryptoKeyVersionState.ENABLED) {
+            throw new SensitivePropertyProtectionException("The key is not enabled");
+        }
+    }
+
+    /**
+     * Checks if we have the required key properties for GCP Cloud KMS and loads it into {@link #keyName}.
+     * Will load null if key is not present.
+     * Note: This function does not verify if the key is correctly formatted/valid.
+     * @param props the properties representing bootstrap-gcp.conf.
+     */
+    private void loadRequiredGCPProperties(final BootstrapProperties props) {
+        if (props != null) {
+            final String projectId = props.getProperty(PROJECT_ID_PROPS_NAME);
+            final String locationId = props.getProperty(LOCATION_ID_PROPS_NAME);
+            final String keyRingId = props.getProperty(KEYRING_ID_PROPS_NAME);
+            final String keyId = props.getProperty(KEY_ID_PROPS_NAME);
+            if (StringUtils.isNoneBlank(projectId, locationId, keyRingId, keyId)) {
+                keyName = CryptoKeyName.of(projectId, locationId, keyRingId, keyId);
+            }
+        }
+    }
+
+    /**
+     * Checks bootstrap.conf to check if BootstrapPropertyKey.GCP_KMS_SENSITIVE_PROPERTY_PROVIDER_CONF property is
+     * configured to the bootstrap-gcp.conf file. Also will load bootstrap-gcp.conf to {@link #gcpBootstrapProperties}.
+     * @param bootstrapProperties BootstrapProperties object corresponding to bootstrap.conf.
+     * @return BootstrapProperties object corresponding to bootstrap-gcp.conf, null otherwise.
+     */
+    private BootstrapProperties getGCPBootstrapProperties(final BootstrapProperties bootstrapProperties) {
+        final BootstrapProperties cloudBootstrapProperties;
+
+        // Load the bootstrap-gcp.conf file based on path specified in
+        // "nifi.bootstrap.protection.gcp.kms.conf" property of bootstrap.conf
+        final String filePath = bootstrapProperties.getProperty(BootstrapPropertyKey.GCP_KMS_SENSITIVE_PROPERTY_PROVIDER_CONF).orElse(null);
+        if (StringUtils.isBlank(filePath)) {
+            logger.warn("GCP KMS properties file path not configured in bootstrap properties");
+            return null;
+        }
+
+        try {
+            cloudBootstrapProperties = AbstractBootstrapPropertiesLoader.loadBootstrapProperties(
+                    Paths.get(filePath), GCP_PREFIX);
+        } catch (final IOException e) {
+            throw new SensitivePropertyProtectionException("Could not load " + filePath, e);
+        }
+
+        return cloudBootstrapProperties;
+    }
+
+    /**
+     * Checks bootstrap-gcp.conf for the required configurations for Google Cloud KMS encrypt/decrypt operations.
+     * @return true if bootstrap-gcp.conf contains the required properties for GCP KMS SPP, false otherwise.
+     */
+    private boolean hasRequiredGCPProperties() {
+        if (gcpBootstrapProperties == null) {
+            return false;
+        }
+
+        final String projectId = gcpBootstrapProperties.getProperty(PROJECT_ID_PROPS_NAME);
+        final String locationId = gcpBootstrapProperties.getProperty(LOCATION_ID_PROPS_NAME);
+        final String keyRingId = gcpBootstrapProperties.getProperty(KEYRING_ID_PROPS_NAME);
+        final String keyId = gcpBootstrapProperties.getProperty(KEY_ID_PROPS_NAME);
+
+        // Note: the following does not verify if the properties are valid properties, they only verify if
+        // the properties are configured in bootstrap-gcp.conf.
+        return StringUtils.isNoneBlank(projectId, locationId, keyRingId, keyId);
+    }
+
+    @Override
+    public boolean isSupported() {
+        return hasRequiredGCPProperties();
+    }
+
+    @Override
+    protected PropertyProtectionScheme getProtectionScheme() {
+        return PropertyProtectionScheme.GCP_KMS;
+    }
+
+    /**
+     * Returns the name of the underlying implementation.
+     *
+     * @return the name of this sensitive property provider.
+     */
+    @Override
+    public String getName() {
+        return PropertyProtectionScheme.GCP_KMS.getName();
+    }
+
+    /**
+     * Returns the key used to identify the provider implementation in {@code nifi.properties}.
+     *
+     * @return the key to persist in the sibling property.
+     */
+    @Override
+    public String getIdentifierKey() {
+        return PropertyProtectionScheme.GCP_KMS.getIdentifier();
+    }
+
+    /**
+     * Returns the ciphertext blob of this value encrypted using a key stored in GCP KMS.
+     * @return the ciphertext blob to persist in the {@code nifi.properties} file.
+     */
+    private byte[] encrypt(final byte[] input) throws IOException {
+        final EncryptResponse response = client.encrypt(keyName, ByteString.copyFrom(input));
+        return response.getCiphertext().toByteArray();
+    }
+
+    /**
+     * Returns the value corresponding to a ciphertext blob decrypted using a key stored in GCP KMS.
+     * @return the "unprotected" byte[] of this value, which could be used by the application.
+     */
+    private byte[] decrypt(final byte[] input) throws IOException {
+        final DecryptResponse response = client.decrypt(keyName, ByteString.copyFrom(input));
+        return response.getPlaintext().toByteArray();
+    }
+
+    /**
+     * Checks if the client is open and if not, initializes the client and validates the key required for GCP KMS.
+     */
+    private void checkAndInitializeClient() throws SensitivePropertyProtectionException {
+        if (client == null) {
+            try {
+                initializeClient();
+                validate();
+            } catch (final SensitivePropertyProtectionException e) {
+                throw new SensitivePropertyProtectionException("Error initializing the GCP KMS client", e);
+            }
+        }
+    }
+
+    /**
+     * Returns the "protected" form of this value. This is a form which can safely be persisted in the {@code nifi.properties} file without compromising the value.
+     * An encryption-based provider would return a cipher text, while a remote-lookup provider could return a unique ID to retrieve the secured value.
+     *
+     * @param unprotectedValue the sensitive value.
+     * @param context The context of the value (ignored in this implementation)
+     * @return the value to persist in the {@code nifi.properties} file.
+     */
+    @Override
+    public String protect(final String unprotectedValue, final ProtectedPropertyContext context) throws SensitivePropertyProtectionException {
+        if (StringUtils.isBlank(unprotectedValue)) {
+            throw new IllegalArgumentException("Cannot encrypt a blank value");
+        }
+
+        checkAndInitializeClient();
+
+        try {
+            byte[] plainBytes = unprotectedValue.getBytes(PROPERTY_CHARSET);
+            byte[] cipherBytes = encrypt(plainBytes);
+            return Base64.getEncoder().encodeToString(cipherBytes);
+        } catch (final IOException | ApiException e) {
+            throw new SensitivePropertyProtectionException("Encrypt failed", e);
+        }
+    }
+
+    /**
+     * Returns the "unprotected" form of this value. This is the raw sensitive value which is used by the application logic.
+     * An encryption-based provider would decrypt a cipher text and return the plaintext, while a remote-lookup provider could retrieve the secured value.

Review comment:
       > The GCP implementation uses its native KMS to retrieve the encryption key, which is used to recover the plaintext.
   
   Since we're in the context of a particular provider, I think we can be more specific here.
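
For illustration, here is roughly how the Javadoc could read with that wording applied. This is only a sketch, not code from the PR: the class name `GcpKmsJavadocSketch` and the XOR stand-in for the KMS call are hypothetical placeholders so the example can run without GCP credentials. Only the Base64 handling mirrors what `protect()` does in the diff above.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class GcpKmsJavadocSketch {

    // Hypothetical stand-in for the remote GCP KMS call. XOR is NOT real
    // encryption; it only makes the round trip observable in this sketch.
    private static byte[] xor(final byte[] input) {
        final byte[] output = new byte[input.length];
        for (int i = 0; i < input.length; i++) {
            output[i] = (byte) (input[i] ^ 0x5A);
        }
        return output;
    }

    /**
     * Returns the "protected" form of this value, Base64-encoded so that it can be
     * persisted in the {@code nifi.properties} file.
     *
     * @param unprotectedValue the sensitive value.
     * @return the value to persist in the {@code nifi.properties} file.
     */
    public static String protect(final String unprotectedValue) {
        final byte[] cipherBytes = xor(unprotectedValue.getBytes(StandardCharsets.UTF_8));
        return Base64.getEncoder().encodeToString(cipherBytes);
    }

    /**
     * Returns the "unprotected" form of this value. This is the raw sensitive value which is used by the application logic.
     * The GCP implementation uses its native KMS to retrieve the encryption key, which is used to recover the plaintext.
     *
     * @param protectedValue the Base64-encoded ciphertext read from {@code nifi.properties}.
     * @return the raw value to be used by the application.
     */
    public static String unprotect(final String protectedValue) {
        final byte[] cipherBytes = Base64.getDecoder().decode(protectedValue);
        return new String(xor(cipherBytes), StandardCharsets.UTF_8);
    }

    public static void main(final String[] args) {
        // Round trip: the protected form is Base64 text, and unprotect() recovers the input.
        final String protectedValue = protect("secret");
        System.out.println(unprotect(protectedValue));
    }
}
```

The point is just the second sentence of the `unprotect` Javadoc: it describes this provider rather than the generic "encryption-based vs. remote-lookup" distinction inherited from the interface.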




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
