You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2021/12/11 18:58:17 UTC
[nifi] branch main updated: NIFI-9343: Adding config verification to GCP processors
This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new f3c7537 NIFI-9343: Adding config verification to GCP processors
f3c7537 is described below
commit f3c7537d9b8f84a4769ce8eba74a551dce3836a0
Author: Joe Gresock <jg...@gmail.com>
AuthorDate: Thu Oct 28 10:59:39 2021 -0400
NIFI-9343: Adding config verification to GCP processors
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #5494.
---
.../nifi/processors/gcp/AbstractGCPProcessor.java | 66 ++++-
.../gcp/bigquery/AbstractBigQueryProcessor.java | 83 +++++--
.../processors/gcp/bigquery/PutBigQueryBatch.java | 48 ++--
.../gcp/bigquery/PutBigQueryStreaming.java | 47 ++--
.../service/GCPCredentialsControllerService.java | 39 ++-
.../gcp/pubsub/AbstractGCPubSubProcessor.java | 5 +-
.../processors/gcp/pubsub/ConsumeGCPubSub.java | 82 ++++++-
.../processors/gcp/pubsub/PublishGCPubSub.java | 77 +++++-
.../gcp/storage/AbstractGCSProcessor.java | 73 +++++-
.../processors/gcp/storage/DeleteGCSObject.java | 6 +
.../processors/gcp/storage/FetchGCSObject.java | 145 ++++++++---
.../nifi/processors/gcp/storage/ListGCSBucket.java | 271 ++++++++++++++++-----
.../nifi/processors/gcp/storage/PutGCSObject.java | 7 +-
.../gcp/bigquery/PutBigQueryBatchTest.java | 21 +-
.../processors/gcp/storage/AbstractGCSTest.java | 2 +-
.../processors/gcp/storage/FetchGCSObjectIT.java | 13 +-
.../processors/gcp/storage/FetchGCSObjectTest.java | 36 ++-
.../processors/gcp/storage/ListGCSBucketTest.java | 53 ++++
.../nifi-gcp-bundle/nifi-gcp-services-api/pom.xml | 9 +
19 files changed, 867 insertions(+), 216 deletions(-)
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java
index 626b5d2..9f98438 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/AbstractGCPProcessor.java
@@ -22,18 +22,22 @@ import com.google.cloud.ServiceOptions;
import com.google.cloud.TransportOptions;
import com.google.cloud.http.HttpTransportOptions;
import com.google.common.collect.ImmutableList;
-
import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
+import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration;
import java.net.Proxy;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
/**
* Abstract base class for gcp processors.
@@ -122,17 +126,46 @@ public abstract class AbstractGCPProcessor<
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return ImmutableList.of(
- PROJECT_ID,
- GCP_CREDENTIALS_PROVIDER_SERVICE,
- RETRY_COUNT,
- PROXY_HOST,
- PROXY_PORT,
- HTTP_PROXY_USERNAME,
- HTTP_PROXY_PASSWORD,
- ProxyConfiguration.createProxyConfigPropertyDescriptor(true, ProxyAwareTransportFactory.PROXY_SPECS)
+ PROJECT_ID,
+ GCP_CREDENTIALS_PROVIDER_SERVICE,
+ RETRY_COUNT,
+ PROXY_HOST,
+ PROXY_PORT,
+ HTTP_PROXY_USERNAME,
+ HTTP_PROXY_PASSWORD,
+ ProxyConfiguration.createProxyConfigPropertyDescriptor(true, ProxyAwareTransportFactory.PROXY_SPECS)
);
}
+ /**
+ * Verifies the cloud service configuration. This is in a separate method rather than implementing VerifiableProcessor due to type erasure.
+ * @param context The process context
+ * @param verificationLogger Logger for verification
+ * @param attributes Additional attributes
+ * @return The verification results
+ */
+ protected List<ConfigVerificationResult> verifyCloudService(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
+
+ ConfigVerificationResult result = null;
+ try {
+ final CloudService cloudService = getCloudService(context);
+ if (cloudService != null) {
+ result = new ConfigVerificationResult.Builder()
+ .verificationStepName("Configure Cloud Service")
+ .outcome(Outcome.SUCCESSFUL)
+ .explanation(String.format("Successfully configured Cloud Service [%s]", cloudService.getClass().getSimpleName()))
+ .build();
+ }
+ } catch (final Exception e) {
+ verificationLogger.error("Failed to configure Cloud Service", e);
+ result = new ConfigVerificationResult.Builder()
+ .verificationStepName("Configure Cloud Service")
+ .outcome(Outcome.FAILED)
+ .explanation(String.format("Failed to configure Cloud Service [%s]: %s", cloudService.getClass().getSimpleName(), e.getMessage()))
+ .build();
+ }
+ return result == null ? Collections.emptyList() : Collections.singletonList(result);
+ }
/**
* Retrieve credentials from the {@link GCPCredentialsService} attached to this processor.
@@ -147,13 +180,22 @@ public abstract class AbstractGCPProcessor<
}
/**
+ * Returns the cloud client service constructed based on the context.
+ * @param context the process context
+ * @return The constructed cloud client service
+ */
+ protected CloudService getCloudService(final ProcessContext context) {
+ final CloudServiceOptions options = getServiceOptions(context, getGoogleCredentials(context));
+ return options != null ? options.getService() : null;
+ }
+
+ /**
* Assigns the cloud service client on scheduling.
* @param context the process context provided on scheduling the processor.
*/
@OnScheduled
- public void onScheduled(ProcessContext context) {
- final CloudServiceOptions options = getServiceOptions(context, getGoogleCredentials(context));
- this.cloudService = options != null ? options.getService() : null;
+ public void onScheduled(final ProcessContext context) {
+ this.cloudService = getCloudService(context);
}
/**
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
index b2dc43a..78c845d 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
@@ -17,39 +17,46 @@
package org.apache.nifi.processors.gcp.bigquery;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
+import com.google.api.gax.retrying.RetrySettings;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.BaseServiceException;
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.cloud.bigquery.TableId;
+import com.google.common.collect.ImmutableList;
+import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.util.StringUtils;
-import com.google.api.gax.retrying.RetrySettings;
-import com.google.auth.oauth2.GoogleCredentials;
-import com.google.cloud.bigquery.BigQuery;
-import com.google.cloud.bigquery.BigQueryOptions;
-import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
/**
* Base class for creating processors that connect to GCP BigQuery service
*/
-public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<BigQuery, BigQueryOptions> {
+public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<BigQuery, BigQueryOptions> implements VerifiableProcessor {
static final int BUFFER_SIZE = 65536;
+ private static final List<String> REQUIRED_PERMISSIONS = Collections.singletonList("bigquery.tables.updateData");
+
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("FlowFiles are routed to this relationship after a successful Google BigQuery operation.")
.build();
@@ -123,6 +130,40 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<Big
}
@Override
+ public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
+ final List<ConfigVerificationResult> results = new ArrayList<>(verifyCloudService(context, verificationLogger, attributes));
+
+ final BigQuery bigQuery = getCloudService(context);
+ if (bigQuery != null) {
+ try {
+ final TableId tableId = getTableId(context, attributes);
+ if (bigQuery.testIamPermissions(tableId, REQUIRED_PERMISSIONS).size() >= REQUIRED_PERMISSIONS.size()) {
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Test IAM Permissions")
+ .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
+ .explanation(String.format("Verified BigQuery Table [%s] exists and the configured user has the correct permissions.", tableId))
+ .build());
+ } else {
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Test IAM Permissions")
+ .outcome(ConfigVerificationResult.Outcome.FAILED)
+ .explanation(String.format("The configured user does not have the correct permissions on BigQuery Table [%s].", tableId))
+ .build());
+ }
+ } catch (final BaseServiceException e) {
+ verificationLogger.error("The configured user appears to have the correct permissions, but the following error was encountered", e);
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Test IAM Permissions")
+ .outcome(ConfigVerificationResult.Outcome.FAILED)
+ .explanation(String.format("The configured user appears to have the correct permissions, but the following error was encountered: " + e.getMessage()))
+ .build());
+ }
+ }
+
+ return results;
+ }
+
+ @Override
protected final Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final Collection<ValidationResult> results = new ArrayList<ValidationResult>(super.customValidate(validationContext));
ProxyConfiguration.validateProxySpec(validationContext, results, ProxyAwareTransportFactory.PROXY_SPECS);
@@ -147,4 +188,18 @@ public abstract class AbstractBigQueryProcessor extends AbstractGCPProcessor<Big
protected void customValidate(ValidationContext validationContext, Collection<ValidationResult> results) {
}
+ protected TableId getTableId(final ProcessContext context, final Map<String, String> attributes) {
+ final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
+ final String dataset = context.getProperty(DATASET).evaluateAttributeExpressions(attributes).getValue();
+ final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(attributes).getValue();
+
+ final TableId tableId;
+ if (StringUtils.isEmpty(projectId)) {
+ tableId = TableId.of(dataset, tableName);
+ } else {
+ tableId = TableId.of(projectId, dataset, tableName);
+ }
+ return tableId;
+ }
+
}
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
index cf4b42a..8c421e8 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java
@@ -17,15 +17,16 @@
package org.apache.nifi.processors.gcp.bigquery;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
+import com.google.cloud.RetryOption;
+import com.google.cloud.bigquery.FormatOptions;
+import com.google.cloud.bigquery.Job;
+import com.google.cloud.bigquery.JobInfo;
+import com.google.cloud.bigquery.JobStatistics.LoadStatistics;
+import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.TableDataWriteChannel;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.WriteChannelConfiguration;
+import com.google.common.collect.ImmutableList;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
@@ -50,16 +51,14 @@ import org.apache.nifi.util.StringUtils;
import org.threeten.bp.Duration;
import org.threeten.bp.temporal.ChronoUnit;
-import com.google.cloud.RetryOption;
-import com.google.cloud.bigquery.FormatOptions;
-import com.google.cloud.bigquery.Job;
-import com.google.cloud.bigquery.JobInfo;
-import com.google.cloud.bigquery.JobStatistics.LoadStatistics;
-import com.google.cloud.bigquery.Schema;
-import com.google.cloud.bigquery.TableDataWriteChannel;
-import com.google.cloud.bigquery.TableId;
-import com.google.cloud.bigquery.WriteChannelConfiguration;
-import com.google.common.collect.ImmutableList;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
/**
* A processor for batch loading data into a Google BigQuery table
@@ -257,17 +256,8 @@ public class PutBigQueryBatch extends AbstractBigQueryProcessor {
return;
}
- final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
- final String dataset = context.getProperty(DATASET).evaluateAttributeExpressions(flowFile).getValue();
- final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String type = context.getProperty(SOURCE_TYPE).evaluateAttributeExpressions(flowFile).getValue();
-
- final TableId tableId;
- if (StringUtils.isEmpty(projectId)) {
- tableId = TableId.of(dataset, tableName);
- } else {
- tableId = TableId.of(projectId, dataset, tableName);
- }
+ final TableId tableId = getTableId(context, flowFile.getAttributes());
try {
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreaming.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreaming.java
index d8d9b3d..20fc401 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreaming.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryStreaming.java
@@ -17,19 +17,11 @@
package org.apache.nifi.processors.gcp.bigquery;
-import java.io.InputStream;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.time.Instant;
-import java.time.LocalDateTime;
-import java.time.ZoneOffset;
-import java.time.format.DateTimeFormatter;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.ArrayList;
-
+import com.google.cloud.bigquery.BigQueryError;
+import com.google.cloud.bigquery.InsertAllRequest;
+import com.google.cloud.bigquery.InsertAllResponse;
+import com.google.cloud.bigquery.TableId;
+import com.google.common.collect.ImmutableList;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
@@ -51,13 +43,19 @@ import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.util.StringUtils;
-import com.google.cloud.bigquery.BigQueryError;
-import com.google.cloud.bigquery.InsertAllRequest;
-import com.google.cloud.bigquery.InsertAllResponse;
-import com.google.cloud.bigquery.TableId;
-import com.google.common.collect.ImmutableList;
+import java.io.InputStream;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
/**
* A processor for streaming loading data into a Google BigQuery table. It uses the BigQuery
@@ -124,17 +122,8 @@ public class PutBigQueryStreaming extends AbstractBigQueryProcessor {
if (flowFile == null) {
return;
}
-
- final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
- final String dataset = context.getProperty(DATASET).evaluateAttributeExpressions(flowFile).getValue();
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
-
- final TableId tableId;
- if (StringUtils.isEmpty(projectId)) {
- tableId = TableId.of(dataset, tableName);
- } else {
- tableId = TableId.of(projectId, dataset, tableName);
- }
+ final TableId tableId = getTableId(context, flowFile.getAttributes());
try {
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/credentials/service/GCPCredentialsControllerService.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/credentials/service/GCPCredentialsControllerService.java
index ce5ada6..b161e61 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/credentials/service/GCPCredentialsControllerService.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/credentials/service/GCPCredentialsControllerService.java
@@ -21,12 +21,16 @@ import com.google.auth.oauth2.GoogleCredentials;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.VerifiableControllerService;
import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
+import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
import org.apache.nifi.processors.gcp.credentials.factory.CredentialsFactory;
@@ -38,6 +42,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import static org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON;
import static org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors.SERVICE_ACCOUNT_JSON_FILE;
@@ -55,7 +60,7 @@ import static org.apache.nifi.processors.gcp.credentials.factory.CredentialPrope
"a credential file, the config generated by `gcloud auth application-default login`, AppEngine/Compute Engine" +
" service accounts, etc.")
@Tags({ "gcp", "credentials","provider" })
-public class GCPCredentialsControllerService extends AbstractControllerService implements GCPCredentialsService {
+public class GCPCredentialsControllerService extends AbstractControllerService implements GCPCredentialsService, VerifiableControllerService {
private static final List<PropertyDescriptor> properties;
@@ -88,17 +93,41 @@ public class GCPCredentialsControllerService extends AbstractControllerService i
return results;
}
+ @Override
+ public List<ConfigVerificationResult> verify(final ConfigurationContext context, final ComponentLog verificationLogger, final Map<String, String> variables) {
+ ConfigVerificationResult result;
+ try {
+ final GoogleCredentials credentials = getGoogleCredentials(context);
+ result = new ConfigVerificationResult.Builder()
+ .verificationStepName("Provide Google Credentials")
+ .outcome(Outcome.SUCCESSFUL)
+ .explanation(String.format("Successfully provided [%s] as Google Credentials", credentials.getClass().getSimpleName()))
+ .build();
+ } catch (final IOException e) {
+ result = new ConfigVerificationResult.Builder()
+ .verificationStepName("Provide Google Credentials")
+ .outcome(Outcome.FAILED)
+ .explanation(String.format("Failed to provide Google Credentials: " + e.getMessage()))
+ .build();
+ }
+ return Collections.singletonList(result);
+ }
+
@OnEnabled
public void onConfigured(final ConfigurationContext context) throws InitializationException {
try {
- final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(context);
- final HttpTransportFactory transportFactory = new ProxyAwareTransportFactory(proxyConfiguration);
- googleCredentials = credentialsProviderFactory.getGoogleCredentials(context.getProperties(), transportFactory);
- } catch (IOException e) {
+ googleCredentials = getGoogleCredentials(context);
+ } catch (final IOException e) {
throw new InitializationException(e);
}
}
+ private GoogleCredentials getGoogleCredentials(final ConfigurationContext context) throws IOException {
+ final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(context);
+ final HttpTransportFactory transportFactory = new ProxyAwareTransportFactory(proxyConfiguration);
+ return credentialsProviderFactory.getGoogleCredentials(context.getProperties(), transportFactory);
+ }
+
@Override
public String toString() {
return "GCPCredentialsControllerService[id=" + getIdentifier() + "]";
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java
index 8930a27..fc135db 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java
@@ -24,6 +24,7 @@ import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
@@ -33,7 +34,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
-public abstract class AbstractGCPubSubProcessor extends AbstractGCPProcessor {
+public abstract class AbstractGCPubSubProcessor extends AbstractGCPProcessor implements VerifiableProcessor {
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("gcp-pubsub-publish-batch-size")
@@ -69,7 +70,7 @@ public abstract class AbstractGCPubSubProcessor extends AbstractGCPProcessor {
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
- final Collection<ValidationResult> results = super.customValidate(validationContext);
+ final Collection<ValidationResult> results = new HashSet<>(super.customValidate(validationContext));
final boolean projectId = validationContext.getProperty(PROJECT_ID).isSet();
if (!projectId) {
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
index 70b9e26..927a595 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
@@ -17,10 +17,13 @@
package org.apache.nifi.processors.gcp.pubsub;
import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.pathtemplate.ValidationException;
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.common.collect.ImmutableList;
+import com.google.iam.v1.TestIamPermissionsRequest;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PullRequest;
@@ -34,9 +37,12 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
@@ -79,6 +85,8 @@ import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_
})
public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
+ private static final List<String> REQUIRED_PERMISSIONS = Collections.singletonList("pubsub.subscriptions.consume");
+
public static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder()
.name("gcp-pubsub-subscription")
.displayName("Subscription")
@@ -99,7 +107,7 @@ public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
pullRequest = PullRequest.newBuilder()
.setMaxMessages(batchSize)
- .setSubscription(getSubscriptionName(context))
+ .setSubscription(getSubscriptionName(context, null))
.build();
try {
@@ -110,6 +118,70 @@ public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
}
}
+ @Override
+ public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
+ final List<ConfigVerificationResult> results = new ArrayList<>();
+ String subscriptionName = null;
+ try {
+ subscriptionName = getSubscriptionName(context, attributes);
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Parse Subscription Name")
+ .outcome(Outcome.SUCCESSFUL)
+ .explanation("Successfully parsed Subscription Name")
+ .build());
+ } catch (final ValidationException e) {
+ verificationLogger.error("Failed to parse Subscription Name", e);
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Parse Subscription Name")
+ .outcome(Outcome.FAILED)
+ .explanation(String.format("Failed to parse Subscription Name: " + e.getMessage()))
+ .build());
+ }
+ SubscriberStub subscriber = null;
+ try {
+ subscriber = getSubscriber(context);
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Create Subscriber")
+ .outcome(Outcome.SUCCESSFUL)
+ .explanation("Successfully created Subscriber")
+ .build());
+ } catch (final IOException e) {
+ verificationLogger.error("Failed to create Subscriber", e);
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Create Subscriber")
+ .outcome(Outcome.FAILED)
+ .explanation(String.format("Failed to create Subscriber: " + e.getMessage()))
+ .build());
+ }
+
+ if (subscriber != null && subscriptionName != null) {
+ try {
+ final TestIamPermissionsRequest request = TestIamPermissionsRequest.newBuilder().addAllPermissions(REQUIRED_PERMISSIONS).setResource(subscriptionName).build();
+ if (subscriber.testIamPermissionsCallable().call(request).getPermissionsCount() >= REQUIRED_PERMISSIONS.size()) {
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Test IAM Permissions")
+ .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
+ .explanation(String.format("Verified Subscription [%s] exists and the configured user has the correct permissions.", subscriptionName))
+ .build());
+ } else {
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Test IAM Permissions")
+ .outcome(ConfigVerificationResult.Outcome.FAILED)
+ .explanation(String.format("The configured user does not have the correct permissions on Subscription [%s].", subscriptionName))
+ .build());
+ }
+ } catch (final ApiException e) {
+ verificationLogger.error("The configured user appears to have the correct permissions, but the following error was encountered", e);
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Test IAM Permissions")
+ .outcome(ConfigVerificationResult.Outcome.FAILED)
+ .explanation(String.format("The configured user appears to have the correct permissions, but the following error was encountered: " + e.getMessage()))
+ .build());
+ }
+ }
+ return results;
+ }
+
@OnStopped
public void onStopped() {
if (subscriber != null) {
@@ -145,7 +217,7 @@ public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
final PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
final List<String> ackIds = new ArrayList<>();
- final String subscriptionName = getSubscriptionName(context);
+ final String subscriptionName = getSubscriptionName(context, null);
for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
if (message.hasMessage()) {
@@ -184,9 +256,9 @@ public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
subscriber.acknowledgeCallable().call(acknowledgeRequest);
}
- private String getSubscriptionName(final ProcessContext context) {
- final String subscriptionName = context.getProperty(SUBSCRIPTION).evaluateAttributeExpressions().getValue();
- final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
+ private String getSubscriptionName(final ProcessContext context, final Map<String, String> additionalAttributes) {
+ final String subscriptionName = context.getProperty(SUBSCRIPTION).evaluateAttributeExpressions(additionalAttributes).getValue();
+ final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions(additionalAttributes).getValue();
if (subscriptionName.contains("/")) {
return ProjectSubscriptionName.parse(subscriptionName).toString();
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
index 5ad4e76..71ba4ad 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
@@ -19,18 +19,23 @@ package org.apache.nifi.processors.gcp.pubsub;
import com.google.api.core.ApiFuture;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.DeadlineExceededException;
import com.google.cloud.pubsub.v1.Publisher;
+import com.google.cloud.pubsub.v1.stub.GrpcPublisherStub;
+import com.google.cloud.pubsub.v1.stub.PublisherStubSettings;
import com.google.common.collect.ImmutableList;
+import com.google.iam.v1.TestIamPermissionsRequest;
+import com.google.iam.v1.TestIamPermissionsResponse;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -38,9 +43,12 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
@@ -80,6 +88,7 @@ import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_
@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "The entirety of the FlowFile's content "
+ "will be read into memory to be sent as a PubSub message.")
public class PublishGCPubSub extends AbstractGCPubSubProcessor{
+ private static final List<String> REQUIRED_PERMISSIONS = Collections.singletonList("pubsub.topics.publish");
public static final PropertyDescriptor TOPIC_NAME = new PropertyDescriptor.Builder()
.name("gcp-pubsub-topic")
@@ -137,6 +146,72 @@ public class PublishGCPubSub extends AbstractGCPubSubProcessor{
}
@Override
+ public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
+ final List<ConfigVerificationResult> results = new ArrayList<>();
+ Publisher publisher = null;
+ try {
+ publisher = getPublisherBuilder(context).build();
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Create Publisher")
+ .outcome(Outcome.SUCCESSFUL)
+ .explanation("Successfully created Publisher")
+ .build());
+ } catch (final IOException e) {
+ verificationLogger.error("Failed to create Publisher", e);
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Create Publisher")
+ .outcome(Outcome.FAILED)
+ .explanation(String.format("Failed to create Publisher: " + e.getMessage()))
+ .build());
+ }
+
+ if (publisher != null) {
+ try {
+ final PublisherStubSettings publisherStubSettings = PublisherStubSettings.newBuilder()
+ .setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
+ .build();
+
+ final GrpcPublisherStub publisherStub = GrpcPublisherStub.create(publisherStubSettings);
+ final String topicName = context.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue();
+ final TestIamPermissionsRequest request = TestIamPermissionsRequest.newBuilder()
+ .addAllPermissions(REQUIRED_PERMISSIONS)
+ .setResource(topicName)
+ .build();
+ final TestIamPermissionsResponse response = publisherStub.testIamPermissionsCallable().call(request);
+ if (response.getPermissionsCount() >= REQUIRED_PERMISSIONS.size()) {
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Test IAM Permissions")
+ .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
+ .explanation(String.format("Verified Topic [%s] exists and the configured user has the correct permissions.", topicName))
+ .build());
+ } else {
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Test IAM Permissions")
+ .outcome(ConfigVerificationResult.Outcome.FAILED)
+ .explanation(String.format("The configured user does not have the correct permissions on Topic [%s].", topicName))
+ .build());
+ }
+ } catch (final ApiException e) {
+ verificationLogger.error("The configured user appears to have the correct permissions, but the following error was encountered", e);
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Test IAM Permissions")
+ .outcome(ConfigVerificationResult.Outcome.FAILED)
+ .explanation(String.format("The configured user appears to have the correct permissions, but the following error was encountered: " + e.getMessage()))
+ .build());
+ } catch (final IOException e) {
+ verificationLogger.error("The publisher stub could not be created in order to test the permissions", e);
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Test IAM Permissions")
+ .outcome(ConfigVerificationResult.Outcome.FAILED)
+ .explanation(String.format("The publisher stub could not be created in order to test the permissions: " + e.getMessage()))
+ .build());
+
+ }
+ }
+ return results;
+ }
+
+ @Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final int flowFileCount = context.getProperty(BATCH_SIZE).asInteger();
final List<FlowFile> flowFiles = session.get(flowFileCount);
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
index 81af2ad..0c82c62 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/AbstractGCSProcessor.java
@@ -16,34 +16,39 @@
*/
package org.apache.nifi.processors.gcp.storage;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
+import com.google.api.gax.retrying.RetrySettings;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.BaseServiceException;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+import com.google.common.collect.ImmutableList;
+import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
import org.apache.nifi.proxy.ProxyConfiguration;
-import com.google.api.gax.retrying.RetrySettings;
-import com.google.auth.oauth2.GoogleCredentials;
-import com.google.cloud.storage.Storage;
-import com.google.cloud.storage.StorageOptions;
-import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
/**
* Base class for creating processors which connect to Google Cloud Storage.
*
* Every GCS processor operation requires a bucket, whether it's reading or writing from said bucket.
*/
-public abstract class AbstractGCSProcessor extends AbstractGCPProcessor<Storage, StorageOptions> {
+public abstract class AbstractGCSProcessor extends AbstractGCPProcessor<Storage, StorageOptions> implements VerifiableProcessor {
public static final Relationship REL_SUCCESS =
new Relationship.Builder().name("success")
.description("FlowFiles are routed to this relationship after a successful Google Cloud Storage operation.")
@@ -69,6 +74,39 @@ public abstract class AbstractGCSProcessor extends AbstractGCPProcessor<Storage,
}
@Override
+ public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
+ final List<ConfigVerificationResult> results = new ArrayList<>(verifyCloudService(context, verificationLogger, attributes));
+ final Storage storage = getCloudService(context);
+ if (storage != null) {
+ try {
+ final String bucket = getBucketName(context, attributes);
+ final List<String> requiredPermissions = getRequiredPermissions();
+ if (storage.testIamPermissions(bucket, requiredPermissions).size() >= requiredPermissions.size()) {
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Test IAM Permissions")
+ .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
+ .explanation(String.format("Verified Bucket [%s] exists and the configured user has the correct permissions.", bucket))
+ .build());
+ } else {
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Test IAM Permissions")
+ .outcome(ConfigVerificationResult.Outcome.FAILED)
+ .explanation(String.format("The configured user does not have the correct permissions on Bucket [%s].", bucket))
+ .build());
+ }
+ } catch (final BaseServiceException e) {
+ verificationLogger.error("The configured user appears to have the correct permissions, but the following error was encountered", e);
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Test IAM Permissions")
+ .outcome(ConfigVerificationResult.Outcome.FAILED)
+ .explanation(String.format("The configured user appears to have the correct permissions, but the following error was encountered: " + e.getMessage()))
+ .build());
+ }
+ }
+ return results;
+ }
+
+ @Override
protected final Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final Collection<ValidationResult> results = super.customValidate(validationContext);
ProxyConfiguration.validateProxySpec(validationContext, results, ProxyAwareTransportFactory.PROXY_SPECS);
@@ -82,6 +120,15 @@ public abstract class AbstractGCSProcessor extends AbstractGCPProcessor<Storage,
protected void customValidate(ValidationContext validationContext, Collection<ValidationResult> results) {
}
    /**
     * @return The list of GCP permissions required for the processor; tested during {@code verify}
     *         against the configured bucket via {@code Storage#testIamPermissions}
     */
    protected abstract List<String> getRequiredPermissions();

    /**
     * Resolves the bucket name used for the IAM permission check in {@code verify}.
     *
     * @param context    the process context holding the configured properties
     * @param attributes attributes supplied for Expression Language evaluation during verification
     * @return the evaluated bucket name
     */
    protected String getBucketName(final ProcessContext context, final Map<String, String> attributes) {
        // NOTE(review): looks the property up by its raw name ("gcs-bucket") -- presumably because the
        // bucket descriptor is declared in subclasses rather than on this abstract class. Subclasses
        // with different semantics (e.g. ListGCSBucket) override this method. TODO confirm the name
        // stays in sync with the subclass descriptors.
        return context.getProperty("gcs-bucket").evaluateAttributeExpressions(attributes).getValue();
    }
+
@Override
protected StorageOptions getServiceOptions(ProcessContext context, GoogleCredentials credentials) {
final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/DeleteGCSObject.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/DeleteGCSObject.java
index 66d51db..c30128c 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/DeleteGCSObject.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/DeleteGCSObject.java
@@ -32,6 +32,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -87,6 +88,11 @@ public class DeleteGCSObject extends AbstractGCSProcessor {
}
    /**
     * @return the single GCP permission needed to delete objects, checked by the base class verify step
     */
    @Override
    protected List<String> getRequiredPermissions() {
        return Collections.singletonList("storage.objects.delete");
    }
+
+ @Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java
index f04eb05..b471003 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/FetchGCSObject.java
@@ -23,17 +23,23 @@ import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.common.collect.ImmutableList;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.io.input.BoundedInputStream;
+import org.apache.commons.io.output.CountingOutputStream;
+import org.apache.commons.io.output.NullOutputStream;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@@ -41,8 +47,10 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.IOException;
+import java.io.InputStream;
import java.nio.channels.Channels;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -195,10 +203,45 @@ public class FetchGCSObject extends AbstractGCSProcessor {
.build();
}
    /**
     * @return the single GCP permission needed to fetch objects, checked by the base class verify step
     */
    @Override
    protected List<String> getRequiredPermissions() {
        return Collections.singletonList("storage.objects.get");
    }
+
+ @Override
+ public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
+ final List<ConfigVerificationResult> results = new ArrayList<>(super.verify(context, verificationLogger, attributes));
+
+ final String bucketName = context.getProperty(BUCKET).evaluateAttributeExpressions(attributes).getValue();
+ final String key = context.getProperty(KEY).evaluateAttributeExpressions(attributes).getValue();
+
+ final Storage storage = getCloudService(context);
+ try {
+ final FetchedBlob blob = fetchBlob(context, storage, attributes);
+
+ final CountingOutputStream out = new CountingOutputStream(NullOutputStream.NULL_OUTPUT_STREAM);
+ IOUtils.copy(blob.contents, out);
+ final long byteCount = out.getByteCount();
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Fetch GCS Blob")
+ .outcome(Outcome.SUCCESSFUL)
+ .explanation(String.format("Successfully fetched [%s] from Bucket [%s], totaling %s bytes", key, bucketName, byteCount))
+ .build());
+ } catch (final StorageException | IOException e) {
+ getLogger().error(String.format("Failed to fetch [%s] from Bucket [%s]", key, bucketName), e);
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Fetch GCS Blob")
+ .outcome(Outcome.FAILED)
+ .explanation(String.format("Failed to fetch [%s] from Bucket [%s]: %s", key, bucketName, e.getMessage()))
+ .build());
+ }
+
+ return results;
+ }
@Override
- public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
@@ -208,51 +251,19 @@ public class FetchGCSObject extends AbstractGCSProcessor {
final String bucketName = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
- final Long generation = context.getProperty(GENERATION).evaluateAttributeExpressions(flowFile).asLong();
- final String encryptionKey = context.getProperty(ENCRYPTION_KEY).evaluateAttributeExpressions(flowFile).getValue();
final Storage storage = getCloudService();
- final BlobId blobId = BlobId.of(bucketName, key, generation);
final long rangeStart = (context.getProperty(RANGE_START).isSet() ? context.getProperty(RANGE_START).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : 0L);
final Long rangeLength = (context.getProperty(RANGE_LENGTH).isSet() ? context.getProperty(RANGE_LENGTH).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : null);
try {
- final List<Storage.BlobSourceOption> blobSourceOptions = new ArrayList<>(2);
-
- if (encryptionKey != null) {
- blobSourceOptions.add(Storage.BlobSourceOption.decryptionKey(encryptionKey));
- }
-
- if (generation != null) {
- blobSourceOptions.add(Storage.BlobSourceOption.generationMatch());
- }
-
- final Blob blob = storage.get(blobId);
- if (blob == null) {
- throw new StorageException(404, "Blob " + blobId + " not found");
- }
-
- if (rangeStart > 0 && rangeStart >= blob.getSize()) {
- if (getLogger().isDebugEnabled()) {
- getLogger().debug("Start position: {}, blob size: {}", new Object[] {rangeStart, blob.getSize()});
- }
- throw new StorageException(416, "The range specified is not valid for the blob " + blobId
- + ". Range Start is beyond the end of the blob.");
- }
-
- final ReadChannel reader = storage.reader(blobId, blobSourceOptions.toArray(new Storage.BlobSourceOption[0]));
- reader.seek(rangeStart);
-
- if (rangeLength == null) {
- flowFile = session.importFrom(Channels.newInputStream(reader), flowFile);
- } else {
- flowFile = session.importFrom(new BoundedInputStream(Channels.newInputStream(reader), rangeLength), flowFile);
- }
+ final FetchedBlob blob = fetchBlob(context, storage, flowFile.getAttributes());
+ flowFile = session.importFrom(blob.contents, flowFile);
- final Map<String, String> attributes = StorageAttributes.createAttributes(blob);
+ final Map<String, String> attributes = StorageAttributes.createAttributes(blob.blob);
flowFile = session.putAllAttributes(flowFile, attributes);
- } catch (StorageException | IOException e) {
+ } catch (final StorageException | IOException e) {
getLogger().error("Failed to fetch GCS Object due to {}", new Object[] {e}, e);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
@@ -265,4 +276,64 @@ public class FetchGCSObject extends AbstractGCSProcessor {
getLogger().info("Successfully retrieved GCS Object for {} in {} millis; routing to success", new Object[]{flowFile, millis});
session.getProvenanceReporter().fetch(flowFile, "https://" + bucketName + ".storage.googleapis.com/" + key, millis);
}
+
    /**
     * Resolves the configured blob, validates the requested byte range, and opens a stream over the
     * (optionally range-bounded, optionally decrypted) contents. Used by both onTrigger (with FlowFile
     * attributes) and verify (with verification attributes).
     *
     * @param context    the process context holding the configured properties
     * @param storage    the GCS client to fetch through
     * @param attributes attributes used for Expression Language evaluation
     * @return the blob metadata paired with an open stream over its contents; the caller owns the stream
     * @throws IOException              if seeking within the blob fails
     * @throws IllegalArgumentException if the evaluated object key is missing or empty
     * @throws StorageException         404 if the blob does not exist, 416 if the range start is past its end
     */
    private FetchedBlob fetchBlob(final ProcessContext context, final Storage storage, final Map<String, String> attributes) throws IOException {
        final String bucketName = context.getProperty(BUCKET).evaluateAttributeExpressions(attributes).getValue();
        final String key = context.getProperty(KEY).evaluateAttributeExpressions(attributes).getValue();
        final Long generation = context.getProperty(GENERATION).evaluateAttributeExpressions(attributes).asLong();
        // Range start defaults to 0 (beginning of blob); a null range length means "to the end".
        final long rangeStart = (context.getProperty(RANGE_START).isSet() ? context.getProperty(RANGE_START).evaluateAttributeExpressions(attributes).asDataSize(DataUnit.B).longValue() : 0L);
        final Long rangeLength = (context.getProperty(RANGE_LENGTH).isSet() ? context.getProperty(RANGE_LENGTH).evaluateAttributeExpressions(attributes).asDataSize(DataUnit.B).longValue() : null);

        final BlobId blobId = BlobId.of(bucketName, key, generation);
        final List<Storage.BlobSourceOption> blobSourceOptions = getBlobSourceOptions(context, attributes);

        // Fail fast on an empty key rather than issuing a request that cannot succeed.
        if (blobId.getName() == null || blobId.getName().isEmpty()) {
            throw new IllegalArgumentException("Name is required");
        }
        final Blob blob = storage.get(blobId);
        if (blob == null) {
            // Normalize "not found" into a StorageException so callers handle one exception family.
            throw new StorageException(404, "Blob " + blobId + " not found");
        }
        if (rangeStart > 0 && rangeStart >= blob.getSize()) {
            if (getLogger().isDebugEnabled()) {
                getLogger().debug("Start position: {}, blob size: {}", new Object[] {rangeStart, blob.getSize()});
            }
            // 416 mirrors HTTP "Range Not Satisfiable".
            throw new StorageException(416, "The range specified is not valid for the blob " + blob.getBlobId()
                + ". Range Start is beyond the end of the blob.");
        }

        final ReadChannel reader = storage.reader(blob.getBlobId(), blobSourceOptions.toArray(new Storage.BlobSourceOption[0]));
        reader.seek(rangeStart);

        // Bound the stream only when a range length was configured; otherwise read to the end.
        final InputStream rawInputStream = Channels.newInputStream(reader);
        final InputStream blobContents = rangeLength == null ? rawInputStream : new BoundedInputStream(rawInputStream, rangeLength);

        return new FetchedBlob(blobContents, blob);
    }
+
+ private List<Storage.BlobSourceOption> getBlobSourceOptions(final ProcessContext context, final Map<String, String> attributes) {
+ final Long generation = context.getProperty(GENERATION).evaluateAttributeExpressions(attributes).asLong();
+ final String encryptionKey = context.getProperty(ENCRYPTION_KEY).evaluateAttributeExpressions(attributes).getValue();
+
+ final List<Storage.BlobSourceOption> blobSourceOptions = new ArrayList<>(2);
+
+ if (encryptionKey != null) {
+ blobSourceOptions.add(Storage.BlobSourceOption.decryptionKey(encryptionKey));
+ }
+
+ if (generation != null) {
+ blobSourceOptions.add(Storage.BlobSourceOption.generationMatch());
+ }
+ return blobSourceOptions;
+ }
+
+ private class FetchedBlob {
+ private final InputStream contents;
+ private final Blob blob;
+
+ private FetchedBlob(final InputStream contents, final Blob blob) {
+ this.contents = contents;
+ this.blob = blob;
+ }
+ }
}
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
index ede0420..dd43e78 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/ListGCSBucket.java
@@ -33,6 +33,8 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
@@ -266,6 +268,41 @@ public class ListGCSBucket extends AbstractGCSProcessor {
}
    /**
     * @return the single GCP permission needed to list objects, checked by the base class verify step
     */
    @Override
    protected List<String> getRequiredPermissions() {
        return Collections.singletonList("storage.objects.list");
    }

    /**
     * Resolves the bucket name from this processor's BUCKET property.
     */
    @Override
    protected String getBucketName(final ProcessContext context, final Map<String, String> attributes) {
        // NOTE(review): deliberately evaluates without the attributes map -- presumably because this
        // source processor's Bucket property is not evaluated against FlowFile attributes. TODO
        // confirm the unused parameter is intentional.
        return context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
    }
+
+ @Override
+ public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
+ final List<ConfigVerificationResult> results = new ArrayList<>(super.verify(context, verificationLogger, attributes));
+ final String bucketName = getBucketName(context, attributes);
+
+ try {
+ final VerifyListingAction listingAction = new VerifyListingAction(context);
+ listBucket(context, listingAction);
+ final int blobCount = listingAction.getBlobWriter().getCount();
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("List GCS Bucket")
+ .outcome(Outcome.SUCCESSFUL)
+ .explanation(String.format("Successfully listed Bucket [%s], finding %s blobs matching the filter", bucketName, blobCount))
+ .build());
+ } catch (final Exception e) {
+ verificationLogger.error("Failed to list GCS Bucket", e);
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("List GCS Bucket")
+ .outcome(Outcome.FAILED)
+ .explanation(String.format("Failed to list Bucket [%s]: %s", bucketName, e.getMessage()))
+ .build());
+ }
+ return results;
+ }
+
+ @Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
try {
restoreState(session);
@@ -277,6 +314,22 @@ public class ListGCSBucket extends AbstractGCSProcessor {
final long startNanos = System.nanoTime();
+ final ListingAction listingAction = new TriggerListingAction(context, session);
+ try {
+ listBucket(context, listingAction);
+ } catch (final Exception e) {
+ getLogger().error("Failed to list contents of GCS Bucket due to {}", new Object[] {e}, e);
+ listingAction.getBlobWriter().finishListingExceptionally(e);
+ session.rollback();
+ context.yield();
+ return;
+ }
+
+ final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+ getLogger().info("Successfully listed GCS bucket {} in {} millis", new Object[]{ context.getProperty(BUCKET).evaluateAttributeExpressions().getValue(), listMillis });
+ }
+
+ private void listBucket(final ProcessContext context, final ListingAction listingAction) throws IOException, SchemaNotFoundException {
final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
final String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
final boolean useGenerations = context.getProperty(USE_GENERATIONS).asBoolean();
@@ -289,87 +342,160 @@ public class ListGCSBucket extends AbstractGCSProcessor {
listOptions.add(Storage.BlobListOption.versions(true));
}
- final Storage storage = getCloudService();
+ final Storage storage = listingAction.getCloudService();
long maxTimestamp = 0L;
final Set<String> keysMatchingTimestamp = new HashSet<>();
- final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+ final BlobWriter writer = listingAction.getBlobWriter();
- final BlobWriter writer;
- if (writerFactory == null) {
- writer = new AttributeBlobWriter(session);
- } else {
- writer = new RecordBlobWriter(session, writerFactory, getLogger());
- }
+ writer.beginListing();
- try {
- writer.beginListing();
-
- Page<Blob> blobPage = storage.list(bucket, listOptions.toArray(new Storage.BlobListOption[0]));
- int listCount = 0;
-
- do {
- for (final Blob blob : blobPage.getValues()) {
- long lastModified = blob.getUpdateTime();
- if (lastModified < currentTimestamp || lastModified == currentTimestamp && currentKeys.contains(blob.getName())) {
- continue;
- }
-
- writer.addToListing(blob);
-
- // Update state
- if (lastModified > maxTimestamp) {
- maxTimestamp = lastModified;
- keysMatchingTimestamp.clear();
- }
- if (lastModified == maxTimestamp) {
- keysMatchingTimestamp.add(blob.getName());
- }
-
- listCount++;
+ Page<Blob> blobPage = storage.list(bucket, listOptions.toArray(new Storage.BlobListOption[0]));
+ int listCount = 0;
+
+ do {
+ for (final Blob blob : blobPage.getValues()) {
+ long lastModified = blob.getUpdateTime();
+ if (listingAction.skipBlob(blob)) {
+ continue;
}
- if (writer.isCheckpoint()) {
- commit(session, listCount);
- listCount = 0;
+ writer.addToListing(blob);
+
+ // Update state
+ if (lastModified > maxTimestamp) {
+ maxTimestamp = lastModified;
+ keysMatchingTimestamp.clear();
}
+ if (lastModified == maxTimestamp) {
+ keysMatchingTimestamp.add(blob.getName());
+ }
+
+ listCount++;
+ }
- blobPage = blobPage.getNextPage();
- } while (blobPage != null);
+ if (writer.isCheckpoint()) {
+ listingAction.commit(listCount);
+ listCount = 0;
+ }
- writer.finishListing();
+ blobPage = blobPage.getNextPage();
+ } while (blobPage != null);
+
+ writer.finishListing();
+
+ listingAction.finishListing(listCount, maxTimestamp, keysMatchingTimestamp);
+ }
+
+ private void commit(final ProcessSession session, final int listCount) {
+ if (listCount > 0) {
+ getLogger().info("Successfully listed {} new files from GCS; routing to success", new Object[] {listCount});
+ session.commitAsync();
+ }
+ }
+
    /**
     * Strategy used by listBucket so the same listing loop can serve both onTrigger (emitting
     * FlowFiles and tracking state) and verify (merely counting blobs).
     *
     * @param <T> the BlobWriter implementation this action feeds
     */
    private interface ListingAction<T extends BlobWriter> {
        /** @return true if the blob should be excluded from the listing (e.g. already listed). */
        boolean skipBlob(final Blob blob);

        /** @return the writer that receives each listed blob. */
        T getBlobWriter();

        /** @return the Storage client to list through. */
        Storage getCloudService();

        /** Called once after the listing loop completes with the final counts and timestamp state. */
        void finishListing(int listCount, long maxTimestamp, Set<String> keysMatchingTimestamp);

        /** Called at each writer checkpoint with the number of blobs listed since the last commit. */
        void commit(int listCount);
    }
+
    /**
     * ListingAction used from onTrigger: emits FlowFiles (as attributes or records, depending on
     * whether a Record Writer is configured), commits the session at checkpoints, and updates the
     * processor's timestamp/key state when the listing finishes.
     */
    private class TriggerListingAction implements ListingAction<BlobWriter> {
        final ProcessContext context;
        final ProcessSession session;
        final BlobWriter blobWriter;

        private TriggerListingAction(final ProcessContext context, final ProcessSession session) {
            this.context = context;
            this.session = session;

            // Record-oriented output when a writer factory is configured; otherwise one FlowFile
            // per blob with the metadata as attributes.
            final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
            if (writerFactory == null) {
                blobWriter = new AttributeBlobWriter(session);
            } else {
                blobWriter = new RecordBlobWriter(session, writerFactory, getLogger());
            }
        }

        @Override
        public boolean skipBlob(final Blob blob) {
            // Skip anything already listed: strictly older than the tracked timestamp, or at the
            // same timestamp but with a key we have already emitted.
            final long lastModified = blob.getUpdateTime();
            return lastModified < currentTimestamp || lastModified == currentTimestamp && currentKeys.contains(blob.getName());
        }

        @Override
        public void commit(final int listCount) {
            ListGCSBucket.this.commit(session, listCount);
        }

        @Override
        public BlobWriter getBlobWriter() {
            return blobWriter;
        }

        @Override
        public Storage getCloudService() {
            return ListGCSBucket.this.getCloudService();
        }

        @Override
        public void finishListing(final int listCount, final long maxTimestamp, final Set<String> keysMatchingTimestamp) {
            if (maxTimestamp == 0) {
                // Nothing new was seen; yield so the processor is not rescheduled immediately.
                getLogger().debug("No new objects in GCS bucket {} to list. Yielding.", context.getProperty(BUCKET).evaluateAttributeExpressions().getValue());
                context.yield();
            } else {
                // Commit remaining output, then advance the tracked timestamp/key state and persist it.
                commit(listCount);
                currentTimestamp = maxTimestamp;
                currentKeys.clear();
                currentKeys.addAll(keysMatchingTimestamp);
                persistState(session, currentTimestamp, currentKeys);
            }
        }
    }
- private void commit(final ProcessSession session, final int listCount) {
- if (listCount > 0) {
- getLogger().info("Successfully listed {} new files from GCS; routing to success", new Object[] {listCount});
- session.commitAsync();
    /**
     * ListingAction used from verify: counts every blob (no skip filtering, no session, no state
     * updates), so verification reports how many blobs a listing would see.
     */
    private class VerifyListingAction implements ListingAction<CountingBlobWriter> {
        final ProcessContext context;
        final CountingBlobWriter blobWriter;

        private VerifyListingAction(final ProcessContext context) {
            this.context = context;
            blobWriter = new CountingBlobWriter();
        }

        @Override
        public boolean skipBlob(final Blob blob) {
            // Verification ignores listing state: count everything.
            return false;
        }

        @Override
        public void commit(final int listCount) {
            // No session to commit during verification.
        }

        @Override
        public CountingBlobWriter getBlobWriter() {
            return blobWriter;
        }

        @Override
        public Storage getCloudService() {
            // Use the verification context
            return ListGCSBucket.this.getCloudService(context);
        }

        @Override
        public void finishListing(final int listCount, final long maxTimestamp, final Set<String> keysMatchingTimestamp) {
            // Verification must not mutate processor state.
        }
    }
private interface BlobWriter {
void beginListing() throws IOException, SchemaNotFoundException;
@@ -547,8 +673,6 @@ public class ListGCSBucket extends AbstractGCSProcessor {
}
}
-
-
/**
* Writes Blobs by creating a new FlowFile for each blob and writing information as FlowFile attributes
*/
@@ -586,4 +710,37 @@ public class ListGCSBucket extends AbstractGCSProcessor {
return true;
}
}
+
 + /**
 + * Simply counts the blobs.
+ * Used by the config-verification listing to tally how many blobs a listing
+ * pass would produce, without writing any FlowFile or record output.
 + */
 + private static class CountingBlobWriter implements BlobWriter {
 + private int count = 0;
 +
 + @Override
 + public void beginListing() {
+ // No output to initialize.
 + }
 +
 + @Override
 + public void addToListing(final Blob blob) {
 + count++;
 + }
 +
 + @Override
 + public void finishListing() {
+ // No output to finalize.
 + }
 +
 + @Override
 + public void finishListingExceptionally(final Exception cause) {
+ // Nothing to clean up; the caller reports the failure.
 + }
 +
 + @Override
 + public boolean isCheckpoint() {
+ // Never checkpoints — presumably intermediate commits are meaningless while only counting (TODO confirm against BlobWriter contract).
 + return false;
 + }
 +
+ // Total number of blobs observed by addToListing.
 + public int getCount() {
 + return count;
 + }
 + }
}
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/PutGCSObject.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/PutGCSObject.java
index 5be6e39..6c2f431 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/PutGCSObject.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/storage/PutGCSObject.java
@@ -47,6 +47,7 @@ import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -314,6 +315,11 @@ public class PutGCSObject extends AbstractGCSProcessor {
}
@Override
+ protected List<String> getRequiredPermissions() {
+ return Collections.singletonList("storage.objects.create");
+ }
+
+ @Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
@@ -400,7 +406,6 @@ public class PutGCSObject extends AbstractGCSProcessor {
}
try {
-
final Blob blob = storage.create(blobInfoBuilder.build(),
in,
blobWriteOptions.toArray(new Storage.BlobWriteOption[blobWriteOptions.size()])
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java
index 5b51e0a..8561e62 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatchTest.java
@@ -28,12 +28,20 @@ import com.google.cloud.bigquery.JobStatus;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.cloud.bigquery.WriteChannelConfiguration;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.util.TestRunner;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.when;
@@ -88,6 +96,11 @@ public class PutBigQueryBatchTest extends AbstractBQTest {
protected BigQuery getCloudService() {
return bq;
}
+
+ @Override
+ protected BigQuery getCloudService(final ProcessContext context) {
+ return bq;
+ }
};
}
@@ -120,7 +133,8 @@ public class PutBigQueryBatchTest extends AbstractBQTest {
when(job.getJobId()).thenReturn(jobId);
when(jobId.getJob()).thenReturn("job-id");
- final TestRunner runner = buildNewRunner(getProcessor());
+ final AbstractBigQueryProcessor processor = getProcessor();
+ final TestRunner runner = buildNewRunner(processor);
addRequiredPropertiesToRunner(runner);
runner.assertValid();
@@ -128,6 +142,11 @@ public class PutBigQueryBatchTest extends AbstractBQTest {
runner.run();
+ when(bq.testIamPermissions(any(), any())).thenReturn(Collections.singletonList("permission"));
+ final List<ConfigVerificationResult> verificationResults = ((VerifiableProcessor) processor).verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap());
+ assertEquals(2, verificationResults.size());
+ assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, verificationResults.get(1).getOutcome());
+
runner.assertAllFlowFilesTransferred(PutBigQueryBatch.REL_SUCCESS);
}
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/AbstractGCSTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/AbstractGCSTest.java
index 8b94c47..4318f61 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/AbstractGCSTest.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/AbstractGCSTest.java
@@ -21,9 +21,9 @@ import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.cloud.storage.testing.RemoteStorageHelper;
+import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
-import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/FetchGCSObjectIT.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/FetchGCSObjectIT.java
index a8b8306..489a942 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/FetchGCSObjectIT.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/FetchGCSObjectIT.java
@@ -17,13 +17,16 @@
package org.apache.nifi.processors.gcp.storage;
import com.google.common.collect.ImmutableMap;
+import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.junit.Test;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
@@ -66,7 +69,8 @@ public class FetchGCSObjectIT extends AbstractGCSIT {
putTestFileEncrypted(KEY, CONTENT);
assertTrue(fileExists(KEY));
- final TestRunner runner = buildNewRunner(new FetchGCSObject());
+ final FetchGCSObject processor = new FetchGCSObject();
+ final TestRunner runner = buildNewRunner(processor);
runner.setProperty(FetchGCSObject.BUCKET, BUCKET);
runner.setProperty(FetchGCSObject.ENCRYPTION_KEY, ENCRYPTION_KEY);
@@ -77,6 +81,13 @@ public class FetchGCSObjectIT extends AbstractGCSIT {
runner.assertValid();
runner.run();
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("filename", KEY);
+ final List<ConfigVerificationResult> results = processor.verify(runner.getProcessContext(), runner.getLogger(), attributes);
+ assertEquals(2, results.size());
+ assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, results.get(1).getOutcome());
+ assertTrue(results.get(1).getExplanation().matches("Successfully fetched \\[delete-me\\] from Bucket \\[gcloud-test-bucket-temp-.*\\], totaling 3 bytes"));
+
runner.assertAllFlowFilesTransferred(FetchGCSObject.REL_SUCCESS, 1);
final List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(FetchGCSObject.REL_SUCCESS);
MockFlowFile ff = ffs.get(0);
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/FetchGCSObjectTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/FetchGCSObjectTest.java
index 5e51270..3dc3a70 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/FetchGCSObjectTest.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/FetchGCSObjectTest.java
@@ -27,6 +27,7 @@ import com.google.cloud.storage.StorageException;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.junit.Before;
@@ -116,6 +117,11 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
protected Storage getCloudService() {
return storage;
}
+
+ @Override
+ protected Storage getCloudService(final ProcessContext context) {
+ return storage;
+ }
};
}
@@ -214,6 +220,8 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
when(blob.getContentDisposition()).thenReturn(CONTENT_DISPOSITION);
when(blob.getCreateTime()).thenReturn(CREATE_TIME);
when(blob.getUpdateTime()).thenReturn(UPDATE_TIME);
+ final BlobId blobId = mock(BlobId.class);
+ when(blob.getBlobId()).thenReturn(blobId);
when(storage.get(any(BlobId.class))).thenReturn(blob);
when(storage.reader(any(BlobId.class), any(Storage.BlobSourceOption.class))).thenReturn(new MockReadChannel(CONTENT));
@@ -345,6 +353,8 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
final Acl.User mockUser = mock(Acl.User.class);
when(mockUser.getEmail()).thenReturn(OWNER_USER_EMAIL);
when(blob.getOwner()).thenReturn(mockUser);
+ final BlobId blobId = mock(BlobId.class);
+ when(blob.getBlobId()).thenReturn(blobId);
when(storage.get(any(BlobId.class))).thenReturn(blob);
when(storage.reader(any(BlobId.class), any(Storage.BlobSourceOption.class))).thenReturn(new MockReadChannel(CONTENT));
@@ -383,6 +393,8 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
final Acl.Group mockGroup = mock(Acl.Group.class);
when(mockGroup.getEmail()).thenReturn(OWNER_GROUP_EMAIL);
when(blob.getOwner()).thenReturn(mockGroup);
+ final BlobId blobId = mock(BlobId.class);
+ when(blob.getBlobId()).thenReturn(blobId);
when(storage.get(any(BlobId.class))).thenReturn(blob);
when(storage.reader(any(BlobId.class), any(Storage.BlobSourceOption.class))).thenReturn(new MockReadChannel(CONTENT));
@@ -423,6 +435,8 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
final Acl.Domain mockDomain = mock(Acl.Domain.class);
when(mockDomain.getDomain()).thenReturn(OWNER_DOMAIN);
when(blob.getOwner()).thenReturn(mockDomain);
+ final BlobId blobId = mock(BlobId.class);
+ when(blob.getBlobId()).thenReturn(blobId);
when(storage.get(any(BlobId.class))).thenReturn(blob);
when(storage.reader(any(BlobId.class), any(Storage.BlobSourceOption.class))).thenReturn(new MockReadChannel(CONTENT));
@@ -460,8 +474,10 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
final Blob blob = mock(Blob.class);
final Acl.Project mockProject = mock(Acl.Project.class);
+ final BlobId blobId = mock(BlobId.class);
when(mockProject.getProjectId()).thenReturn(OWNER_PROJECT_ID);
when(blob.getOwner()).thenReturn(mockProject);
+ when(blob.getBlobId()).thenReturn(blobId);
when(storage.get(any(BlobId.class))).thenReturn(blob);
when(storage.reader(any(BlobId.class), any(Storage.BlobSourceOption.class))).thenReturn(new MockReadChannel(CONTENT));
@@ -501,6 +517,8 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
runner.assertValid();
final Blob blob = mock(Blob.class);
+ final BlobId blobId = mock(BlobId.class);
+ when(blob.getBlobId()).thenReturn(blobId);
when(storage.get(any(BlobId.class))).thenReturn(blob);
when(storage.reader(any(BlobId.class), any(Storage.BlobSourceOption.class))).thenReturn(new MockReadChannel(CONTENT));
@@ -516,21 +534,21 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
verify(storage).get(blobIdArgumentCaptor.capture());
verify(storage).reader(any(BlobId.class), blobSourceOptionArgumentCaptor.capture());
- final BlobId blobId = blobIdArgumentCaptor.getValue();
+ final BlobId capturedBlobId = blobIdArgumentCaptor.getValue();
assertEquals(
BUCKET,
- blobId.getBucket()
+ capturedBlobId.getBucket()
);
assertEquals(
KEY,
- blobId.getName()
+ capturedBlobId.getName()
);
assertEquals(
GENERATION,
- blobId.getGeneration()
+ capturedBlobId.getGeneration()
);
@@ -554,6 +572,8 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
runner.assertValid();
final Blob blob = mock(Blob.class);
+ final BlobId blobId = mock(BlobId.class);
+ when(blob.getBlobId()).thenReturn(blobId);
when(storage.get(any(BlobId.class))).thenReturn(blob);
when(storage.reader(any(BlobId.class), any(Storage.BlobSourceOption.class))).thenReturn(new MockReadChannel(CONTENT));
@@ -566,19 +586,19 @@ public class FetchGCSObjectTest extends AbstractGCSTest {
verify(storage).get(blobIdArgumentCaptor.capture());
verify(storage).reader(any(BlobId.class), blobSourceOptionArgumentCaptor.capture());
- final BlobId blobId = blobIdArgumentCaptor.getValue();
+ final BlobId capturedBlobId = blobIdArgumentCaptor.getValue();
assertEquals(
BUCKET,
- blobId.getBucket()
+ capturedBlobId.getBucket()
);
assertEquals(
KEY,
- blobId.getName()
+ capturedBlobId.getName()
);
- assertNull(blobId.getGeneration());
+ assertNull(capturedBlobId.getGeneration());
final Set<Storage.BlobSourceOption> blobSourceOptions = ImmutableSet.copyOf(blobSourceOptionArgumentCaptor.getAllValues());
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
index 7f22500..4139fee 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/storage/ListGCSBucketTest.java
@@ -24,9 +24,11 @@ import com.google.cloud.storage.Storage;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.util.LogMessage;
import org.apache.nifi.util.MockFlowFile;
@@ -36,6 +38,7 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -115,6 +118,11 @@ public class ListGCSBucketTest extends AbstractGCSTest {
protected Storage getCloudService() {
return storage;
}
+
+ @Override
+ protected Storage getCloudService(final ProcessContext context) {
+ return storage;
+ }
};
}
@@ -241,6 +249,22 @@ public class ListGCSBucketTest extends AbstractGCSTest {
return blob;
}
+ // Runs the processor's verify() and asserts all three verification steps succeed
+ // (index 0: cloud service creation, 1: IAM permission check, 2: listing attempt),
+ // and that the listing step's explanation reports exactly expectedCount blobs.
 + private void verifyConfigVerification(final TestRunner runner, final ListGCSBucket processor, final int expectedCount) {
 + final List<ConfigVerificationResult> verificationResults = processor.verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap());
 + assertEquals(3, verificationResults.size());
 + final ConfigVerificationResult cloudServiceResult = verificationResults.get(0);
 + assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, cloudServiceResult.getOutcome());
 +
 + final ConfigVerificationResult iamPermissionsResult = verificationResults.get(1);
 + assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, iamPermissionsResult.getOutcome());
 +
 + final ConfigVerificationResult listingResult = verificationResults.get(2);
 + assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, listingResult.getOutcome());
 +
+ // The explanation text embeds the blob count; match it rather than parse it.
 + assertTrue(String.format("Expected %s blobs to be counted, but explanation was: %s", expectedCount, listingResult.getExplanation()),
 + listingResult.getExplanation().matches(String.format(".*finding %s blobs.*", expectedCount)));
 + }
+
@Test
public void testSuccessfulList() throws Exception {
reset(storage, mockBlobPage);
@@ -261,8 +285,11 @@ public class ListGCSBucketTest extends AbstractGCSTest {
runner.enqueue("test");
runner.run();
+ when(storage.testIamPermissions(anyString(), any())).thenReturn(Collections.singletonList(true));
+
runner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS);
runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 2);
+ verifyConfigVerification(runner, processor, 2);
final List<MockFlowFile> successes = runner.getFlowFilesForRelationship(ListGCSBucket.REL_SUCCESS);
@@ -301,8 +328,11 @@ public class ListGCSBucketTest extends AbstractGCSTest {
runner.enqueue("test2");
runner.run(2);
+ when(storage.testIamPermissions(anyString(), any())).thenReturn(Collections.singletonList(true));
+
runner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS);
runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 1);
+ verifyConfigVerification(runner, processor, 1);
assertEquals("blob-key-1", runner.getStateManager().getState(Scope.CLUSTER).get(ListGCSBucket.CURRENT_KEY_PREFIX+"0"));
assertEquals("2", runner.getStateManager().getState(Scope.CLUSTER).get(ListGCSBucket.CURRENT_TIMESTAMP));
@@ -327,7 +357,10 @@ public class ListGCSBucketTest extends AbstractGCSTest {
runner.enqueue("test");
runner.run();
+ when(storage.testIamPermissions(anyString(), any())).thenReturn(Collections.singletonList(true));
+
runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 0);
+ verifyConfigVerification(runner, processor, 0);
assertEquals("No state should be persisted on an empty return", -1L, runner.getStateManager().getState(Scope.CLUSTER).getVersion());
}
@@ -353,12 +386,17 @@ public class ListGCSBucketTest extends AbstractGCSTest {
when(mockBlobPage.getNextPage()).thenReturn(null);
when(storage.list(anyString(), any(Storage.BlobListOption[].class))).thenReturn(mockBlobPage);
+ when(storage.testIamPermissions(anyString(), any())).thenReturn(Collections.singletonList(true));
+
runner.enqueue("test");
runner.run();
runner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS);
runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 1);
+ // Both blobs are counted, because verification does not account for entity tracking
+ verifyConfigVerification(runner, processor, 2);
+
final List<MockFlowFile> successes = runner.getFlowFilesForRelationship(ListGCSBucket.REL_SUCCESS);
MockFlowFile flowFile = successes.get(0);
@@ -400,9 +438,14 @@ public class ListGCSBucketTest extends AbstractGCSTest {
runner.enqueue("test");
runner.run();
+ when(storage.testIamPermissions(anyString(), any())).thenReturn(Collections.singletonList(true));
+
runner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS);
runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 1);
+ // Both blobs are counted, because verification does not account for entity tracking
+ verifyConfigVerification(runner, processor, 2);
+
final List<MockFlowFile> successes = runner.getFlowFilesForRelationship(ListGCSBucket.REL_SUCCESS);
MockFlowFile flowFile = successes.get(0);
@@ -443,12 +486,17 @@ public class ListGCSBucketTest extends AbstractGCSTest {
when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
.thenReturn(mockBlobPage);
+ when(storage.testIamPermissions(anyString(), any())).thenReturn(Collections.singletonList(true));
+
runner.enqueue("test");
runner.run();
runner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS);
runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 2);
+ // All blobs are counted, because verification does not account for entity tracking
+ verifyConfigVerification(runner, processor, 3);
+
final List<MockFlowFile> successes = runner.getFlowFilesForRelationship(ListGCSBucket.REL_SUCCESS);
MockFlowFile flowFile = successes.get(0);
@@ -493,12 +541,17 @@ public class ListGCSBucketTest extends AbstractGCSTest {
when(storage.list(anyString(), any(Storage.BlobListOption[].class)))
.thenReturn(mockBlobPage);
+ when(storage.testIamPermissions(anyString(), any())).thenReturn(Collections.singletonList(true));
+
runner.enqueue("test");
runner.run();
runner.assertAllFlowFilesTransferred(ListGCSBucket.REL_SUCCESS);
runner.assertTransferCount(ListGCSBucket.REL_SUCCESS, 2);
+ // All blobs are counted, because verification does not account for entity tracking
+ verifyConfigVerification(runner, processor, 3);
+
final List<MockFlowFile> successes = runner.getFlowFilesForRelationship(ListGCSBucket.REL_SUCCESS);
MockFlowFile flowFile = successes.get(0);
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
index 1f99faf..28cb17d 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
@@ -39,9 +39,18 @@
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>31.0.1-jre</version>
+ </dependency>
+ <dependency>
<groupId>com.github.stephenc.findbugs</groupId>
<artifactId>findbugs-annotations</artifactId>
<version>1.3.9-1</version>