You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2021/10/20 15:07:26 UTC

[nifi] branch main updated: NIFI-9310: Addressed issues found while testing the component verification features. Added a supportsVerification flag to the ConfigAnalysis DTO (#5469)

This is an automated email from the ASF dual-hosted git repository.

mcgilman pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 90ae271  NIFI-9310: Addressed issues found while testing the component verification features. Added a supportsVerification flag to the ConfigAnalysis DTO (#5469)
90ae271 is described below

commit 90ae271692af69b2be99f32388a3088e19ef1cdd
Author: markap14 <ma...@hotmail.com>
AuthorDate: Wed Oct 20 11:07:17 2021 -0400

    NIFI-9310: Addressed issues found while testing the component verification features. Added a supportsVerification flag to the ConfigAnalysis DTO (#5469)
    
    This closes #5469
---
 .../nifi/web/api/dto/ConfigurationAnalysisDTO.java | 10 ++++++
 .../org/apache/nifi/web/NiFiServiceFacade.java     |  6 ++--
 .../apache/nifi/web/StandardNiFiServiceFacade.java | 22 +++++++++++--
 .../nifi/web/api/ControllerServiceResource.java    |  2 +-
 .../org/apache/nifi/web/api/ProcessorResource.java |  2 +-
 .../apache/nifi/web/api/ReportingTaskResource.java |  2 +-
 .../web/api/concurrent/AsyncRequestManager.java    |  4 +--
 .../concurrent/StandardAsynchronousWebRequest.java |  5 ++-
 .../nifi/processors/kafka/pubsub/ConsumerPool.java | 38 +++++++++++++++-------
 9 files changed, 68 insertions(+), 23 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ConfigurationAnalysisDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ConfigurationAnalysisDTO.java
index 29481f0..e8206b4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ConfigurationAnalysisDTO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ConfigurationAnalysisDTO.java
@@ -27,6 +27,7 @@ public class ConfigurationAnalysisDTO {
     private String componentId;
     private Map<String, String> properties;
     private Map<String, String> referencedAttributes;
+    private boolean supportsVerification;
 
     @ApiModelProperty("The ID of the component")
     public String getComponentId() {
@@ -54,4 +55,13 @@ public class ConfigurationAnalysisDTO {
     public void setReferencedAttributes(final Map<String, String> referencedAttributes) {
         this.referencedAttributes = referencedAttributes;
     }
+
+    @ApiModelProperty("Whether or not the component supports verification")
+    public boolean isSupportsVerification() {
+        return supportsVerification;
+    }
+
+    public void setSupportsVerification(final boolean supportsVerification) {
+        this.supportsVerification = supportsVerification;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index 6e5f425..9ad65a3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -642,7 +642,7 @@ public interface NiFiServiceFacade {
      * @param attributes a map of values that can be used for resolving FlowFile attributes for Expression Language
      * @return verification results
      */
-    List<ConfigVerificationResultDTO> verifyProcessorConfiguration(String processorId, Map<String, String> properties, Map<String, String> attributes);
+    List<ConfigVerificationResultDTO> performProcessorConfigVerification(String processorId, Map<String, String> properties, Map<String, String> attributes);
 
     /**
      * Performs analysis of the given properties, determining which attributes are referenced by properties
@@ -2058,7 +2058,7 @@ public interface NiFiServiceFacade {
      * @param variables a map of values that can be used for resolving FlowFile attributes for Expression Language
      * @return verification results
      */
-    List<ConfigVerificationResultDTO> verifyControllerServiceConfiguration(String controllerServiceId, Map<String, String> properties, Map<String, String> variables);
+    List<ConfigVerificationResultDTO> performControllerServiceConfigVerification(String controllerServiceId, Map<String, String> properties, Map<String, String> variables);
 
     /**
      * Performs analysis of the given properties, determining which attributes are referenced by properties
@@ -2159,7 +2159,7 @@ public interface NiFiServiceFacade {
      * @param properties the configured properties to verify
      * @return verification results
      */
-    List<ConfigVerificationResultDTO> verifyReportingTaskConfiguration(String reportingTaskId, Map<String, String> properties);
+    List<ConfigVerificationResultDTO> performReportingTaskConfigVerification(String reportingTaskId, Map<String, String> properties);
 
     /**
      * Performs analysis of the given properties, determining which attributes are referenced by properties
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 721d6a0..0d0ebe6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -81,6 +81,7 @@ import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.Snippet;
 import org.apache.nifi.controller.Template;
+import org.apache.nifi.controller.VerifiableControllerService;
 import org.apache.nifi.controller.flow.FlowManager;
 import org.apache.nifi.controller.label.Label;
 import org.apache.nifi.controller.leader.election.LeaderElectionManager;
@@ -118,6 +119,7 @@ import org.apache.nifi.parameter.ParameterDescriptor;
 import org.apache.nifi.parameter.ParameterLookup;
 import org.apache.nifi.parameter.ParameterReferenceManager;
 import org.apache.nifi.parameter.StandardParameterContext;
+import org.apache.nifi.processor.VerifiableProcessor;
 import org.apache.nifi.prometheus.util.BulletinMetricsRegistry;
 import org.apache.nifi.prometheus.util.ConnectionAnalyticsMetricsRegistry;
 import org.apache.nifi.prometheus.util.JvmMetricsRegistry;
@@ -158,6 +160,7 @@ import org.apache.nifi.reporting.Bulletin;
 import org.apache.nifi.reporting.BulletinQuery;
 import org.apache.nifi.reporting.BulletinRepository;
 import org.apache.nifi.reporting.ComponentType;
+import org.apache.nifi.reporting.VerifiableReportingTask;
 import org.apache.nifi.util.BundleUtils;
 import org.apache.nifi.util.FlowDifferenceFilters;
 import org.apache.nifi.util.NiFiProperties;
@@ -772,7 +775,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     @Override
-    public List<ConfigVerificationResultDTO> verifyProcessorConfiguration(final String processorId, final Map<String, String> properties, final Map<String, String> attributes) {
+    public List<ConfigVerificationResultDTO> performProcessorConfigVerification(final String processorId, final Map<String, String> properties, final Map<String, String> attributes) {
         return processorDAO.verifyProcessorConfiguration(processorId, properties, attributes);
     }
 
@@ -793,12 +796,25 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
         dto.setComponentId(componentNode.getIdentifier());
         dto.setProperties(properties);
         dto.setReferencedAttributes(referencedAttributes);
+        dto.setSupportsVerification(isVerificationSupported(componentNode));
 
         final ConfigurationAnalysisEntity entity = new ConfigurationAnalysisEntity();
         entity.setConfigurationAnalysis(dto);
         return entity;
     }
 
+    private boolean isVerificationSupported(final ComponentNode componentNode) {
+        if (componentNode instanceof ProcessorNode) {
+            return ((ProcessorNode) componentNode).getProcessor() instanceof VerifiableProcessor;
+        } else if (componentNode instanceof ControllerServiceNode) {
+            return ((ControllerServiceNode) componentNode).getControllerServiceImplementation() instanceof VerifiableControllerService;
+        } else if (componentNode instanceof ReportingTaskNode) {
+            return ((ReportingTaskNode) componentNode).getReportingTask() instanceof VerifiableReportingTask;
+        } else {
+            return false;
+        }
+    }
+
     private Map<String, String> determineReferencedAttributes(final Map<String, String> properties, final ComponentNode componentNode, final ParameterContext parameterContext) {
         final Map<String, String> mergedProperties = new LinkedHashMap<>();
         componentNode.getRawPropertyValues().forEach((desc, value) -> mergedProperties.put(desc.getName(), value));
@@ -2834,7 +2850,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     @Override
-    public List<ConfigVerificationResultDTO> verifyControllerServiceConfiguration(final String controllerServiceId, final Map<String, String> properties, final Map<String, String> variables) {
+    public List<ConfigVerificationResultDTO> performControllerServiceConfigVerification(final String controllerServiceId, final Map<String, String> properties, final Map<String, String> variables) {
         return controllerServiceDAO.verifyConfiguration(controllerServiceId, properties, variables);
     }
 
@@ -3227,7 +3243,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     @Override
-    public List<ConfigVerificationResultDTO> verifyReportingTaskConfiguration(final String reportingTaskId, final Map<String, String> properties) {
+    public List<ConfigVerificationResultDTO> performReportingTaskConfigVerification(final String reportingTaskId, final Map<String, String> properties) {
         return reportingTaskDAO.verifyConfiguration(reportingTaskId, properties);
     }
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java
index 8e89ada..273c417 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ControllerServiceResource.java
@@ -1086,7 +1086,7 @@ public class ControllerServiceResource extends ApplicationResource {
         final Consumer<AsynchronousWebRequest<VerifyConfigRequestEntity, List<ConfigVerificationResultDTO>>> updateTask = asyncRequest -> {
             try {
                 final Map<String, String> attributes = requestDto.getAttributes() == null ? Collections.emptyMap() : requestDto.getAttributes();
-                final List<ConfigVerificationResultDTO> results = serviceFacade.verifyControllerServiceConfiguration(serviceId, requestDto.getProperties(), attributes);
+                final List<ConfigVerificationResultDTO> results = serviceFacade.performControllerServiceConfigVerification(serviceId, requestDto.getProperties(), attributes);
                 asyncRequest.markStepComplete(results);
             } catch (final Exception e) {
                 logger.error("Failed to verify Controller Service configuration", e);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
index 94906b1..75413de 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java
@@ -1066,7 +1066,7 @@ public class ProcessorResource extends ApplicationResource {
         final Consumer<AsynchronousWebRequest<VerifyConfigRequestEntity, List<ConfigVerificationResultDTO>>> updateTask = asyncRequest -> {
             try {
                 final Map<String, String> attributes = requestDto.getAttributes() == null ? Collections.emptyMap() : requestDto.getAttributes();
-                final List<ConfigVerificationResultDTO> results = serviceFacade.verifyProcessorConfiguration(processorId, requestDto.getProperties(), attributes);
+                final List<ConfigVerificationResultDTO> results = serviceFacade.performProcessorConfigVerification(processorId, requestDto.getProperties(), attributes);
                 asyncRequest.markStepComplete(results);
             } catch (final Exception e) {
                 logger.error("Failed to verify Processor configuration", e);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java
index f9459ee..6f27ec5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ReportingTaskResource.java
@@ -891,7 +891,7 @@ public class ReportingTaskResource extends ApplicationResource {
         // Submit the request to be performed in the background
         final Consumer<AsynchronousWebRequest<VerifyConfigRequestEntity, List<ConfigVerificationResultDTO>>> updateTask = asyncRequest -> {
             try {
-                final List<ConfigVerificationResultDTO> results = serviceFacade.verifyReportingTaskConfiguration(taskId, requestDto.getProperties());
+                final List<ConfigVerificationResultDTO> results = serviceFacade.performReportingTaskConfigVerification(taskId, requestDto.getProperties());
                 asyncRequest.markStepComplete(results);
             } catch (final Exception e) {
                 logger.error("Failed to verify Reporting Task configuration", e);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsyncRequestManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsyncRequestManager.java
index 5a6eec5..1e7fa03 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsyncRequestManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/AsyncRequestManager.java
@@ -56,8 +56,8 @@ public class AsyncRequestManager<R, T> implements RequestManager<R, T> {
         this.requestExpirationMillis = requestExpirationMillis;
         this.maxConcurrentRequests = maxConcurrentRequests;
 
-        this.threadPool = new ThreadPoolExecutor(1, 50, 5L, TimeUnit.SECONDS,
-            new ArrayBlockingQueue<>(maxConcurrentRequests),
+        this.threadPool = new ThreadPoolExecutor(1, maxConcurrentRequests, 5L, TimeUnit.SECONDS,
+            new ArrayBlockingQueue<>(1),
             new ThreadFactory() {
                 private final AtomicLong counter = new AtomicLong(0L);
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java
index b8aba15..1320b6b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/concurrent/StandardAsynchronousWebRequest.java
@@ -158,7 +158,10 @@ public class StandardAsynchronousWebRequest<R, T> implements AsynchronousWebRequ
         this.cancelled = true;
         percentComplete = 100;
         fail("Request cancelled by user");
-        cancelCallback.run();
+
+        if (cancelCallback != null) {
+            cancelCallback.run();
+        }
     }
 
     @Override
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
index e10a75b..6c4cc49 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
@@ -308,6 +308,7 @@ public class ConsumerPool implements Closeable {
                 if (topicPattern == null) {
                     final Map<String, Long> messagesToConsumePerTopic = new HashMap<>();
 
+                    long toConsume = 0L;
                     for (final String topicName : topics) {
                         final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topicName);
 
@@ -320,13 +321,7 @@ public class ConsumerPool implements Closeable {
                         final Map<TopicPartition, OffsetAndMetadata> committedOffsets = consumer.committed(topicPartitions, Duration.ofSeconds(30));
 
                         for (final TopicPartition topicPartition : endOffsets.keySet()) {
-                            long endOffset = endOffsets.get(topicPartition);
-                            // When no messages have been added to a topic, end offset is 0. However, after the first message is added,
-                            // the end offset points to where the next message will be. I.e., it goes from 0 to 2. We want the offset
-                            // of the last message, not the offset of where the next one will be. So we subtract one.
-                            if (endOffset > 0) {
-                                endOffset--;
-                            }
+                            final long endOffset = endOffsets.get(topicPartition);
 
                             final long beginningOffset = beginningOffsets.getOrDefault(topicPartition, 0L);
                             if (endOffset <= beginningOffset) {
@@ -339,6 +334,7 @@ public class ConsumerPool implements Closeable {
 
                             final long currentOffset = Math.max(beginningOffset, committedOffset);
                             final long messagesToConsume = endOffset - currentOffset;
+                            toConsume += messagesToConsume;
 
                             messagesToConsumePerTopic.merge(topicPartition.topic(), messagesToConsume, Long::sum);
                         }
@@ -351,12 +347,31 @@ public class ConsumerPool implements Closeable {
                         .build());
 
                     logger.info("Successfully determined offsets for {} topics. Number of messages left to consume per topic: {}", messagesToConsumePerTopic.size(), messagesToConsumePerTopic);
+
+                    if (readerFactory != null) {
+                        if (toConsume > 0) {
+                            final ConfigVerificationResult checkDataResult = checkRecordIsParsable(lease);
+                            verificationResults.add(checkDataResult);
+                        } else {
+                            verificationResults.add(new ConfigVerificationResult.Builder()
+                                .verificationStepName("Parse Records")
+                                .outcome(Outcome.SKIPPED)
+                                .explanation("There are no available Records to attempt parsing")
+                                .build());
+                        }
+                    }
+
                 } else {
                     verificationResults.add(new ConfigVerificationResult.Builder()
                         .verificationStepName("Determine Topic Offsets")
                         .outcome(Outcome.SKIPPED)
                         .explanation("Cannot determine Topic Offsets because a Topic Wildcard was used instead of an explicit Topic Name")
                         .build());
+
+                    if (readerFactory != null) {
+                        final ConfigVerificationResult checkDataResult = checkRecordIsParsable(lease);
+                        verificationResults.add(checkDataResult);
+                    }
                 }
             } catch (final Exception e) {
                 logger.error("Failed to determine Topic Offsets in order to verify configuration", e);
@@ -366,11 +381,12 @@ public class ConsumerPool implements Closeable {
                     .outcome(Outcome.FAILED)
                     .explanation("Could not fetch Topic Offsets: " + e)
                     .build());
-            }
 
-            if (readerFactory != null) {
-                final ConfigVerificationResult checkDataResult = checkRecordIsParsable(lease);
-                verificationResults.add(checkDataResult);
+                verificationResults.add(new ConfigVerificationResult.Builder()
+                    .verificationStepName("Parse Records")
+                    .outcome(Outcome.SKIPPED)
+                    .explanation("Could not determine offsets so will not attempt to fetch records")
+                    .build());
             }
 
             return verificationResults;