You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2021/10/20 15:07:26 UTC
[nifi] branch main updated: NIFI-9310: Addressed issues found while testing the component verification features. Added a supportsVerification flag to the ConfigAnalysis DTO (#5469)
This is an automated email from the ASF dual-hosted git repository.
mcgilman pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 90ae271 NIFI-9310: Addressed issues found while testing the component verification features. Added a supportsVerification flag to the ConfigAnalysis DTO (#5469)
90ae271 is described below
commit 90ae271692af69b2be99f32388a3088e19ef1cdd
Author: markap14 <ma...@hotmail.com>
AuthorDate: Wed Oct 20 11:07:17 2021 -0400
NIFI-9310: Addressed issues found while testing the component verification features. Added a supportsVerification flag to the ConfigAnalysis DTO (#5469)
This closes #5469
---
.../nifi/web/api/dto/ConfigurationAnalysisDTO.java | 10 ++++++
.../org/apache/nifi/web/NiFiServiceFacade.java | 6 ++--
.../apache/nifi/web/StandardNiFiServiceFacade.java | 22 +++++++++++--
.../nifi/web/api/ControllerServiceResource.java | 2 +-
.../org/apache/nifi/web/api/ProcessorResource.java | 2 +-
.../apache/nifi/web/api/ReportingTaskResource.java | 2 +-
.../web/api/concurrent/AsyncRequestManager.java | 4 +--
.../concurrent/StandardAsynchronousWebRequest.java | 5 ++-
.../nifi/processors/kafka/pubsub/ConsumerPool.java | 38 +++++++++++++++-------
9 files changed, 68 insertions(+), 23 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ConfigurationAnalysisDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ConfigurationAnalysisDTO.java
index 29481f0..e8206b4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ConfigurationAnalysisDTO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ConfigurationAnalysisDTO.java
@@ -27,6 +27,7 @@ public class ConfigurationAnalysisDTO {
private String componentId;
private Map<String, String> properties;
private Map<String, String> referencedAttributes;
+ private boolean supportsVerification;
@ApiModelProperty("The ID of the component")
public String getComponentId() {
@@ -54,4 +55,13 @@ public class ConfigurationAnalysisDTO {
public void setReferencedAttributes(final Map<String, String> referencedAttributes) {
this.referencedAttributes = referencedAttributes;
}
+
+ @ApiModelProperty("Whether or not the component supports verification")
+ public boolean isSupportsVerification() {
+ return supportsVerification;
+ }
+
+ public void setSupportsVerification(final boolean supportsVerification) {
+ this.supportsVerification = supportsVerification;
+ }
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index 6e5f425..9ad65a3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -642,7 +642,7 @@ public interface NiFiServiceFacade {
* @param attributes a map of values that can be used for resolving FlowFile attributes for Expression Language
* @return verification results
*/
- List<ConfigVerificationResultDTO> verifyProcessorConfiguration(String processorId, Map<String, String> properties, Map<String, String> attributes);
+ List<ConfigVerificationResultDTO> performProcessorConfigVerification(String processorId, Map<String, String> properties, Map<String, String> attributes);
/**
* Performs analysis of the given properties, determining which attributes are referenced by properties
@@ -2058,7 +2058,7 @@ public interface NiFiServiceFacade {
* @param variables a map of values that can be used for resolving FlowFile attributes for Expression Language
* @return verification results
*/
- List<ConfigVerificationResultDTO> verifyControllerServiceConfiguration(String controllerServiceId, Map<String, String> properties, Map<String, String> variables);
+ List<ConfigVerificationResultDTO> performControllerServiceConfigVerification(String controllerServiceId, Map<String, String> properties, Map<String, String> variables);
/**
* Performs analysis of the given properties, determining which attributes are referenced by properties
@@ -2159,7 +2159,7 @@ public interface NiFiServiceFacade {
* @param properties the configured properties to verify
* @return verification results
*/
- List<ConfigVerificationResultDTO> verifyReportingTaskConfiguration(String reportingTaskId, Map<String, String> properties);
+ List<ConfigVerificationResultDTO> performReportingTaskConfigVerification(String reportingTaskId, Map<String, String> properties);
/**
* Performs analysis of the given properties, determining which attributes are referenced by properties
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 721d6a0..0d0ebe6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -81,6 +81,7 @@ import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.Snippet;
import org.apache.nifi.controller.Template;
+import org.apache.nifi.controller.VerifiableControllerService;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.leader.election.LeaderElectionManager;
@@ -118,6 +119,7 @@ import org.apache.nifi.parameter.ParameterDescriptor;
import org.apache.nifi.parameter.ParameterLookup;
import org.apache.nifi.parameter.ParameterReferenceManager;
import org.apache.nifi.parameter.StandardParameterContext;
+import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.prometheus.util.BulletinMetricsRegistry;
import org.apache.nifi.prometheus.util.ConnectionAnalyticsMetricsRegistry;
import org.apache.nifi.prometheus.util.JvmMetricsRegistry;
@@ -158,6 +160,7 @@ import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinQuery;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.ComponentType;
+import org.apache.nifi.reporting.VerifiableReportingTask;
import org.apache.nifi.util.BundleUtils;
import org.apache.nifi.util.FlowDifferenceFilters;
import org.apache.nifi.util.NiFiProperties;
@@ -772,7 +775,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
- public List<ConfigVerificationResultDTO> verifyProcessorConfiguration(final String processorId, final Map<String, String> properties, final Map<String, String> attributes) {
+ public List<ConfigVerificationResultDTO> performProcessorConfigVerification(final String processorId, final Map<String, String> properties, final Map<String, String> attributes) {
return processorDAO.verifyProcessorConfiguration(processorId, properties, attributes);
}
@@ -793,12 +796,25 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
dto.setComponentId(componentNode.getIdentifier());
dto.setProperties(properties);
dto.setReferencedAttributes(referencedAttributes);
+ dto.setSupportsVerification(isVerificationSupported(componentNode));
final ConfigurationAnalysisEntity entity = new ConfigurationAnalysisEntity();
entity.setConfigurationAnalysis(dto);
return entity;
}
+ private boolean isVerificationSupported(final ComponentNode componentNode) {
+ if (componentNode instanceof ProcessorNode) {
+ return ((ProcessorNode) componentNode).getProcessor() instanceof VerifiableProcessor;
+ } else if (componentNode instanceof ControllerServiceNode) {
+ return ((ControllerServiceNode) componentNode).getControllerServiceImplementation() instanceof VerifiableControllerService;
+ } else if (componentNode instanceof ReportingTaskNode) {
+ return ((ReportingTaskNode) componentNode).getReportingTask() instanceof VerifiableReportingTask;
+ } else {
+ return false;
+ }
+ }
+
private Map<String, String> determineReferencedAttributes(final Map<String, String> properties, final ComponentNode componentNode, final ParameterContext parameterContext) {
final Map<String, String> mergedProperties = new LinkedHashMap<>();
componentNode.getRawPropertyValues().forEach((desc, value) -> mergedProperties.put(desc.getName(), value));
@@ -2834,7 +2850,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
- public List<ConfigVerificationResultDTO> verifyControllerServiceConfiguration(final String controllerServiceId, final Map<String, String> properties, final Map<String, String> variables) {
+ public List<ConfigVerificationResultDTO> performControllerServiceConfigVerification(final String controllerServiceId, final Map<String, String> properties, final Map<String, String> variables) {
return controllerServiceDAO.verifyConfiguration(controllerServiceId, properties, variables);
}
@@ -3227,7 +3243,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
- public List<ConfigVerificationResultDTO> verifyReportingTaskConfiguration(final String reportingTaskId, final Map<String, String> properties) {
+ public List<ConfigVerificationResultDTO> performReportingTaskConfigVerification(final String reportingTaskId, final Map<String, String> properties) {
return reportingTaskDAO.verifyConfiguration(reportingTaskId, properties);
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java
index 8e89ada..273c417 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java
@@ -1086,7 +1086,7 @@ public class ControllerServiceResource extends ApplicationResource {
final Consumer<AsynchronousWebRequest<VerifyConfigRequestEntity, List<ConfigVerificationResultDTO>>> updateTask = asyncRequest -> {
try {
final Map<String, String> attributes = requestDto.getAttributes() == null ? Collections.emptyMap() : requestDto.getAttributes();
- final List<ConfigVerificationResultDTO> results = serviceFacade.verifyControllerServiceConfiguration(serviceId, requestDto.getProperties(), attributes);
+ final List<ConfigVerificationResultDTO> results = serviceFacade.performControllerServiceConfigVerification(serviceId, requestDto.getProperties(), attributes);
asyncRequest.markStepComplete(results);
} catch (final Exception e) {
logger.error("Failed to verify Controller Service configuration", e);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
index 94906b1..75413de 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
@@ -1066,7 +1066,7 @@ public class ProcessorResource extends ApplicationResource {
final Consumer<AsynchronousWebRequest<VerifyConfigRequestEntity, List<ConfigVerificationResultDTO>>> updateTask = asyncRequest -> {
try {
final Map<String, String> attributes = requestDto.getAttributes() == null ? Collections.emptyMap() : requestDto.getAttributes();
- final List<ConfigVerificationResultDTO> results = serviceFacade.verifyProcessorConfiguration(processorId, requestDto.getProperties(), attributes);
+ final List<ConfigVerificationResultDTO> results = serviceFacade.performProcessorConfigVerification(processorId, requestDto.getProperties(), attributes);
asyncRequest.markStepComplete(results);
} catch (final Exception e) {
logger.error("Failed to verify Processor configuration", e);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java
index f9459ee..6f27ec5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java
@@ -891,7 +891,7 @@ public class ReportingTaskResource extends ApplicationResource {
// Submit the request to be performed in the background
final Consumer<AsynchronousWebRequest<VerifyConfigRequestEntity, List<ConfigVerificationResultDTO>>> updateTask = asyncRequest -> {
try {
- final List<ConfigVerificationResultDTO> results = serviceFacade.verifyReportingTaskConfiguration(taskId, requestDto.getProperties());
+ final List<ConfigVerificationResultDTO> results = serviceFacade.performReportingTaskConfigVerification(taskId, requestDto.getProperties());
asyncRequest.markStepComplete(results);
} catch (final Exception e) {
logger.error("Failed to verify Reporting Task configuration", e);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsyncRequestManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsyncRequestManager.java
index 5a6eec5..1e7fa03 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsyncRequestManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsyncRequestManager.java
@@ -56,8 +56,8 @@ public class AsyncRequestManager<R, T> implements RequestManager<R, T> {
this.requestExpirationMillis = requestExpirationMillis;
this.maxConcurrentRequests = maxConcurrentRequests;
- this.threadPool = new ThreadPoolExecutor(1, 50, 5L, TimeUnit.SECONDS,
- new ArrayBlockingQueue<>(maxConcurrentRequests),
+ this.threadPool = new ThreadPoolExecutor(1, maxConcurrentRequests, 5L, TimeUnit.SECONDS,
+ new ArrayBlockingQueue<>(1),
new ThreadFactory() {
private final AtomicLong counter = new AtomicLong(0L);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java
index b8aba15..1320b6b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java
@@ -158,7 +158,10 @@ public class StandardAsynchronousWebRequest<R, T> implements AsynchronousWebRequ
this.cancelled = true;
percentComplete = 100;
fail("Request cancelled by user");
- cancelCallback.run();
+
+ if (cancelCallback != null) {
+ cancelCallback.run();
+ }
}
@Override
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
index e10a75b..6c4cc49 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
@@ -308,6 +308,7 @@ public class ConsumerPool implements Closeable {
if (topicPattern == null) {
final Map<String, Long> messagesToConsumePerTopic = new HashMap<>();
+ long toConsume = 0L;
for (final String topicName : topics) {
final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topicName);
@@ -320,13 +321,7 @@ public class ConsumerPool implements Closeable {
final Map<TopicPartition, OffsetAndMetadata> committedOffsets = consumer.committed(topicPartitions, Duration.ofSeconds(30));
for (final TopicPartition topicPartition : endOffsets.keySet()) {
- long endOffset = endOffsets.get(topicPartition);
- // When no messages have been added to a topic, end offset is 0. However, after the first message is added,
- // the end offset points to where the next message will be. I.e., it goes from 0 to 2. We want the offset
- // of the last message, not the offset of where the next one will be. So we subtract one.
- if (endOffset > 0) {
- endOffset--;
- }
+ final long endOffset = endOffsets.get(topicPartition);
final long beginningOffset = beginningOffsets.getOrDefault(topicPartition, 0L);
if (endOffset <= beginningOffset) {
@@ -339,6 +334,7 @@ public class ConsumerPool implements Closeable {
final long currentOffset = Math.max(beginningOffset, committedOffset);
final long messagesToConsume = endOffset - currentOffset;
+ toConsume += messagesToConsume;
messagesToConsumePerTopic.merge(topicPartition.topic(), messagesToConsume, Long::sum);
}
@@ -351,12 +347,31 @@ public class ConsumerPool implements Closeable {
.build());
logger.info("Successfully determined offsets for {} topics. Number of messages left to consume per topic: {}", messagesToConsumePerTopic.size(), messagesToConsumePerTopic);
+
+ if (readerFactory != null) {
+ if (toConsume > 0) {
+ final ConfigVerificationResult checkDataResult = checkRecordIsParsable(lease);
+ verificationResults.add(checkDataResult);
+ } else {
+ verificationResults.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Parse Records")
+ .outcome(Outcome.SKIPPED)
+ .explanation("There are no available Records to attempt parsing")
+ .build());
+ }
+ }
+
} else {
verificationResults.add(new ConfigVerificationResult.Builder()
.verificationStepName("Determine Topic Offsets")
.outcome(Outcome.SKIPPED)
.explanation("Cannot determine Topic Offsets because a Topic Wildcard was used instead of an explicit Topic Name")
.build());
+
+ if (readerFactory != null) {
+ final ConfigVerificationResult checkDataResult = checkRecordIsParsable(lease);
+ verificationResults.add(checkDataResult);
+ }
}
} catch (final Exception e) {
logger.error("Failed to determine Topic Offsets in order to verify configuration", e);
@@ -366,11 +381,12 @@ public class ConsumerPool implements Closeable {
.outcome(Outcome.FAILED)
.explanation("Could not fetch Topic Offsets: " + e)
.build());
- }
- if (readerFactory != null) {
- final ConfigVerificationResult checkDataResult = checkRecordIsParsable(lease);
- verificationResults.add(checkDataResult);
+ verificationResults.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Parse Records")
+ .outcome(Outcome.SKIPPED)
+ .explanation("Could not determine offsets so will not attempt to fetch records")
+ .build());
}
return verificationResults;