You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2019/11/05 13:21:36 UTC
[nifi] branch master updated: NIFI-6729 - Created
AbstractSingleAttributeBasedControllerServiceLookup and updated
DBCPConnectionPoolLookup and AzureStorageCredentialsControllerServiceLookup
to inherit from it.
This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 84a05c8 NIFI-6729 - Created AbstractSingleAttributeBasedControllerServiceLookup and updated DBCPConnectionPoolLookup and AzureStorageCredentialsControllerServiceLookup to inherit from it.
84a05c8 is described below
commit 84a05c8595a5aa61c33723aa20418048230bf7e3
Author: Tamas Palfy <tp...@cloudera.com>
AuthorDate: Tue Oct 1 11:25:30 2019 +0200
NIFI-6729 - Created AbstractSingleAttributeBasedControllerServiceLookup and updated DBCPConnectionPoolLookup and AzureStorageCredentialsControllerServiceLookup to inherit from it.
This closes #3774.
Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
.../nifi-azure-processors/pom.xml | 5 +
...eStorageCredentialsControllerServiceLookup.java | 108 +-------
.../{ => nifi-service-utils}/pom.xml | 43 ++--
...ingleAttributeBasedControllerServiceLookup.java | 162 ++++++++++++
...ingleAttributeBasedControllerServiceLookup.java | 276 +++++++++++++++++++++
nifi-nar-bundles/nifi-extension-utils/pom.xml | 1 +
.../nifi-dbcp-service/pom.xml | 5 +
.../apache/nifi/dbcp/DBCPConnectionPoolLookup.java | 104 ++------
8 files changed, 501 insertions(+), 203 deletions(-)
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
index 2831fef..a32c4bd 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
@@ -35,6 +35,11 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-service-utils</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-service-api</artifactId>
</dependency>
<dependency>
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerServiceLookup.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerServiceLookup.java
index 4899715..4ac2f07 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerServiceLookup.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerServiceLookup.java
@@ -16,26 +16,11 @@
*/
package org.apache.nifi.services.azure.storage;
-import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnDisabled;
-import org.apache.nifi.annotation.lifecycle.OnEnabled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
+import org.apache.nifi.service.lookup.AbstractSingleAttributeBasedControllerServiceLookup;
import java.util.Map;
@Tags({ "azure", "microsoft", "cloud", "storage", "blob", "queue", "credentials" })
@@ -45,97 +30,26 @@ import java.util.Map;
"This will allow multiple AzureStorageCredentialsServices to be defined and registered, and then selected dynamically at runtime by tagging flow files " +
"with the appropriate 'azure.storage.credentials.name' attribute.")
@DynamicProperty(name = "The name to register AzureStorageCredentialsService", value = "The AzureStorageCredentialsService",
- description = "If 'azure.storage.credentials.name' attribute contains the name of the dynamic property, then the AzureStorageCredentialsService (registered in the value) will be selected.",
+ description = "If '" + AzureStorageCredentialsControllerServiceLookup.AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE + "' attribute contains " +
+ "the name of the dynamic property, then the AzureStorageCredentialsService (registered in the value) will be selected.",
expressionLanguageScope = ExpressionLanguageScope.NONE)
-public class AzureStorageCredentialsControllerServiceLookup extends AbstractControllerService implements AzureStorageCredentialsService {
+public class AzureStorageCredentialsControllerServiceLookup
+ extends AbstractSingleAttributeBasedControllerServiceLookup<AzureStorageCredentialsService> implements AzureStorageCredentialsService {
public static final String AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE = "azure.storage.credentials.name";
- private volatile Map<String, AzureStorageCredentialsService> serviceMap;
-
@Override
- protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
- return new PropertyDescriptor.Builder()
- .name(propertyDescriptorName)
- .description("The " + AzureStorageCredentialsService.class.getSimpleName() + " to return when " + AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE + " = '" + propertyDescriptorName + "'")
- .identifiesControllerService(AzureStorageCredentialsService.class)
- .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
- .build();
+ protected String getLookupAttribute() {
+ return AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE;
}
@Override
- protected Collection<ValidationResult> customValidate(ValidationContext context) {
- final List<ValidationResult> results = new ArrayList<>();
-
- int numDefinedServices = 0;
- for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
- if (descriptor.isDynamic()) {
- numDefinedServices++;
- }
-
- final String referencedId = context.getProperty(descriptor).getValue();
- if (this.getIdentifier().equals(referencedId)) {
- results.add(new ValidationResult.Builder()
- .subject(descriptor.getDisplayName())
- .explanation("the current service cannot be registered as an " + AzureStorageCredentialsService.class.getSimpleName() + " to lookup")
- .valid(false)
- .build());
- }
- }
-
- if (numDefinedServices == 0) {
- results.add(new ValidationResult.Builder()
- .subject(this.getClass().getSimpleName())
- .explanation("at least one " + AzureStorageCredentialsService.class.getSimpleName() + " must be defined via dynamic properties")
- .valid(false)
- .build());
- }
-
- return results;
- }
-
- @OnEnabled
- public void onEnabled(final ConfigurationContext context) {
- final Map<String, AzureStorageCredentialsService> map = new HashMap<>();
-
- for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
- if (descriptor.isDynamic()) {
- final AzureStorageCredentialsService service = context.getProperty(descriptor).asControllerService(AzureStorageCredentialsService.class);
- map.put(descriptor.getName(), service);
- }
- }
-
- serviceMap = Collections.unmodifiableMap(map);
- }
-
- @OnDisabled
- public void onDisabled() {
- serviceMap = null;
+ public Class<AzureStorageCredentialsService> getServiceType() {
+ return AzureStorageCredentialsService.class;
}
@Override
- public AzureStorageCredentialsDetails getStorageCredentialsDetails(Map<String, String> attributes) throws ProcessException {
- final AzureStorageCredentialsService service = lookupAzureStorageCredentialsService(attributes);
-
- return service.getStorageCredentialsDetails(attributes);
- }
-
- private AzureStorageCredentialsService lookupAzureStorageCredentialsService(Map<String, String> attributes) {
- if (!attributes.containsKey(AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE)) {
- throw new ProcessException("Attributes must contain an attribute name '" + AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE + "'");
- }
-
- final String storageCredentialService = attributes.get(AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE);
- if (StringUtils.isBlank(storageCredentialService)) {
- throw new ProcessException(AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE + " cannot be null or blank");
- }
-
- final AzureStorageCredentialsService service = serviceMap.get(storageCredentialService);
- if (service == null) {
- throw new ProcessException("No " + AzureStorageCredentialsService.class.getSimpleName() + " was found for " +
- AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE + " '" + storageCredentialService + "'");
- }
-
- return service;
+ public AzureStorageCredentialsDetails getStorageCredentialsDetails(Map<String, String> attributes) {
+ return lookupService(attributes).getStorageCredentialsDetails(attributes);
}
}
diff --git a/nifi-nar-bundles/nifi-extension-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-service-utils/pom.xml
similarity index 50%
copy from nifi-nar-bundles/nifi-extension-utils/pom.xml
copy to nifi-nar-bundles/nifi-extension-utils/nifi-service-utils/pom.xml
index 3d1ce48..c6c9f2e 100644
--- a/nifi-nar-bundles/nifi-extension-utils/pom.xml
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-service-utils/pom.xml
@@ -17,23 +17,34 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
- <artifactId>nifi-nar-bundles</artifactId>
+ <artifactId>nifi-extension-utils</artifactId>
<version>1.11.0-SNAPSHOT</version>
</parent>
- <packaging>pom</packaging>
- <artifactId>nifi-extension-utils</artifactId>
+ <artifactId>nifi-service-utils</artifactId>
+ <packaging>jar</packaging>
<description>
- This module contains reusable utilities related to extensions that can be shared across NARs.
+ This nifi-service-utils module is designed to capture common patterns
+ and utilities that can be leveraged by other services or components to
+ help promote reuse. These patterns may become framework level features
+ or may simply be made available through this utility. It is ok for this
+ module to have dependencies but care should be taken when adding dependencies
+ as this increases the cost of utilizing this module in various nars.
</description>
-
- <modules>
- <module>nifi-record-utils</module>
- <module>nifi-hadoop-utils</module>
- <module>nifi-processor-utils</module>
- <module>nifi-reporting-utils</module>
- <module>nifi-syslog-utils</module>
- <module>nifi-database-utils</module>
- <module>nifi-database-test-utils</module>
- </modules>
-
-</project>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-utils</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-service-utils/src/main/java/org/apache/nifi/service/lookup/AbstractSingleAttributeBasedControllerServiceLookup.java b/nifi-nar-bundles/nifi-extension-utils/nifi-service-utils/src/main/java/org/apache/nifi/service/lookup/AbstractSingleAttributeBasedControllerServiceLookup.java
new file mode 100644
index 0000000..ed204ec
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-service-utils/src/main/java/org/apache/nifi/service/lookup/AbstractSingleAttributeBasedControllerServiceLookup.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.service.lookup;
+
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * A lookup ControllerService that can choose one (from possibly multiple) ControllerServices of the given type {@link S}.
+ * <p>
+ * Selection is based on a single {@linkplain String} lookup key.
+ * <p>
+ * Lookup key is provided as a value in an attribute map (usually coming from a flowfile)
+ * with a predefined key (see {@link #getLookupAttribute()}).
+ *
+ * @param <S> The type of service to be looked up
+ */
+public abstract class AbstractSingleAttributeBasedControllerServiceLookup<S extends ControllerService> extends AbstractControllerService {
+ protected volatile Map<String, S> serviceMap;
+
+ /**
+ * @return the Class that represents the type of service that will be returned by {@link #lookupService(Map)}
+ */
+ public abstract Class<S> getServiceType();
+
+ /**
+ * @return the name of the attribute (usually from a flowfile) the value of which serves as the lookup key
+ * for the desired service (of type {@link S})
+ */
+ protected abstract String getLookupAttribute();
+
+ /**
+ * Returns a ControllerService (of type {@link S}) based on the provided attributes map (usually by retrieving
+ * a lookup attribute from it via {@link #getLookupAttribute()} and use it to identify the appropriate service).
+ * @param attributes Map containing the lookup attribute based on which ControllerService is chosen
+ * @return the chosen ControllerService
+ */
+ public S lookupService(Map<String, String> attributes) {
+ if (attributes == null) {
+ throw new ProcessException("Attributes map is null");
+ } else if (!attributes.containsKey(getLookupAttribute())) {
+ throw new ProcessException("Attributes must contain an attribute name '" + getLookupAttribute() + "'");
+ }
+
+ Object lookupKey = Optional.of(getLookupAttribute())
+ .map(attributes::get)
+ .orElseThrow(() -> new ProcessException(getLookupAttribute() + " cannot be null or blank"));
+
+ S service = serviceMap.get(lookupKey);
+
+ if (service == null) {
+ throw new ProcessException("No " + getServiceName() + " found for " + getLookupAttribute());
+ }
+
+ return service;
+ }
+
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) {
+ Map<String, S> serviceMap = new HashMap<>();
+
+ context.getProperties().keySet().stream()
+ .filter(PropertyDescriptor::isDynamic)
+ .forEach(propertyDescriptor -> {
+ S service = context.getProperty(propertyDescriptor).asControllerService(getServiceType());
+
+ serviceMap.put(propertyDescriptor.getName(), service);
+ });
+
+ this.serviceMap = Collections.unmodifiableMap(serviceMap);
+ }
+
+ @OnDisabled
+ public void onDisabled() {
+ serviceMap = null;
+ }
+
+ @Override
+ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+ return lookupKeyPropertyDescriptor(propertyDescriptorName);
+ }
+
+ @Override
+ protected Collection<ValidationResult> customValidate(ValidationContext context) {
+ return validateForAtLeastOneService(context);
+ }
+
+ protected PropertyDescriptor lookupKeyPropertyDescriptor(String propertyDescriptorName) {
+ return new PropertyDescriptor.Builder()
+ .name(propertyDescriptorName)
+ .description("The " + getServiceName() + " to return when " + getLookupAttribute() + " = '" + propertyDescriptorName + "'")
+ .identifiesControllerService(getServiceType())
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+ }
+
+ private Collection<ValidationResult> validateForAtLeastOneService(ValidationContext context) {
+ final List<ValidationResult> results = new ArrayList<>();
+
+ int numDefinedServices = 0;
+ for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
+ if (descriptor.isDynamic()) {
+ numDefinedServices++;
+ }
+
+ final String referencedId = context.getProperty(descriptor).getValue();
+ if (this.getIdentifier().equals(referencedId)) {
+ numDefinedServices--;
+
+ results.add(new ValidationResult.Builder()
+ .subject(descriptor.getDisplayName())
+ .explanation("the current service cannot be registered as a " + getServiceName() + " to lookup")
+ .valid(false)
+ .build());
+ }
+ }
+
+ if (numDefinedServices == 0) {
+ results.add(new ValidationResult.Builder()
+ .subject(this.getClass().getSimpleName())
+ .explanation("at least one " + getServiceName() + " must be defined via dynamic properties")
+ .valid(false)
+ .build());
+ }
+
+ return results;
+ }
+
+ protected String getServiceName() {
+ return getServiceType().getSimpleName();
+ }
+}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-service-utils/src/test/java/org/apache/nifi/service/lookup/TestAbstractSingleAttributeBasedControllerServiceLookup.java b/nifi-nar-bundles/nifi-extension-utils/nifi-service-utils/src/test/java/org/apache/nifi/service/lookup/TestAbstractSingleAttributeBasedControllerServiceLookup.java
new file mode 100644
index 0000000..92bd751
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-service-utils/src/test/java/org/apache/nifi/service/lookup/TestAbstractSingleAttributeBasedControllerServiceLookup.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.service.lookup;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.MockConfigurationContext;
+import org.apache.nifi.util.MockControllerServiceInitializationContext;
+import org.apache.nifi.util.MockProcessContext;
+import org.apache.nifi.util.MockValidationContext;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestAbstractSingleAttributeBasedControllerServiceLookup {
+ private static final String LOOKUP_ATTRIBUTE = "lookupAttribute";
+ private static final String TEST_SUBJECT_IDENTIFIER = "testSubjectIdentifier";
+ private static final Class<ControllerService> SERVICE_TYPE = ControllerService.class;
+
+ private final AbstractSingleAttributeBasedControllerServiceLookup<ControllerService> testSubject = Mockito.spy(AbstractSingleAttributeBasedControllerServiceLookup.class);
+
+ private Map<PropertyDescriptor, String> properties;
+
+ @Before
+ public void setUp() throws Exception {
+ when(testSubject.getLookupAttribute()).thenReturn(LOOKUP_ATTRIBUTE);
+ when(testSubject.getServiceType()).thenReturn(SERVICE_TYPE);
+ when(testSubject.getIdentifier()).thenReturn(TEST_SUBJECT_IDENTIFIER);
+
+ properties = new HashMap<>();
+ }
+
+ @Test(expected = Exception.class)
+ public void testLookupShouldThrowExceptionWhenQueriedServiceMappedInPropertiesButWasntCreated() {
+ // GIVEN
+ String mappedCreatedServiceID = "mappedCreatedServiceID";
+ String mappedNotCreatedServiceID = "mappedNotCreatedServiceID";
+
+ ControllerService mappedCreatedService = mock(SERVICE_TYPE);
+
+ MockControllerServiceInitializationContext serviceLookup = new MockControllerServiceInitializationContext(mappedCreatedService, mappedCreatedServiceID);
+
+ String dynamicProperty1 = "property1";
+ String dynamicProperty2 = "property2";
+
+ mapService(dynamicProperty1, mappedCreatedServiceID);
+ mapService(dynamicProperty2, mappedNotCreatedServiceID);
+
+ // WHEN
+ testSubject.onEnabled(new MockConfigurationContext(properties, serviceLookup));
+ }
+
+ @Test
+ public void testLookupShouldThrowExceptionWhenAttributeMapIsNull() {
+ // GIVEN
+ String mappedCreatedServiceID = "mappedCreatedServiceID";
+ ControllerService mappedCreatedService = mock(SERVICE_TYPE);
+ MockControllerServiceInitializationContext serviceLookup = new MockControllerServiceInitializationContext(mappedCreatedService, mappedCreatedServiceID);
+
+ // WHEN
+ testSubject.onEnabled(new MockConfigurationContext(properties, serviceLookup));
+ try {
+ testSubject.lookupService(null);
+ fail();
+ } catch (ProcessException e) {
+ assertEquals("Attributes map is null", e.getMessage());
+ }
+ }
+
+ @Test
+ public void testLookupShouldThrowExceptionWhenAttributeMapHasNoLookupAttribute() {
+ // GIVEN
+ String mappedCreatedServiceID = "mappedCreatedServiceID";
+ ControllerService mappedCreatedService = mock(SERVICE_TYPE);
+ MockControllerServiceInitializationContext serviceLookup = new MockControllerServiceInitializationContext(mappedCreatedService, mappedCreatedServiceID);
+
+ // WHEN
+ testSubject.onEnabled(new MockConfigurationContext(properties, serviceLookup));
+ try {
+ testSubject.lookupService(new HashMap<>());
+ fail();
+ } catch (ProcessException e) {
+ assertEquals("Attributes must contain an attribute name '" + LOOKUP_ATTRIBUTE + "'", e.getMessage());
+ }
+ }
+
+ @Test
+ public void testLookupShouldThrowExceptionWhenQueriedServiceWasCreatedButWasntMappedInProperties() {
+ // GIVEN
+ String mappedCreatedServiceID = "mappedCreatedServiceID";
+ String notMappedCreatedServiceID = "notMappedCreatedServiceID";
+
+ ControllerService mappedCreatedService = mock(SERVICE_TYPE);
+ ControllerService notMappedCreatedService = mock(SERVICE_TYPE);
+
+ MockControllerServiceInitializationContext serviceLookup = new MockControllerServiceInitializationContext(mappedCreatedService, mappedCreatedServiceID);
+ serviceLookup.addControllerService(notMappedCreatedService, notMappedCreatedServiceID);
+
+ String dynamicProperty1 = "property1";
+ String dynamicProperty2 = "property2";
+
+ mapService(dynamicProperty1, mappedCreatedServiceID);
+
+ String lookupServiceKey = dynamicProperty2;
+
+ // WHEN
+ testSubject.onEnabled(new MockConfigurationContext(properties, serviceLookup));
+ try {
+ testSubject.lookupService(createAttributes(lookupServiceKey));
+ fail();
+ } catch (ProcessException e) {
+ assertEquals("No ControllerService found for lookupAttribute", e.getMessage());
+ }
+ }
+
+ @Test
+ public void testLookupShouldReturnQueriedService() {
+ // GIVEN
+ String mappedCreatedServiceID1 = "mappedCreatedServiceID1";
+ String mappedCreatedServiceID2 = "mappedCreatedServiceID2";
+
+ ControllerService mappedCreatedService1 = mock(SERVICE_TYPE);
+ ControllerService mappedCreatedService2 = mock(SERVICE_TYPE);
+
+ MockControllerServiceInitializationContext serviceLookup = new MockControllerServiceInitializationContext(mappedCreatedService1, mappedCreatedServiceID1);
+ serviceLookup.addControllerService(mappedCreatedService2, mappedCreatedServiceID2);
+
+ String dynamicProperty1 = "property1";
+ String dynamicProperty2 = "property2";
+
+ mapService(dynamicProperty1, mappedCreatedServiceID1);
+ mapService(dynamicProperty2, mappedCreatedServiceID2);
+
+ String lookupServiceKey = dynamicProperty2;
+ ControllerService expected = mappedCreatedService2;
+
+ // WHEN
+ testSubject.onEnabled(new MockConfigurationContext(properties, serviceLookup));
+ ControllerService actual = testSubject.lookupService(createAttributes(lookupServiceKey));
+
+ // THEN
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testCustomValidateShouldReturnErrorWhenNoServiceIsDefined() {
+ // GIVEN
+ ValidationContext context = new MockValidationContext(new MockProcessContext(testSubject));
+
+ // WHEN
+ Collection<ValidationResult> actual = testSubject.customValidate(context);
+
+ // THEN
+ assertThat(
+ actual.stream().map(ValidationResult::getExplanation).collect(Collectors.toList()),
+ hasItem(containsString("at least one " + SERVICE_TYPE.getSimpleName() + " must be defined via dynamic properties"))
+ );
+ }
+
+ @Test
+ public void testCustomValidateShouldReturnErrorWhenSelfAndOtherServiceIsMapped() {
+ MockProcessContext processContext = new MockProcessContext(testSubject);
+ processContext.setProperty("property1", "service1");
+ processContext.setProperty("property2", TEST_SUBJECT_IDENTIFIER);
+
+ ValidationContext context = new MockValidationContext(processContext);
+
+ // WHEN
+ Collection<ValidationResult> actual = testSubject.customValidate(context);
+
+ // THEN
+ assertThat(
+ actual.stream().map(ValidationResult::getExplanation).collect(Collectors.toList()),
+ hasItem(containsString("the current service cannot be registered as a " + SERVICE_TYPE.getSimpleName() + " to lookup"))
+ );
+ }
+
+ @Test
+ public void testCustomValidateShouldReturnErrorsWhenOnlySelfIsMapped() {
+ MockProcessContext processContext = new MockProcessContext(testSubject);
+ processContext.setProperty("property1", TEST_SUBJECT_IDENTIFIER);
+
+ ValidationContext context = new MockValidationContext(processContext);
+
+ // WHEN
+ Collection<ValidationResult> actual = testSubject.customValidate(context);
+
+ // THEN
+ assertThat(
+ actual.stream().map(ValidationResult::getExplanation).collect(Collectors.toList()),
+ hasItems(
+ containsString("the current service cannot be registered as a " + SERVICE_TYPE.getSimpleName() + " to lookup"),
+ containsString("at least one " + SERVICE_TYPE.getSimpleName() + " must be defined via dynamic properties")
+ )
+ );
+ }
+
+ @Test
+ public void testCustomValidateShouldReturnNoErrorWhenAServiceIsDefined() {
+ MockProcessContext processContext = new MockProcessContext(testSubject);
+ processContext.setProperty("property1", "service1");
+
+ ValidationContext context = new MockValidationContext(processContext);
+
+ // WHEN
+ Collection<ValidationResult> actual = testSubject.customValidate(context);
+
+ // THEN
+ assertEquals(Collections.emptyList(), new ArrayList<>(actual));
+ }
+
+ @Test
+ public void testGetServiceType() {
+ Class<ControllerService> actual = testSubject.getServiceType();
+ assertEquals(SERVICE_TYPE, actual);
+ }
+
+ @Test
+ public void testLookupAttribute() {
+ String actual = testSubject.getLookupAttribute();
+ assertEquals(LOOKUP_ATTRIBUTE, actual);
+ }
+
+ private void mapService(String dynamicProperty, String registeredService) {
+ properties.put(
+ new PropertyDescriptor.Builder()
+ .name(dynamicProperty)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .dynamic(true)
+ .build(),
+ registeredService
+ );
+ }
+
+ private Map<String, String> createAttributes(final String lookupValue) {
+ Map<String, String> attributes = new HashMap<String, String>() {{
+ put(LOOKUP_ATTRIBUTE, lookupValue);
+ }};
+
+ return attributes;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-extension-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/pom.xml
index 3d1ce48..6d9febf 100644
--- a/nifi-nar-bundles/nifi-extension-utils/pom.xml
+++ b/nifi-nar-bundles/nifi-extension-utils/pom.xml
@@ -34,6 +34,7 @@
<module>nifi-syslog-utils</module>
<module>nifi-database-utils</module>
<module>nifi-database-test-utils</module>
+ <module>nifi-service-utils</module>
</modules>
</project>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/pom.xml
index e8c54c8..c09b9ff 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/pom.xml
@@ -39,6 +39,11 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-service-utils</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-utils</artifactId>
<version>1.11.0-SNAPSHOT</version>
</dependency>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPoolLookup.java b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPoolLookup.java
index 1198eb5..b0fb964 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPoolLookup.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPoolLookup.java
@@ -16,27 +16,15 @@
*/
package org.apache.nifi.dbcp;
-import org.apache.commons.lang3.StringUtils;
+
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnDisabled;
-import org.apache.nifi.annotation.lifecycle.OnEnabled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.service.lookup.AbstractSingleAttributeBasedControllerServiceLookup;
import java.sql.Connection;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
import java.util.Map;
@Tags({ "dbcp", "jdbc", "database", "connection", "pooling", "store" })
@@ -45,72 +33,23 @@ import java.util.Map;
"if the attribute is missing. The value of 'database.name' will be used to select the DBCPService that has been " +
"registered with that name. This will allow multiple DBCPServices to be defined and registered, and then selected " +
"dynamically at runtime by tagging flow files with the appropriate 'database.name' attribute.")
-@DynamicProperty(name = "The ", value = "JDBC property value", expressionLanguageScope = ExpressionLanguageScope.NONE,
- description = "")
-public class DBCPConnectionPoolLookup extends AbstractControllerService implements DBCPService {
+@DynamicProperty(name = "The name to register DBCPService", value = "The DBCPService",
+ description = "If '"+ DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE +"' attribute contains " +
+ "the name of the dynamic property, then the DBCPService (registered in the value) will be selected.",
+ expressionLanguageScope = ExpressionLanguageScope.NONE)
+public class DBCPConnectionPoolLookup
+ extends AbstractSingleAttributeBasedControllerServiceLookup<DBCPService> implements DBCPService {
public static final String DATABASE_NAME_ATTRIBUTE = "database.name";
- private volatile Map<String,DBCPService> dbcpServiceMap;
-
@Override
- protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
- return new PropertyDescriptor.Builder()
- .name(propertyDescriptorName)
- .description("The DBCPService to return when database.name = '" + propertyDescriptorName + "'")
- .identifiesControllerService(DBCPService.class)
- .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
- .build();
+ protected String getLookupAttribute() {
+ return DATABASE_NAME_ATTRIBUTE;
}
@Override
- protected Collection<ValidationResult> customValidate(ValidationContext context) {
- final List<ValidationResult> results = new ArrayList<>();
-
- int numDefinedServices = 0;
- for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
- if (descriptor.isDynamic()) {
- numDefinedServices++;
- }
-
- final String referencedId = context.getProperty(descriptor).getValue();
- if (this.getIdentifier().equals(referencedId)) {
- results.add(new ValidationResult.Builder()
- .subject(descriptor.getDisplayName())
- .explanation("the current service cannot be registered as a DBCPService to lookup")
- .valid(false)
- .build());
- }
- }
-
- if (numDefinedServices == 0) {
- results.add(new ValidationResult.Builder()
- .subject(this.getClass().getSimpleName())
- .explanation("at least one DBCPService must be defined via dynamic properties")
- .valid(false)
- .build());
- }
-
- return results;
- }
-
- @OnEnabled
- public void onEnabled(final ConfigurationContext context) {
- final Map<String,DBCPService> serviceMap = new HashMap<>();
-
- for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
- if (descriptor.isDynamic()) {
- final DBCPService dbcpService = context.getProperty(descriptor).asControllerService(DBCPService.class);
- serviceMap.put(descriptor.getName(), dbcpService);
- }
- }
-
- dbcpServiceMap = Collections.unmodifiableMap(serviceMap);
- }
-
- @OnDisabled
- public void onDisabled() {
- dbcpServiceMap = null;
+ public Class<DBCPService> getServiceType() {
+ return DBCPService.class;
}
@Override
@@ -119,22 +58,7 @@ public class DBCPConnectionPoolLookup extends AbstractControllerService implemen
}
@Override
- public Connection getConnection(Map<String, String> attributes) throws ProcessException {
- if (!attributes.containsKey(DATABASE_NAME_ATTRIBUTE)) {
- throw new ProcessException("Attributes must contain an attribute name '" + DATABASE_NAME_ATTRIBUTE + "'");
- }
-
- final String databaseName = attributes.get(DATABASE_NAME_ATTRIBUTE);
- if (StringUtils.isBlank(databaseName)) {
- throw new ProcessException(DATABASE_NAME_ATTRIBUTE + " cannot be null or blank");
- }
-
- final DBCPService dbcpService = dbcpServiceMap.get(databaseName);
- if (dbcpService == null) {
- throw new ProcessException("No DBCPService was found for database.name '" + databaseName + "'");
- }
-
- return dbcpService.getConnection(attributes);
+ public Connection getConnection(Map<String, String> attributes) {
+ return lookupService(attributes).getConnection(attributes);
}
-
}