You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2021/12/11 18:58:17 UTC

[nifi] branch main updated: NIFI-9343: Adding config verification to GCP processors

This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new f3c7537  NIFI-9343: Adding config verification to GCP processors
f3c7537 is described below

commit f3c7537d9b8f84a4769ce8eba74a551dce3836a0
Author: Joe Gresock <jg...@gmail.com>
AuthorDate: Thu Oct 28 10:59:39 2021 -0400

    NIFI-9343: Adding config verification to GCP processors
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #5494.
---
 .../nifi/processors/gcp/AbstractGCPProcessor.java  |  66 ++++-
 .../gcp/bigquery/AbstractBigQueryProcessor.java    |  83 +++++--
 .../processors/gcp/bigquery/PutBigQueryBatch.java  |  48 ++--
 .../gcp/bigquery/PutBigQueryStreaming.java         |  47 ++--
 .../service/GCPCredentialsControllerService.java   |  39 ++-
 .../gcp/pubsub/AbstractGCPubSubProcessor.java      |   5 +-
 .../processors/gcp/pubsub/ConsumeGCPubSub.java     |  82 ++++++-
 .../processors/gcp/pubsub/PublishGCPubSub.java     |  77 +++++-
 .../gcp/storage/AbstractGCSProcessor.java          |  73 +++++-
 .../processors/gcp/storage/DeleteGCSObject.java    |   6 +
 .../processors/gcp/storage/FetchGCSObject.java     | 145 ++++++++---
 .../nifi/processors/gcp/storage/ListGCSBucket.java | 271 ++++++++++++++++-----
 .../nifi/processors/gcp/storage/PutGCSObject.java  |   7 +-
 .../gcp/bigquery/PutBigQueryBatchTest.java         |  21 +-
 .../processors/gcp/storage/AbstractGCSTest.java    |   2 +-
 .../processors/gcp/storage/FetchGCSObjectIT.java   |  13 +-
 .../processors/gcp/storage/FetchGCSObjectTest.java |  36 ++-
 .../processors/gcp/storage/ListGCSBucketTest.java  |  53 ++++
 .../nifi-gcp-bundle/nifi-gcp-services-api/pom.xml  |   9 +
 19 files changed, 867 insertions(+), 216 deletions(-)

diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java
index 626b5d2..9f98438 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java
@@ -22,18 +22,22 @@ import com.google.cloud.ServiceOptions;
 import com.google.cloud.TransportOptions;
 import com.google.cloud.http.HttpTransportOptions;
 import com.google.common.collect.ImmutableList;
-
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.proxy.ProxyConfiguration;
 
 import java.net.Proxy;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Abstract base class for gcp processors.
@@ -122,17 +126,46 @@ public abstract class AbstractGCPProcessor<
     @Override
     public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return ImmutableList.of(
-            PROJECT_ID,
-            GCP_CREDENTIALS_PROVIDER_SERVICE,
-            RETRY_COUNT,
-            PROXY_HOST,
-            PROXY_PORT,
-            HTTP_PROXY_USERNAME,
-            HTTP_PROXY_PASSWORD,
-            ProxyConfiguration.createProxyConfigPropertyDescriptor(true, ProxyAwareTransportFactory.PROXY_SPECS)
+                PROJECT_ID,
+                GCP_CREDENTIALS_PROVIDER_SERVICE,
+                RETRY_COUNT,
+                PROXY_HOST,
+                PROXY_PORT,
+                HTTP_PROXY_USERNAME,
+                HTTP_PROXY_PASSWORD,
+                ProxyConfiguration.createProxyConfigPropertyDescriptor(true, ProxyAwareTransportFactory.PROXY_SPECS)
         );
     }
 
+    /**
+     * Verifies the cloud service configuration.  This is in a separate method rather than implementing VerifiableProcessor due to type erasure.
+     * @param context The process context
+     * @param verificationLogger Logger for verification
+     * @param attributes Additional attributes
+     * @return The verification results
+     */
+    protected List<ConfigVerificationResult> verifyCloudService(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
+
+        ConfigVerificationResult result = null;
+        try {
+            final CloudService cloudService = getCloudService(context);
+            if (cloudService != null) {
+                result = new ConfigVerificationResult.Builder()
+                        .verificationStepName("Configure Cloud Service")
+                        .outcome(Outcome.SUCCESSFUL)
+                        .explanation(String.format("Successfully configured Cloud Service [%s]", cloudService.getClass().getSimpleName()))
+                        .build();
+            }
+        } catch (final Exception e) {
+            verificationLogger.error("Failed to configure Cloud Service", e);
+            result = new ConfigVerificationResult.Builder()
+                    .verificationStepName("Configure Cloud Service")
+                    .outcome(Outcome.FAILED)
+                    .explanation(String.format("Failed to configure Cloud Service [%s]: %s", cloudService.getClass().getSimpleName(), e.getMessage()))
+                    .build();
+        }
+        return result == null ? Collections.emptyList() : Collections.singletonList(result);
+    }
 
     /**
      * Retrieve credentials from the {@link GCPCredentialsService} attached to this processor.
@@ -147,13 +180,22 @@ public abstract class AbstractGCPProcessor<
     }
 
     /**
+     * Returns the cloud client service constructed based on the context.
+     * @param context the process context
+     * @return The constructed cloud client service
+     */
+    protected CloudService getCloudService(final ProcessContext context) {
+        final CloudServiceOptions options = getServiceOptions(context, getGoogleCredentials(context));
+        return options != null ? options.getService() : null;
+    }
+
+    /**
      * Assigns the cloud service client on scheduling.
      * @param context the process context provided on scheduling the processor.
      */
     @OnScheduled
-    public void onScheduled(ProcessContext context) {
-        final CloudServiceOptions options = getServiceOptions(context, getGoogleCredentials(context));
-        this.cloudService = options != null ? options.getService() : null;
+    public void onScheduled(final ProcessContext context) {
+        this.cloudService = getCloudService(context);
     }
 
     /**
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
index b2dc43a..78c845d 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
@@ -17,39 +17,46 @@
 
 package org.apache.nifi.processors.gcp.bigquery;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
+import com.google.api.gax.retrying.RetrySettings;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.BaseServiceException;
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.cloud.bigquery.TableId;
+import com.google.common.collect.ImmutableList;
+import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.VerifiableProcessor;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
 import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
 import org.apache.nifi.proxy.ProxyConfiguration;
 import org.apache.nifi.util.StringUtils;
 
-import com.google.api.gax.retrying.RetrySettings;
-import com.google.auth.oauth2.GoogleCredentials;
-import com.google.cloud.bigquery.BigQuery;
-import com.google.cloud.bigquery.BigQueryOptions;
-import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 /**
  * Base class for creating processors that connect to GCP BiqQuery service
  */
-public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<BigQuery, BigQueryOptions> {
+public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<BigQuery, BigQueryOptions> implements VerifiableProcessor  {
 
     static final int BUFFER_SIZE = 65536;
 
+    private static final List<String> REQUIRED_PERMISSIONS = Collections.singletonList("bigquery.tables.updateData");
+
     public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
             .description("FlowFiles are routed to this relationship after a successful Google BigQuery operation.")
             .build();
@@ -123,6 +130,40 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<Big
     }
 
     @Override
+    public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
+        final List<ConfigVerificationResult> results = new ArrayList<>(verifyCloudService(context, verificationLogger, attributes));
+
+        final BigQuery bigQuery = getCloudService(context);
+        if (bigQuery != null) {
+            try {
+                final TableId tableId = getTableId(context, attributes);
+                if (bigQuery.testIamPermissions(tableId, REQUIRED_PERMISSIONS).size() >= REQUIRED_PERMISSIONS.size()) {
+                    results.add(new ConfigVerificationResult.Builder()
+                            .verificationStepName("Test IAM Permissions")
+                            .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
+                            .explanation(String.format("Verified BigQuery Table [%s] exists and the configured user has the correct permissions.", tableId))
+                            .build());
+                } else {
+                    results.add(new ConfigVerificationResult.Builder()
+                            .verificationStepName("Test IAM Permissions")
+                            .outcome(ConfigVerificationResult.Outcome.FAILED)
+                            .explanation(String.format("The configured user does not have the correct permissions on BigQuery Table [%s].", tableId))
+                            .build());
+                }
+            } catch (final BaseServiceException e) {
+                verificationLogger.error("The configured user appears to have the correct permissions, but the following error was encountered", e);
+                results.add(new ConfigVerificationResult.Builder()
+                        .verificationStepName("Test IAM Permissions")
+                        .outcome(ConfigVerificationResult.Outcome.FAILED)
+                        .explanation(String.format("The configured user appears to have the correct permissions, but the following error was encountered: " + e.getMessage()))
+                        .build());
+            }
+        }
+
+        return results;
+    }
+
+    @Override
     protected final Collection<ValidationResult> customValidate(ValidationContext validationContext) {
         final Collection<ValidationResult> results = new ArrayList<ValidationResult>(super.customValidate(validationContext));
         ProxyConfiguration.validateProxySpec(validationContext, results, ProxyAwareTransportFactory.PROXY_SPECS);
@@ -147,4 +188,18 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<Big
     protected void customValidate(ValidationContext validationContext, Collection<ValidationResult> results) {
     }
 
+    protected TableId getTableId(final ProcessContext context, final Map<String, String> attributes) {
+        final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
+        final String dataset = context.getProperty(DATASET).evaluateAttributeExpressions(attributes).getValue();
+        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(attributes).getValue();
+
+        final TableId tableId;
+        if (StringUtils.isEmpty(projectId)) {
+            tableId = TableId.of(dataset, tableName);
+        } else {
+            tableId = TableId.of(projectId, dataset, tableName);
+        }
+        return tableId;
+    }
+
 }
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
index cf4b42a..8c421e8 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
@@ -17,15 +17,16 @@
 
 package org.apache.nifi.processors.gcp.bigquery;
 
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
+import com.google.cloud.RetryOption;
+import com.google.cloud.bigquery.FormatOptions;
+import com.google.cloud.bigquery.Job;
+import com.google.cloud.bigquery.JobInfo;
+import com.google.cloud.bigquery.JobStatistics.LoadStatistics;
+import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.TableDataWriteChannel;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.WriteChannelConfiguration;
+import com.google.common.collect.ImmutableList;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
@@ -50,16 +51,14 @@ import org.apache.nifi.util.StringUtils;
 import org.threeten.bp.Duration;
 import org.threeten.bp.temporal.ChronoUnit;
 
-import com.google.cloud.RetryOption;
-import com.google.cloud.bigquery.FormatOptions;
-import com.google.cloud.bigquery.Job;
-import com.google.cloud.bigquery.JobInfo;
-import com.google.cloud.bigquery.JobStatistics.LoadStatistics;
-import com.google.cloud.bigquery.Schema;
-import com.google.cloud.bigquery.TableDataWriteChannel;
-import com.google.cloud.bigquery.TableId;
-import com.google.cloud.bigquery.WriteChannelConfiguration;
-import com.google.common.collect.ImmutableList;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 /**
  * A processor for batch loading data into a Google BigQuery table
@@ -257,17 +256,8 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
             return;
         }
 
-        final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
-        final String dataset = context.getProperty(DATASET).evaluateAttributeExpressions(flowFile).getValue();
-        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
         final String type = context.getProperty(SOURCE_TYPE).evaluateAttributeExpressions(flowFile).getValue();
-
-        final TableId tableId;
-        if (StringUtils.isEmpty(projectId)) {
-            tableId = TableId.of(dataset, tableName);
-        } else {
-            tableId = TableId.of(projectId, dataset, tableName);
-        }
+        final TableId tableId = getTableId(context, flowFile.getAttributes());
 
         try {
 
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreaming.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreaming.java
index d8d9b3d..20fc401 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreaming.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreaming.java
@@ -17,19 +17,11 @@
 
 package org.apache.nifi.processors.gcp.bigquery;
 
-import java.io.InputStream;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.time.Instant;
-import java.time.LocalDateTime;
-import java.time.ZoneOffset;
-import java.time.format.DateTimeFormatter;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.ArrayList;
-
+import com.google.cloud.bigquery.BigQueryError;
+import com.google.cloud.bigquery.InsertAllRequest;
+import com.google.cloud.bigquery.InsertAllResponse;
+import com.google.cloud.bigquery.TableId;
+import com.google.common.collect.ImmutableList;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.SystemResource;
 import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
@@ -51,13 +43,19 @@ import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.util.StringUtils;
 
-import com.google.cloud.bigquery.BigQueryError;
-import com.google.cloud.bigquery.InsertAllRequest;
-import com.google.cloud.bigquery.InsertAllResponse;
-import com.google.cloud.bigquery.TableId;
-import com.google.common.collect.ImmutableList;
+import java.io.InputStream;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 /**
  * A processor for streaming loading data into a Google BigQuery table. It uses the BigQuery
@@ -124,17 +122,8 @@ public class PutBigQueryStreaming extends AbstractBigQueryProcessor {
         if (flowFile == null) {
             return;
         }
-
-        final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
-        final String dataset = context.getProperty(DATASET).evaluateAttributeExpressions(flowFile).getValue();
         final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
-
-        final TableId tableId;
-        if (StringUtils.isEmpty(projectId)) {
-            tableId = TableId.of(dataset, tableName);
-        } else {
-            tableId = TableId.of(projectId, dataset, tableName);
-        }
+        final TableId tableId = getTableId(context, flowFile.getAttributes());
 
         try {
 
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/credentials/service/GCPCredentialsControllerService.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/credentials/service/GCPCredentialsControllerService.java
index ce5ada6..b161e61 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/credentials/service/GCPCredentialsControllerService.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/credentials/service/GCPCredentialsControllerService.java
@@ -21,12 +21,16 @@ import com.google.auth.oauth2.GoogleCredentials;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.VerifiableControllerService;
 import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
 import org.apache.nifi.processors.gcp.credentials.factory.CredentialsFactory;
@@ -38,6 +42,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON;
 import static org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE;
@@ -55,7 +60,7 @@ import static org.apache.nifi.processors.gcp.credentials.factory.CredentialPrope
         "a credential file, the config generated by `gcloud auth application-default login`, AppEngine/Compute Engine" +
         " service accounts, etc.")
 @Tags({ "gcp", "credentials","provider" })
-public class GCPCredentialsControllerService extends AbstractControllerService implements GCPCredentialsService {
+public class GCPCredentialsControllerService extends AbstractControllerService implements GCPCredentialsService, VerifiableControllerService {
 
     private static final List<PropertyDescriptor> properties;
 
@@ -88,17 +93,41 @@ public class GCPCredentialsControllerService extends AbstractControllerService i
         return results;
     }
 
+    @Override
+    public List<ConfigVerificationResult> verify(final ConfigurationContext context, final ComponentLog verificationLogger, final Map<String, String> variables) {
+        ConfigVerificationResult result;
+        try {
+            final GoogleCredentials credentials = getGoogleCredentials(context);
+            result = new ConfigVerificationResult.Builder()
+                    .verificationStepName("Provide Google Credentials")
+                    .outcome(Outcome.SUCCESSFUL)
+                    .explanation(String.format("Successfully provided [%s] as Google Credentials", credentials.getClass().getSimpleName()))
+                    .build();
+        } catch (final IOException e) {
+            result = new ConfigVerificationResult.Builder()
+                    .verificationStepName("Provide Google Credentials")
+                    .outcome(Outcome.FAILED)
+                    .explanation(String.format("Failed to provide Google Credentials: " + e.getMessage()))
+                    .build();
+        }
+        return Collections.singletonList(result);
+    }
+
     @OnEnabled
     public void onConfigured(final ConfigurationContext context) throws InitializationException {
         try {
-            final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(context);
-            final HttpTransportFactory transportFactory = new ProxyAwareTransportFactory(proxyConfiguration);
-            googleCredentials = credentialsProviderFactory.getGoogleCredentials(context.getProperties(), transportFactory);
-        } catch (IOException e) {
+            googleCredentials = getGoogleCredentials(context);
+        } catch (final IOException e) {
             throw new InitializationException(e);
         }
     }
 
+    private GoogleCredentials getGoogleCredentials(final ConfigurationContext context) throws IOException {
+        final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(context);
+        final HttpTransportFactory transportFactory = new ProxyAwareTransportFactory(proxyConfiguration);
+        return credentialsProviderFactory.getGoogleCredentials(context.getProperties(), transportFactory);
+    }
+
     @Override
     public String toString() {
         return "GCPCredentialsControllerService[id=" + getIdentifier() + "]";
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java
index 8930a27..fc135db 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java
@@ -24,6 +24,7 @@ import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.VerifiableProcessor;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
 
@@ -33,7 +34,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
-public abstract class AbstractGCPubSubProcessor extends AbstractGCPProcessor {
+public abstract class AbstractGCPubSubProcessor extends AbstractGCPProcessor implements VerifiableProcessor {
 
     public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
             .name("gcp-pubsub-publish-batch-size")
@@ -69,7 +70,7 @@ public abstract class AbstractGCPubSubProcessor extends AbstractGCPProcessor {
 
     @Override
     protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
-        final Collection<ValidationResult> results = super.customValidate(validationContext);
+        final Collection<ValidationResult> results = new HashSet<>(super.customValidate(validationContext));
 
         final boolean projectId = validationContext.getProperty(PROJECT_ID).isSet();
         if (!projectId) {
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
index 70b9e26..927a595 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
@@ -17,10 +17,13 @@
 package org.apache.nifi.processors.gcp.pubsub;
 
 import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.pathtemplate.ValidationException;
 import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
 import com.google.cloud.pubsub.v1.stub.SubscriberStub;
 import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
 import com.google.common.collect.ImmutableList;
+import com.google.iam.v1.TestIamPermissionsRequest;
 import com.google.pubsub.v1.AcknowledgeRequest;
 import com.google.pubsub.v1.ProjectSubscriptionName;
 import com.google.pubsub.v1.PullRequest;
@@ -34,9 +37,12 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
@@ -79,6 +85,8 @@ import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_
 })
 public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
 
+    private static final List<String> REQUIRED_PERMISSIONS = Collections.singletonList("pubsub.subscriptions.consume");
+
     public static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder()
             .name("gcp-pubsub-subscription")
             .displayName("Subscription")
@@ -99,7 +107,7 @@ public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
 
         pullRequest = PullRequest.newBuilder()
                 .setMaxMessages(batchSize)
-                .setSubscription(getSubscriptionName(context))
+                .setSubscription(getSubscriptionName(context, null))
                 .build();
 
         try {
@@ -110,6 +118,70 @@ public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
         }
     }
 
+    @Override
+    public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
+        final List<ConfigVerificationResult> results = new ArrayList<>();
+        String subscriptionName = null;
+        try {
+            subscriptionName = getSubscriptionName(context, attributes);
+            results.add(new ConfigVerificationResult.Builder()
+                    .verificationStepName("Parse Subscription Name")
+                    .outcome(Outcome.SUCCESSFUL)
+                    .explanation("Successfully parsed Subscription Name")
+                    .build());
+        } catch (final ValidationException e) {
+            verificationLogger.error("Failed to parse Subscription Name", e);
+            results.add(new ConfigVerificationResult.Builder()
+                    .verificationStepName("Parse Subscription Name")
+                    .outcome(Outcome.FAILED)
+                    .explanation(String.format("Failed to parse Subscription Name: " + e.getMessage()))
+                    .build());
+        }
+        SubscriberStub subscriber = null;
+        try {
+            subscriber = getSubscriber(context);
+            results.add(new ConfigVerificationResult.Builder()
+                    .verificationStepName("Create Subscriber")
+                    .outcome(Outcome.SUCCESSFUL)
+                    .explanation("Successfully created Subscriber")
+                    .build());
+        } catch (final IOException e) {
+            verificationLogger.error("Failed to create Subscriber", e);
+            results.add(new ConfigVerificationResult.Builder()
+                    .verificationStepName("Create Subscriber")
+                    .outcome(Outcome.FAILED)
+                    .explanation(String.format("Failed to create Subscriber: " + e.getMessage()))
+                    .build());
+        }
+
+        if (subscriber != null && subscriptionName != null) {
+            try {
+                final TestIamPermissionsRequest request = TestIamPermissionsRequest.newBuilder().addAllPermissions(REQUIRED_PERMISSIONS).setResource(subscriptionName).build();
+                if (subscriber.testIamPermissionsCallable().call(request).getPermissionsCount() >= REQUIRED_PERMISSIONS.size()) {
+                    results.add(new ConfigVerificationResult.Builder()
+                            .verificationStepName("Test IAM Permissions")
+                            .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
+                            .explanation(String.format("Verified Subscription [%s] exists and the configured user has the correct permissions.", subscriptionName))
+                            .build());
+                } else {
+                    results.add(new ConfigVerificationResult.Builder()
+                            .verificationStepName("Test IAM Permissions")
+                            .outcome(ConfigVerificationResult.Outcome.FAILED)
+                            .explanation(String.format("The configured user does not have the correct permissions on Subscription [%s].", subscriptionName))
+                            .build());
+                }
+            } catch (final ApiException e) {
+                verificationLogger.error("The configured user appears to have the correct permissions, but the following error was encountered", e);
+                results.add(new ConfigVerificationResult.Builder()
+                        .verificationStepName("Test IAM Permissions")
+                        .outcome(ConfigVerificationResult.Outcome.FAILED)
+                        .explanation(String.format("The configured user appears to have the correct permissions, but the following error was encountered: " + e.getMessage()))
+                        .build());
+            }
+        }
+        return results;
+    }
+
     @OnStopped
     public void onStopped() {
         if (subscriber != null) {
@@ -145,7 +217,7 @@ public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
 
         final PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
         final List<String> ackIds = new ArrayList<>();
-        final String subscriptionName = getSubscriptionName(context);
+        final String subscriptionName = getSubscriptionName(context, null);
 
         for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
             if (message.hasMessage()) {
@@ -184,9 +256,9 @@ public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
         subscriber.acknowledgeCallable().call(acknowledgeRequest);
     }
 
-    private String getSubscriptionName(final ProcessContext context) {
-        final String subscriptionName = context.getProperty(SUBSCRIPTION).evaluateAttributeExpressions().getValue();
-        final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
+    private String getSubscriptionName(final ProcessContext context, final Map<String, String> additionalAttributes) {
+        final String subscriptionName = context.getProperty(SUBSCRIPTION).evaluateAttributeExpressions(additionalAttributes).getValue();
+        final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions(additionalAttributes).getValue();
 
         if (subscriptionName.contains("/")) {
             return ProjectSubscriptionName.parse(subscriptionName).toString();
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
index 5ad4e76..71ba4ad 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
@@ -19,18 +19,23 @@ package org.apache.nifi.processors.gcp.pubsub;
 import com.google.api.core.ApiFuture;
 import com.google.api.gax.batching.BatchingSettings;
 import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.rpc.ApiException;
 import com.google.api.gax.rpc.DeadlineExceededException;
 import com.google.cloud.pubsub.v1.Publisher;
+import com.google.cloud.pubsub.v1.stub.GrpcPublisherStub;
+import com.google.cloud.pubsub.v1.stub.PublisherStubSettings;
 import com.google.common.collect.ImmutableList;
+import com.google.iam.v1.TestIamPermissionsRequest;
+import com.google.iam.v1.TestIamPermissionsResponse;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Timestamp;
 import com.google.pubsub.v1.ProjectTopicName;
 import com.google.pubsub.v1.PubsubMessage;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SystemResource;
 import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -38,9 +43,12 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
@@ -80,6 +88,7 @@ import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_
 @SystemResourceConsideration(resource = SystemResource.MEMORY, description = "The entirety of the FlowFile's content "
         + "will be read into memory to be sent as a PubSub message.")
 public class PublishGCPubSub extends AbstractGCPubSubProcessor{
+    private static final List<String> REQUIRED_PERMISSIONS = Collections.singletonList("pubsub.topics.publish");
 
     public static final PropertyDescriptor TOPIC_NAME = new PropertyDescriptor.Builder()
             .name("gcp-pubsub-topic")
@@ -137,6 +146,72 @@ public class PublishGCPubSub extends AbstractGCPubSubProcessor{
     }
 
     @Override
+    public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
+        final List<ConfigVerificationResult> results = new ArrayList<>();
+        Publisher publisher = null;
+        try {
+            publisher = getPublisherBuilder(context).build();
+            results.add(new ConfigVerificationResult.Builder()
+                    .verificationStepName("Create Publisher")
+                    .outcome(Outcome.SUCCESSFUL)
+                    .explanation("Successfully created Publisher")
+                    .build());
+        } catch (final IOException e) {
+            verificationLogger.error("Failed to create Publisher", e);
+            results.add(new ConfigVerificationResult.Builder()
+                    .verificationStepName("Create Publisher")
+                    .outcome(Outcome.FAILED)
+                    .explanation(String.format("Failed to create Publisher: " + e.getMessage()))
+                    .build());
+        }
+
+        if (publisher != null) {
+            try {
+                final PublisherStubSettings publisherStubSettings = PublisherStubSettings.newBuilder()
+                        .setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
+                        .build();
+
+                final GrpcPublisherStub publisherStub = GrpcPublisherStub.create(publisherStubSettings);
+                final String topicName = context.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue();
+                final TestIamPermissionsRequest request = TestIamPermissionsRequest.newBuilder()
+                        .addAllPermissions(REQUIRED_PERMISSIONS)
+                        .setResource(topicName)
+                        .build();
+                final TestIamPermissionsResponse response = publisherStub.testIamPermissionsCallable().call(request);
+                if (response.getPermissionsCount() >= REQUIRED_PERMISSIONS.size()) {
+                    results.add(new ConfigVerificationResult.Builder()
+                            .verificationStepName("Test IAM Permissions")
+                            .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
+                            .explanation(String.format("Verified Topic [%s] exists and the configured user has the correct permissions.", topicName))
+                            .build());
+                } else {
+                    results.add(new ConfigVerificationResult.Builder()
+                            .verificationStepName("Test IAM Permissions")
+                            .outcome(ConfigVerificationResult.Outcome.FAILED)
+                            .explanation(String.format("The configured user does not have the correct permissions on Topic [%s].", topicName))
+                            .build());
+                }
+            } catch (final ApiException e) {
+                verificationLogger.error("The configured user appears to have the correct permissions, but the following error was encountered", e);
+                results.add(new ConfigVerificationResult.Builder()
+                        .verificationStepName("Test IAM Permissions")
+                        .outcome(ConfigVerificationResult.Outcome.FAILED)
+                        .explanation(String.format("The configured user appears to have the correct permissions, but the following error was encountered: " + e.getMessage()))
+                        .build());
+            } catch (final IOException e) {
+                verificationLogger.error("The publisher stub could not be created in order to test the permissions", e);
+                results.add(new ConfigVerificationResult.Builder()
+                        .verificationStepName("Test IAM Permissions")
+                        .outcome(ConfigVerificationResult.Outcome.FAILED)
+                        .explanation(String.format("The publisher stub could not be created in order to test the permissions: " + e.getMessage()))
+                        .build());
+
+            }
+        }
+        return results;
+    }
+
+    @Override
     public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
         final int flowFileCount = context.getProperty(BATCH_SIZE).asInteger();
         final List<FlowFile> flowFiles = session.get(flowFileCount);
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
index 81af2ad..0c82c62 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
@@ -16,34 +16,39 @@
  */
 package org.apache.nifi.processors.gcp.storage;
 
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
+import com.google.api.gax.retrying.RetrySettings;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.BaseServiceException;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+import com.google.common.collect.ImmutableList;
+import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.VerifiableProcessor;
 import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
 import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
 import org.apache.nifi.proxy.ProxyConfiguration;
 
-import com.google.api.gax.retrying.RetrySettings;
-import com.google.auth.oauth2.GoogleCredentials;
-import com.google.cloud.storage.Storage;
-import com.google.cloud.storage.StorageOptions;
-import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 /**
  * Base class for creating processors which connect to Google Cloud Storage.
  *
  * Every GCS processor operation requires a bucket, whether it's reading or writing from said bucket.
  */
-public abstract class AbstractGCSProcessor extends AbstractGCPProcessor<Storage, StorageOptions> {
+public abstract class AbstractGCSProcessor extends AbstractGCPProcessor<Storage, StorageOptions> implements VerifiableProcessor {
     public static final Relationship REL_SUCCESS =
             new Relationship.Builder().name("success")
                     .description("FlowFiles are routed to this relationship after a successful Google Cloud Storage operation.")
@@ -69,6 +74,39 @@ public abstract class AbstractGCSProcessor extends AbstractGCPProcessor<Storage,
     }
 
     @Override
+    public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
+        final List<ConfigVerificationResult> results = new ArrayList<>(verifyCloudService(context, verificationLogger, attributes));
+        final Storage storage = getCloudService(context);
+        if (storage != null) {
+            try {
+                final String bucket = getBucketName(context, attributes);
+                final List<String> requiredPermissions = getRequiredPermissions();
+                if (storage.testIamPermissions(bucket, requiredPermissions).size() >= requiredPermissions.size()) {
+                    results.add(new ConfigVerificationResult.Builder()
+                            .verificationStepName("Test IAM Permissions")
+                            .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
+                            .explanation(String.format("Verified Bucket [%s] exists and the configured user has the correct permissions.", bucket))
+                            .build());
+                } else {
+                    results.add(new ConfigVerificationResult.Builder()
+                            .verificationStepName("Test IAM Permissions")
+                            .outcome(ConfigVerificationResult.Outcome.FAILED)
+                            .explanation(String.format("The configured user does not have the correct permissions on Bucket [%s].", bucket))
+                            .build());
+                }
+            } catch (final BaseServiceException e) {
+                verificationLogger.error("The configured user appears to have the correct permissions, but the following error was encountered", e);
+                results.add(new ConfigVerificationResult.Builder()
+                        .verificationStepName("Test IAM Permissions")
+                        .outcome(ConfigVerificationResult.Outcome.FAILED)
+                        .explanation(String.format("The configured user appears to have the correct permissions, but the following error was encountered: " + e.getMessage()))
+                        .build());
+            }
+        }
+        return results;
+    }
+
+    @Override
     protected final Collection<ValidationResult> customValidate(ValidationContext validationContext) {
         final Collection<ValidationResult> results = super.customValidate(validationContext);
         ProxyConfiguration.validateProxySpec(validationContext, results, ProxyAwareTransportFactory.PROXY_SPECS);
@@ -82,6 +120,15 @@ public abstract class AbstractGCSProcessor extends AbstractGCPProcessor<Storage,
     protected void customValidate(ValidationContext validationContext, Collection<ValidationResult> results) {
     }
 
+    /**
+     * @return The list of GCP permissions required for the processor
+     */
+    protected abstract List<String> getRequiredPermissions();
+
+    protected String getBucketName(final ProcessContext context, final Map<String, String> attributes) {
+        return context.getProperty("gcs-bucket").evaluateAttributeExpressions(attributes).getValue();
+    }
+
     @Override
     protected StorageOptions getServiceOptions(ProcessContext context, GoogleCredentials credentials) {
         final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/DeleteGCSObject.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/DeleteGCSObject.java
index 66d51db..c30128c 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/DeleteGCSObject.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/DeleteGCSObject.java
@@ -32,6 +32,7 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -87,6 +88,11 @@ public class DeleteGCSObject extends AbstractGCSProcessor {
     }
 
     @Override
+    protected List<String> getRequiredPermissions() {
+        return Collections.singletonList("storage.objects.delete");
+    }
+
+    @Override
     public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
         FlowFile flowFile = session.get();
         if (flowFile == null) {
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java
index f04eb05..b471003 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java
@@ -23,17 +23,23 @@ import com.google.cloud.storage.BlobId;
 import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.StorageException;
 import com.google.common.collect.ImmutableList;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.input.BoundedInputStream;
+import org.apache.commons.io.output.CountingOutputStream;
+import org.apache.commons.io.output.NullOutputStream;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
@@ -41,8 +47,10 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.channels.Channels;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -195,10 +203,45 @@ public class FetchGCSObject extends AbstractGCSProcessor {
             .build();
     }
 
+    @Override
+    protected List<String> getRequiredPermissions() {
+        return Collections.singletonList("storage.objects.get");
+    }
+
+    @Override
+    public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
+        final List<ConfigVerificationResult> results = new ArrayList<>(super.verify(context, verificationLogger, attributes));
+
+        final String bucketName = context.getProperty(BUCKET).evaluateAttributeExpressions(attributes).getValue();
+        final String key = context.getProperty(KEY).evaluateAttributeExpressions(attributes).getValue();
+
+        final Storage storage = getCloudService(context);
 
+        try {
+            final FetchedBlob blob = fetchBlob(context, storage, attributes);
+
+            final CountingOutputStream out = new CountingOutputStream(NullOutputStream.NULL_OUTPUT_STREAM);
+            IOUtils.copy(blob.contents, out);
+            final long byteCount = out.getByteCount();
+            results.add(new ConfigVerificationResult.Builder()
+                    .verificationStepName("Fetch GCS Blob")
+                    .outcome(Outcome.SUCCESSFUL)
+                    .explanation(String.format("Successfully fetched [%s] from Bucket [%s], totaling %s bytes", key, bucketName, byteCount))
+                    .build());
+        } catch (final StorageException | IOException e) {
+            getLogger().error(String.format("Failed to fetch [%s] from Bucket [%s]", key, bucketName), e);
+            results.add(new ConfigVerificationResult.Builder()
+                    .verificationStepName("Fetch GCS Blob")
+                    .outcome(Outcome.FAILED)
+                    .explanation(String.format("Failed to fetch [%s] from Bucket [%s]: %s", key, bucketName, e.getMessage()))
+                    .build());
+        }
+
+        return results;
+    }
 
     @Override
-    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
         FlowFile flowFile = session.get();
         if (flowFile == null) {
             return;
@@ -208,51 +251,19 @@ public class FetchGCSObject extends AbstractGCSProcessor {
 
         final String bucketName = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
         final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
-        final Long generation = context.getProperty(GENERATION).evaluateAttributeExpressions(flowFile).asLong();
-        final String encryptionKey = context.getProperty(ENCRYPTION_KEY).evaluateAttributeExpressions(flowFile).getValue();
 
         final Storage storage = getCloudService();
-        final BlobId blobId = BlobId.of(bucketName, key, generation);
 
         final long rangeStart = (context.getProperty(RANGE_START).isSet() ? context.getProperty(RANGE_START).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : 0L);
         final Long rangeLength = (context.getProperty(RANGE_LENGTH).isSet() ? context.getProperty(RANGE_LENGTH).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : null);
 
         try {
-            final List<Storage.BlobSourceOption> blobSourceOptions = new ArrayList<>(2);
-
-            if (encryptionKey != null) {
-                blobSourceOptions.add(Storage.BlobSourceOption.decryptionKey(encryptionKey));
-            }
-
-            if (generation != null) {
-                blobSourceOptions.add(Storage.BlobSourceOption.generationMatch());
-            }
-
-            final Blob blob = storage.get(blobId);
-            if (blob == null) {
-                throw new StorageException(404, "Blob " + blobId + " not found");
-            }
-
-            if (rangeStart > 0 && rangeStart >= blob.getSize()) {
-                if (getLogger().isDebugEnabled()) {
-                    getLogger().debug("Start position: {}, blob size: {}", new Object[] {rangeStart, blob.getSize()});
-                }
-                throw new StorageException(416, "The range specified is not valid for the blob " + blobId
-                        + ". Range Start is beyond the end of the blob.");
-            }
-
-            final ReadChannel reader = storage.reader(blobId, blobSourceOptions.toArray(new Storage.BlobSourceOption[0]));
-            reader.seek(rangeStart);
-
-            if (rangeLength == null) {
-                flowFile = session.importFrom(Channels.newInputStream(reader), flowFile);
-            } else {
-                flowFile = session.importFrom(new BoundedInputStream(Channels.newInputStream(reader), rangeLength), flowFile);
-            }
+            final FetchedBlob blob = fetchBlob(context, storage, flowFile.getAttributes());
+            flowFile = session.importFrom(blob.contents, flowFile);
 
-            final Map<String, String> attributes = StorageAttributes.createAttributes(blob);
+            final Map<String, String> attributes = StorageAttributes.createAttributes(blob.blob);
             flowFile = session.putAllAttributes(flowFile, attributes);
-        } catch (StorageException | IOException e) {
+        } catch (final StorageException | IOException e) {
             getLogger().error("Failed to fetch GCS Object due to {}", new Object[] {e}, e);
             flowFile = session.penalize(flowFile);
             session.transfer(flowFile, REL_FAILURE);
@@ -265,4 +276,64 @@ public class FetchGCSObject extends AbstractGCSProcessor {
         getLogger().info("Successfully retrieved GCS Object for {} in {} millis; routing to success", new Object[]{flowFile, millis});
         session.getProvenanceReporter().fetch(flowFile, "https://" + bucketName + ".storage.googleapis.com/" + key, millis);
     }
+
+    private FetchedBlob fetchBlob(final ProcessContext context, final Storage storage, final Map<String, String> attributes) throws IOException {
+        final String bucketName = context.getProperty(BUCKET).evaluateAttributeExpressions(attributes).getValue();
+        final String key = context.getProperty(KEY).evaluateAttributeExpressions(attributes).getValue();
+        final Long generation = context.getProperty(GENERATION).evaluateAttributeExpressions(attributes).asLong();
+        final long rangeStart = (context.getProperty(RANGE_START).isSet() ? context.getProperty(RANGE_START).evaluateAttributeExpressions(attributes).asDataSize(DataUnit.B).longValue() : 0L);
+        final Long rangeLength = (context.getProperty(RANGE_LENGTH).isSet() ? context.getProperty(RANGE_LENGTH).evaluateAttributeExpressions(attributes).asDataSize(DataUnit.B).longValue() : null);
+
+        final BlobId blobId = BlobId.of(bucketName, key, generation);
+        final List<Storage.BlobSourceOption> blobSourceOptions = getBlobSourceOptions(context, attributes);
+
+        if (blobId.getName() == null || blobId.getName().isEmpty()) {
+            throw new IllegalArgumentException("Name is required");
+        }
+        final Blob blob = storage.get(blobId);
+        if (blob == null) {
+            throw new StorageException(404, "Blob " + blobId + " not found");
+        }
+        if (rangeStart > 0 && rangeStart >= blob.getSize()) {
+            if (getLogger().isDebugEnabled()) {
+                getLogger().debug("Start position: {}, blob size: {}", new Object[] {rangeStart, blob.getSize()});
+            }
+            throw new StorageException(416, "The range specified is not valid for the blob " + blob.getBlobId()
+                    + ". Range Start is beyond the end of the blob.");
+        }
+
+        final ReadChannel reader = storage.reader(blob.getBlobId(), blobSourceOptions.toArray(new Storage.BlobSourceOption[0]));
+        reader.seek(rangeStart);
+
+        final InputStream rawInputStream = Channels.newInputStream(reader);
+        final InputStream blobContents = rangeLength == null ? rawInputStream : new BoundedInputStream(rawInputStream, rangeLength);
+
+        return new FetchedBlob(blobContents, blob);
+    }
+
+    private List<Storage.BlobSourceOption> getBlobSourceOptions(final ProcessContext context, final Map<String, String> attributes) {
+        final Long generation = context.getProperty(GENERATION).evaluateAttributeExpressions(attributes).asLong();
+        final String encryptionKey = context.getProperty(ENCRYPTION_KEY).evaluateAttributeExpressions(attributes).getValue();
+
+        final List<Storage.BlobSourceOption> blobSourceOptions = new ArrayList<>(2);
+
+        if (encryptionKey != null) {
+            blobSourceOptions.add(Storage.BlobSourceOption.decryptionKey(encryptionKey));
+        }
+
+        if (generation != null) {
+            blobSourceOptions.add(Storage.BlobSourceOption.generationMatch());
+        }
+        return blobSourceOptions;
+    }
+
+    private class FetchedBlob {
+        private final InputStream contents;
+        private final Blob blob;
+
+        private FetchedBlob(final InputStream contents, final Blob blob) {
+            this.contents = contents;
+            this.blob = blob;
+        }
+    }
 }
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
index ede0420..dd43e78 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
@@ -33,6 +33,8 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateMap;
@@ -266,6 +268,41 @@ public class ListGCSBucket extends AbstractGCSProcessor {
     }
 
     @Override
+    protected List<String> getRequiredPermissions() {
+        return Collections.singletonList("storage.objects.list");
+    }
+
+    @Override
+    protected String getBucketName(final ProcessContext context, final Map<String, String> attributes) {
+        return context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
+    }
+
+    @Override
+    public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
+        final List<ConfigVerificationResult> results = new ArrayList<>(super.verify(context, verificationLogger, attributes));
+        final String bucketName = getBucketName(context, attributes);
+
+        try {
+            final VerifyListingAction listingAction = new VerifyListingAction(context);
+            listBucket(context, listingAction);
+            final int blobCount = listingAction.getBlobWriter().getCount();
+            results.add(new ConfigVerificationResult.Builder()
+                    .verificationStepName("List GCS Bucket")
+                    .outcome(Outcome.SUCCESSFUL)
+                    .explanation(String.format("Successfully listed Bucket [%s], finding %s blobs matching the filter", bucketName, blobCount))
+                    .build());
+        } catch (final Exception e) {
+            verificationLogger.error("Failed to list GCS Bucket", e);
+            results.add(new ConfigVerificationResult.Builder()
+                    .verificationStepName("List GCS Bucket")
+                    .outcome(Outcome.FAILED)
+                    .explanation(String.format("Failed to list Bucket [%s]: %s", bucketName, e.getMessage()))
+                    .build());
+        }
+        return results;
+    }
+
+    @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
         try {
             restoreState(session);
@@ -277,6 +314,22 @@ public class ListGCSBucket extends AbstractGCSProcessor {
 
         final long startNanos = System.nanoTime();
 
+        final ListingAction listingAction = new TriggerListingAction(context, session);
+        try {
+            listBucket(context, listingAction);
+        } catch (final Exception e) {
+            getLogger().error("Failed to list contents of GCS Bucket due to {}", new Object[] {e}, e);
+            listingAction.getBlobWriter().finishListingExceptionally(e);
+            session.rollback();
+            context.yield();
+            return;
+        }
+
+        final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+        getLogger().info("Successfully listed GCS bucket {} in {} millis", new Object[]{ context.getProperty(BUCKET).evaluateAttributeExpressions().getValue(), listMillis });
+    }
+
+    private void listBucket(final ProcessContext context, final ListingAction listingAction) throws IOException, SchemaNotFoundException {
         final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
         final String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
         final boolean useGenerations = context.getProperty(USE_GENERATIONS).asBoolean();
@@ -289,87 +342,160 @@ public class ListGCSBucket extends AbstractGCSProcessor {
             listOptions.add(Storage.BlobListOption.versions(true));
         }
 
-        final Storage storage = getCloudService();
+        final Storage storage = listingAction.getCloudService();
 
         long maxTimestamp = 0L;
         final Set<String> keysMatchingTimestamp = new HashSet<>();
 
-        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final BlobWriter writer = listingAction.getBlobWriter();
 
-        final BlobWriter writer;
-        if (writerFactory == null) {
-            writer = new AttributeBlobWriter(session);
-        } else {
-            writer = new RecordBlobWriter(session, writerFactory, getLogger());
-        }
+        writer.beginListing();
 
-        try {
-            writer.beginListing();
-
-            Page<Blob> blobPage = storage.list(bucket, listOptions.toArray(new Storage.BlobListOption[0]));
-            int listCount = 0;
-
-            do {
-                for (final Blob blob : blobPage.getValues()) {
-                    long lastModified = blob.getUpdateTime();
-                    if (lastModified < currentTimestamp || lastModified == currentTimestamp && currentKeys.contains(blob.getName())) {
-                        continue;
-                    }
-
-                    writer.addToListing(blob);
-
-                    // Update state
-                    if (lastModified > maxTimestamp) {
-                        maxTimestamp = lastModified;
-                        keysMatchingTimestamp.clear();
-                    }
-                    if (lastModified == maxTimestamp) {
-                        keysMatchingTimestamp.add(blob.getName());
-                    }
-
-                    listCount++;
+        Page<Blob> blobPage = storage.list(bucket, listOptions.toArray(new Storage.BlobListOption[0]));
+        int listCount = 0;
+
+        do {
+            for (final Blob blob : blobPage.getValues()) {
+                long lastModified = blob.getUpdateTime();
+                if (listingAction.skipBlob(blob)) {
+                    continue;
                 }
 
-                if (writer.isCheckpoint()) {
-                    commit(session, listCount);
-                    listCount = 0;
+                writer.addToListing(blob);
+
+                // Update state
+                if (lastModified > maxTimestamp) {
+                    maxTimestamp = lastModified;
+                    keysMatchingTimestamp.clear();
                 }
+                if (lastModified == maxTimestamp) {
+                    keysMatchingTimestamp.add(blob.getName());
+                }
+
+                listCount++;
+            }
 
-                blobPage = blobPage.getNextPage();
-            } while (blobPage != null);
+            if (writer.isCheckpoint()) {
+                listingAction.commit(listCount);
+                listCount = 0;
+            }
 
-            writer.finishListing();
+            blobPage = blobPage.getNextPage();
+        } while (blobPage != null);
+
+        writer.finishListing();
+
+        listingAction.finishListing(listCount, maxTimestamp, keysMatchingTimestamp);
+    }
+
+    private void commit(final ProcessSession session, final int listCount) {
+        if (listCount > 0) {
+            getLogger().info("Successfully listed {} new files from GCS; routing to success", new Object[] {listCount});
+            session.commitAsync();
+        }
+    }
+
+    private interface ListingAction<T extends BlobWriter> {
+        boolean skipBlob(final Blob blob);
+
+        T getBlobWriter();
+
+        Storage getCloudService();
+
+        void finishListing(int listCount, long maxTimestamp, Set<String> keysMatchingTimestamp);
+
+        void commit(int listCount);
+    }
+
+    private class TriggerListingAction implements ListingAction<BlobWriter> {
+        final ProcessContext context;
+        final ProcessSession session;
+        final BlobWriter blobWriter;
+
+        private TriggerListingAction(final ProcessContext context, final ProcessSession session) {
+            this.context = context;
+            this.session = session;
+
+            final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+            if (writerFactory == null) {
+                blobWriter = new AttributeBlobWriter(session);
+            } else {
+                blobWriter = new RecordBlobWriter(session, writerFactory, getLogger());
+            }
+        }
+
+        @Override
+        public boolean skipBlob(final Blob blob) {
+            final long lastModified = blob.getUpdateTime();
+            return lastModified < currentTimestamp || lastModified == currentTimestamp && currentKeys.contains(blob.getName());
+        }
+
+        @Override
+        public void commit(final int listCount) {
+            ListGCSBucket.this.commit(session, listCount);
+        }
+
+        @Override
+        public BlobWriter getBlobWriter() {
+            return blobWriter;
+        }
 
+        @Override
+        public Storage getCloudService() {
+            return ListGCSBucket.this.getCloudService();
+        }
+
+        @Override
+        public void finishListing(final int listCount, final long maxTimestamp, final Set<String> keysMatchingTimestamp) {
             if (maxTimestamp == 0) {
-                getLogger().debug("No new objects in GCS bucket {} to list. Yielding.", bucket);
+                getLogger().debug("No new objects in GCS bucket {} to list. Yielding.", context.getProperty(BUCKET).evaluateAttributeExpressions().getValue());
                 context.yield();
             } else {
-                commit(session, listCount);
+                commit(listCount);
 
                 currentTimestamp = maxTimestamp;
                 currentKeys.clear();
                 currentKeys.addAll(keysMatchingTimestamp);
                 persistState(session, currentTimestamp, currentKeys);
             }
-        } catch (final Exception e) {
-            getLogger().error("Failed to list contents of GCS Bucket due to {}", new Object[] {e}, e);
-            writer.finishListingExceptionally(e);
-            session.rollback();
-            context.yield();
-            return;
         }
-
-        final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-        getLogger().info("Successfully listed GCS bucket {} in {} millis", new Object[]{bucket, listMillis});
     }
 
-    private void commit(final ProcessSession session, final int listCount) {
-        if (listCount > 0) {
-            getLogger().info("Successfully listed {} new files from GCS; routing to success", new Object[] {listCount});
-            session.commitAsync();
+    private class VerifyListingAction implements ListingAction<CountingBlobWriter> {
+        final ProcessContext context;
+        final CountingBlobWriter blobWriter;
+
+        private VerifyListingAction(final ProcessContext context) {
+            this.context = context;
+            blobWriter = new CountingBlobWriter();
         }
-    }
 
+        @Override
+        public boolean skipBlob(final Blob blob) {
+            return false;
+        }
+
+        @Override
+        public void commit(final int listCount) {
+
+        }
+
+        @Override
+        public CountingBlobWriter getBlobWriter() {
+            return blobWriter;
+        }
+
+        @Override
+        public Storage getCloudService() {
+            // Use the verification context
+            return ListGCSBucket.this.getCloudService(context);
+        }
+
+        @Override
+        public void finishListing(final int listCount, final long maxTimestamp, final Set<String> keysMatchingTimestamp) {
+
+        }
+    }
 
     private interface BlobWriter {
         void beginListing() throws IOException, SchemaNotFoundException;
@@ -547,8 +673,6 @@ public class ListGCSBucket extends AbstractGCSProcessor {
         }
     }
 
-
-
     /**
      * Writes Blobs by creating a new FlowFile for each blob and writing information as FlowFile attributes
      */
@@ -586,4 +710,37 @@ public class ListGCSBucket extends AbstractGCSProcessor {
             return true;
         }
     }
+
+    /**
+     * Simply counts the blobs.
+     */
+    private static class CountingBlobWriter implements BlobWriter {
+        private int count = 0;
+
+        @Override
+        public void beginListing() {
+        }
+
+        @Override
+        public void addToListing(final Blob blob) {
+            count++;
+        }
+
+        @Override
+        public void finishListing() {
+        }
+
+        @Override
+        public void finishListingExceptionally(final Exception cause) {
+        }
+
+        @Override
+        public boolean isCheckpoint() {
+            return false;
+        }
+
+        public int getCount() {
+            return count;
+        }
+    }
 }
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/PutGCSObject.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/PutGCSObject.java
index 5be6e39..6c2f431 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/PutGCSObject.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/PutGCSObject.java
@@ -47,6 +47,7 @@ import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -314,6 +315,11 @@ public class PutGCSObject extends AbstractGCSProcessor {
     }
 
     @Override
+    protected List<String> getRequiredPermissions() {
+        return Collections.singletonList("storage.objects.create");
+    }
+
+    @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
         FlowFile flowFile = session.get();
         if (flowFile == null) {
@@ -400,7 +406,6 @@ public class PutGCSObject extends AbstractGCSProcessor {
                         }
 
                         try {
-
                             final Blob blob = storage.create(blobInfoBuilder.build(),
                                     in,
                                     blobWriteOptions.toArray(new Storage.BlobWriteOption[blobWriteOptions.size()])
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java
index 5b51e0a..8561e62 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java
@@ -28,12 +28,20 @@ import com.google.cloud.bigquery.JobStatus;
 import com.google.cloud.bigquery.Table;
 import com.google.cloud.bigquery.TableDataWriteChannel;
 import com.google.cloud.bigquery.WriteChannelConfiguration;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.VerifiableProcessor;
 import org.apache.nifi.util.TestRunner;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentMatchers;
 import org.mockito.Mock;
 
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.when;
 
@@ -88,6 +96,11 @@ public class PutBigQueryBatchTest extends AbstractBQTest {
             protected BigQuery getCloudService() {
                 return bq;
             }
+
+            @Override
+            protected BigQuery getCloudService(final ProcessContext context) {
+                return bq;
+            }
         };
     }
 
@@ -120,7 +133,8 @@ public class PutBigQueryBatchTest extends AbstractBQTest {
         when(job.getJobId()).thenReturn(jobId);
         when(jobId.getJob()).thenReturn("job-id");
 
-        final TestRunner runner = buildNewRunner(getProcessor());
+        final AbstractBigQueryProcessor processor = getProcessor();
+        final TestRunner runner = buildNewRunner(processor);
         addRequiredPropertiesToRunner(runner);
         runner.assertValid();
 
@@ -128,6 +142,11 @@ public class PutBigQueryBatchTest extends AbstractBQTest {
 
         runner.run();
 
+        when(bq.testIamPermissions(any(), any())).thenReturn(Collections.singletonList("permission"));
+        final List<ConfigVerificationResult> verificationResults = ((VerifiableProcessor) processor).verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap());
+        assertEquals(2, verificationResults.size());
+        assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, verificationResults.get(1).getOutcome());
+
         runner.assertAllFlowFilesTransferred(PutBigQueryBatch.REL_SUCCESS);
     }
 
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/AbstractGCSTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/AbstractGCSTest.java
index 8b94c47..4318f61 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/AbstractGCSTest.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/AbstractGCSTest.java
@@ -21,9 +21,9 @@ import com.google.auth.oauth2.GoogleCredentials;
 import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.StorageOptions;
 import com.google.cloud.storage.testing.RemoteStorageHelper;
+import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
-import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Before;
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/FetchGCSObjectIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/FetchGCSObjectIT.java
index a8b8306..489a942 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/FetchGCSObjectIT.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/FetchGCSObjectIT.java
@@ -17,13 +17,16 @@
 package org.apache.nifi.processors.gcp.storage;
 
 import com.google.common.collect.ImmutableMap;
+import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.junit.Test;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -66,7 +69,8 @@ public class FetchGCSObjectIT extends AbstractGCSIT {
         putTestFileEncrypted(KEY, CONTENT);
         assertTrue(fileExists(KEY));
 
-        final TestRunner runner = buildNewRunner(new FetchGCSObject());
+        final FetchGCSObject processor = new FetchGCSObject();
+        final TestRunner runner = buildNewRunner(processor);
         runner.setProperty(FetchGCSObject.BUCKET, BUCKET);
         runner.setProperty(FetchGCSObject.ENCRYPTION_KEY, ENCRYPTION_KEY);
 
@@ -77,6 +81,13 @@ public class FetchGCSObjectIT extends AbstractGCSIT {
         runner.assertValid();
         runner.run();
 
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("filename", KEY);
+        final List<ConfigVerificationResult> results = processor.verify(runner.getProcessContext(), runner.getLogger(), attributes);
+        assertEquals(2, results.size());
+        assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, results.get(1).getOutcome());
+        assertTrue(results.get(1).getExplanation().matches("Successfully fetched \\[delete-me\\] from Bucket \\[gcloud-test-bucket-temp-.*\\], totaling 3 bytes"));
+
         runner.assertAllFlowFilesTransferred(FetchGCSObject.REL_SUCCESS, 1);
         final List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(FetchGCSObject.REL_SUCCESS);
         MockFlowFile ff = ffs.get(0);
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/FetchGCSObjectTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/FetchGCSObjectTest.java
index 5e51270..3dc3a70 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/FetchGCSObjectTest.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/FetchGCSObjectTest.java
@@ -27,6 +27,7 @@ import com.google.cloud.storage.StorageException;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.junit.Before;
@@ -116,6 +117,11 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
             protected Storage getCloudService() {
                 return storage;
             }
+
+            @Override
+            protected Storage getCloudService(final ProcessContext context) {
+                return storage;
+            }
         };
     }
 
@@ -214,6 +220,8 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
         when(blob.getContentDisposition()).thenReturn(CONTENT_DISPOSITION);
         when(blob.getCreateTime()).thenReturn(CREATE_TIME);
         when(blob.getUpdateTime()).thenReturn(UPDATE_TIME);
+        final BlobId blobId = mock(BlobId.class);
+        when(blob.getBlobId()).thenReturn(blobId);
 
         when(storage.get(any(BlobId.class))).thenReturn(blob);
         when(storage.reader(any(BlobId.class), any(Storage.BlobSourceOption.class))).thenReturn(new MockReadChannel(CONTENT));
@@ -345,6 +353,8 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
         final Acl.User mockUser = mock(Acl.User.class);
         when(mockUser.getEmail()).thenReturn(OWNER_USER_EMAIL);
         when(blob.getOwner()).thenReturn(mockUser);
+        final BlobId blobId = mock(BlobId.class);
+        when(blob.getBlobId()).thenReturn(blobId);
 
         when(storage.get(any(BlobId.class))).thenReturn(blob);
         when(storage.reader(any(BlobId.class), any(Storage.BlobSourceOption.class))).thenReturn(new MockReadChannel(CONTENT));
@@ -383,6 +393,8 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
         final Acl.Group mockGroup = mock(Acl.Group.class);
         when(mockGroup.getEmail()).thenReturn(OWNER_GROUP_EMAIL);
         when(blob.getOwner()).thenReturn(mockGroup);
+        final BlobId blobId = mock(BlobId.class);
+        when(blob.getBlobId()).thenReturn(blobId);
 
         when(storage.get(any(BlobId.class))).thenReturn(blob);
         when(storage.reader(any(BlobId.class), any(Storage.BlobSourceOption.class))).thenReturn(new MockReadChannel(CONTENT));
@@ -423,6 +435,8 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
         final Acl.Domain mockDomain = mock(Acl.Domain.class);
         when(mockDomain.getDomain()).thenReturn(OWNER_DOMAIN);
         when(blob.getOwner()).thenReturn(mockDomain);
+        final BlobId blobId = mock(BlobId.class);
+        when(blob.getBlobId()).thenReturn(blobId);
 
         when(storage.get(any(BlobId.class))).thenReturn(blob);
         when(storage.reader(any(BlobId.class), any(Storage.BlobSourceOption.class))).thenReturn(new MockReadChannel(CONTENT));
@@ -460,8 +474,10 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
 
         final Blob blob = mock(Blob.class);
         final Acl.Project mockProject = mock(Acl.Project.class);
+        final BlobId blobId = mock(BlobId.class);
         when(mockProject.getProjectId()).thenReturn(OWNER_PROJECT_ID);
         when(blob.getOwner()).thenReturn(mockProject);
+        when(blob.getBlobId()).thenReturn(blobId);
 
         when(storage.get(any(BlobId.class))).thenReturn(blob);
         when(storage.reader(any(BlobId.class), any(Storage.BlobSourceOption.class))).thenReturn(new MockReadChannel(CONTENT));
@@ -501,6 +517,8 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
         runner.assertValid();
 
         final Blob blob = mock(Blob.class);
+        final BlobId blobId = mock(BlobId.class);
+        when(blob.getBlobId()).thenReturn(blobId);
         when(storage.get(any(BlobId.class))).thenReturn(blob);
         when(storage.reader(any(BlobId.class), any(Storage.BlobSourceOption.class))).thenReturn(new MockReadChannel(CONTENT));
 
@@ -516,21 +534,21 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
         verify(storage).get(blobIdArgumentCaptor.capture());
         verify(storage).reader(any(BlobId.class), blobSourceOptionArgumentCaptor.capture());
 
-        final BlobId blobId = blobIdArgumentCaptor.getValue();
+        final BlobId capturedBlobId = blobIdArgumentCaptor.getValue();
 
         assertEquals(
                 BUCKET,
-                blobId.getBucket()
+                capturedBlobId.getBucket()
         );
 
         assertEquals(
                 KEY,
-                blobId.getName()
+                capturedBlobId.getName()
         );
 
         assertEquals(
                 GENERATION,
-                blobId.getGeneration()
+                capturedBlobId.getGeneration()
         );
 
 
@@ -554,6 +572,8 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
         runner.assertValid();
 
         final Blob blob = mock(Blob.class);
+        final BlobId blobId = mock(BlobId.class);
+        when(blob.getBlobId()).thenReturn(blobId);
         when(storage.get(any(BlobId.class))).thenReturn(blob);
         when(storage.reader(any(BlobId.class), any(Storage.BlobSourceOption.class))).thenReturn(new MockReadChannel(CONTENT));
 
@@ -566,19 +586,19 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
         verify(storage).get(blobIdArgumentCaptor.capture());
         verify(storage).reader(any(BlobId.class), blobSourceOptionArgumentCaptor.capture());
 
-        final BlobId blobId = blobIdArgumentCaptor.getValue();
+        final BlobId capturedBlobId = blobIdArgumentCaptor.getValue();
 
         assertEquals(
                 BUCKET,
-                blobId.getBucket()
+                capturedBlobId.getBucket()
         );
 
         assertEquals(
                 KEY,
-                blobId.getName()
+                capturedBlobId.getName()
         );
 
-        assertNull(blobId.getGeneration());
+        assertNull(capturedBlobId.getGeneration());
 
         final Set<Storage.BlobSourceOption> blobSourceOptions = ImmutableSet.copyOf(blobSourceOptionArgumentCaptor.getAllValues());
 
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
index 7f22500..4139fee 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
@@ -24,9 +24,11 @@ import com.google.cloud.storage.Storage;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.util.LogMessage;
 import org.apache.nifi.util.MockFlowFile;
@@ -36,6 +38,7 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.Mock;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -115,6 +118,11 @@ public class ListGCSBucketTest extends AbstractGCSTest {
             protected Storage getCloudService() {
                 return storage;
             }
+
+            @Override
+            protected Storage getCloudService(final ProcessContext context) {
+                return storage;
+            }
         };
     }
 
@@ -241,6 +249,22 @@ public class ListGCSBucketTest extends AbstractGCSTest {
         return blob;
     }
 
+    private void verifyConfigVerification(final TestRunner runner, final ListGCSBucket processor, final int expectedCount) {
+        final List<ConfigVerificationResult> verificationResults = processor.verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap());
+        assertEquals(3, verificationResults.size());
+        final ConfigVerificationResult cloudServiceResult = verificationResults.get(0);
+        assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, cloudServiceResult.getOutcome());
+
+        final ConfigVerificationResult iamPermissionsResult = verificationResults.get(1);
+        assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, iamPermissionsResult.getOutcome());
+
+        final ConfigVerificationResult listingResult = verificationResults.get(2);
+        assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, listingResult.getOutcome());
+
+        assertTrue(String.format("Expected %s blobs to be counted, but explanation was: %s", expectedCount, listingResult.getExplanation()),
+                listingResult.getExplanation().matches(String.format(".*finding %s blobs.*", expectedCount)));
+    }
+
     @Test
     public void testSuccessfulList() throws Exception {
         reset(storage, mockBlobPage);
@@ -261,8 +285,11 @@ public class ListGCSBucketTest extends AbstractGCSTest {
         runner.enqueue("test");
         runner.run();
 
+        when(storage.testIamPermissions(anyString(), any())).thenReturn(Collections.singletonList(true));
+
         runner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS);
         runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 2);
+        verifyConfigVerification(runner, processor, 2);
 
         final List<MockFlowFile> successes = runner.getFlowFilesForRelationship(ListGCSBucket.REL_SUCCESS);
 
@@ -301,8 +328,11 @@ public class ListGCSBucketTest extends AbstractGCSTest {
         runner.enqueue("test2");
         runner.run(2);
 
+        when(storage.testIamPermissions(anyString(), any())).thenReturn(Collections.singletonList(true));
+
         runner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS);
         runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 1);
+        verifyConfigVerification(runner, processor, 1);
 
         assertEquals("blob-key-1", runner.getStateManager().getState(Scope.CLUSTER).get(ListGCSBucket.CURRENT_KEY_PREFIX+"0"));
         assertEquals("2", runner.getStateManager().getState(Scope.CLUSTER).get(ListGCSBucket.CURRENT_TIMESTAMP));
@@ -327,7 +357,10 @@ public class ListGCSBucketTest extends AbstractGCSTest {
         runner.enqueue("test");
         runner.run();
 
+        when(storage.testIamPermissions(anyString(), any())).thenReturn(Collections.singletonList(true));
+
         runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 0);
+        verifyConfigVerification(runner, processor, 0);
 
         assertEquals("No state should be persisted on an empty return", -1L, runner.getStateManager().getState(Scope.CLUSTER).getVersion());
     }
@@ -353,12 +386,17 @@ public class ListGCSBucketTest extends AbstractGCSTest {
         when(mockBlobPage.getNextPage()).thenReturn(null);
         when(storage.list(anyString(), any(Storage.BlobListOption[].class))).thenReturn(mockBlobPage);
 
+        when(storage.testIamPermissions(anyString(), any())).thenReturn(Collections.singletonList(true));
+
         runner.enqueue("test");
         runner.run();
 
         runner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS);
         runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 1);
 
+        // Both blobs are counted, because verification does not account for entity tracking
+        verifyConfigVerification(runner, processor, 2);
+
         final List<MockFlowFile> successes = runner.getFlowFilesForRelationship(ListGCSBucket.REL_SUCCESS);
 
         MockFlowFile flowFile = successes.get(0);
@@ -400,9 +438,14 @@ public class ListGCSBucketTest extends AbstractGCSTest {
         runner.enqueue("test");
         runner.run();
 
+        when(storage.testIamPermissions(anyString(), any())).thenReturn(Collections.singletonList(true));
+
         runner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS);
         runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 1);
 
+        // Both blobs are counted, because verification does not account for entity tracking
+        verifyConfigVerification(runner, processor, 2);
+
         final List<MockFlowFile> successes = runner.getFlowFilesForRelationship(ListGCSBucket.REL_SUCCESS);
 
         MockFlowFile flowFile = successes.get(0);
@@ -443,12 +486,17 @@ public class ListGCSBucketTest extends AbstractGCSTest {
         when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
                 .thenReturn(mockBlobPage);
 
+        when(storage.testIamPermissions(anyString(), any())).thenReturn(Collections.singletonList(true));
+
         runner.enqueue("test");
         runner.run();
 
         runner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS);
         runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 2);
 
+        // All blobs are counted, because verification does not account for entity tracking
+        verifyConfigVerification(runner, processor, 3);
+
         final List<MockFlowFile> successes = runner.getFlowFilesForRelationship(ListGCSBucket.REL_SUCCESS);
 
         MockFlowFile flowFile = successes.get(0);
@@ -493,12 +541,17 @@ public class ListGCSBucketTest extends AbstractGCSTest {
         when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
                 .thenReturn(mockBlobPage);
 
+        when(storage.testIamPermissions(anyString(), any())).thenReturn(Collections.singletonList(true));
+
         runner.enqueue("test");
         runner.run();
 
         runner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS);
         runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 2);
 
+        // All blobs are counted, because verification does not account for entity tracking
+        verifyConfigVerification(runner, processor, 3);
+
         final List<MockFlowFile> successes = runner.getFlowFilesForRelationship(ListGCSBucket.REL_SUCCESS);
 
         MockFlowFile flowFile = successes.get(0);
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
index 1f99faf..28cb17d 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
@@ -39,9 +39,18 @@
                     <groupId>com.google.code.findbugs</groupId>
                     <artifactId>jsr305</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
         <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>31.0.1-jre</version>
+        </dependency>
+        <dependency>
             <groupId>com.github.stephenc.findbugs</groupId>
             <artifactId>findbugs-annotations</artifactId>
             <version>1.3.9-1</version>