Posted to issues@nifi.apache.org by bbende <gi...@git.apache.org> on 2018/05/23 16:39:12 UTC

[GitHub] nifi pull request #2735: NIFI-5229 Adding a DBCPService implementation that ...

GitHub user bbende opened a pull request:

    https://github.com/apache/nifi/pull/2735

    NIFI-5229 Adding a DBCPService implementation that can lookup other DBCPServices dynamically at runtime
    
    Thank you for submitting a contribution to Apache NiFi.
    
    In order to streamline the review of the contribution we ask you
    to ensure the following steps have been taken:
    
    ### For all changes:
    - [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
         in the commit message?
    
    - [ ] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    
    - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
    
    - [ ] Is your initial contribution a single, squashed commit?
    
    ### For code changes:
    - [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
    - [ ] Have you written or updated unit tests to verify your changes?
    - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
    - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
    - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
    - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?
    
    ### For documentation related changes:
    - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
    
    ### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bbende/nifi NIFI-5229

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/2735.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2735
    
----
commit 6bfb90019cb1404c63c4859e24586136e3c47534
Author: Bryan Bende <bb...@...>
Date:   2018-05-23T15:21:12Z

    NIFI-5229 Adding a DBCPService implementation that can lookup other DBCPServices dynamically at runtime

----


---

[GitHub] nifi pull request #2735: NIFI-5229 Adding a DBCPService implementation that ...

Posted by mattyb149 <gi...@git.apache.org>.
GitHub user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2735#discussion_r191038329
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/TestDBCPConnectionPoolLookup.java ---
    @@ -0,0 +1,182 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.dbcp;
    +
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +import java.sql.Connection;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotNull;
    +import static org.junit.Assert.assertTrue;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.when;
    +
    +public class TestDBCPConnectionPoolLookup {
    --- End diff --
    
    Same comment on naming but IDEs might take care of it :)


---

[GitHub] nifi pull request #2735: NIFI-5229 Adding a DBCPService implementation that ...

Posted by asfgit <gi...@git.apache.org>.
GitHub user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/2735


---

[GitHub] nifi pull request #2735: NIFI-5229 Adding a DBCPService implementation that ...

Posted by mattyb149 <gi...@git.apache.org>.
GitHub user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2735#discussion_r191038289
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPoolLookup.java ---
    @@ -0,0 +1,140 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.dbcp;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnDisabled;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.sql.Connection;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +@Tags({ "dbcp", "jdbc", "database", "connection", "pooling", "store" })
    +@CapabilityDescription("Provides a DBCPService that can be used to dynamically select another DBCPService. This service " +
    +        "requires an attribute named 'database.name' to be passed in when asking for a connection, and will throw an exception " +
    +        "if the attribute is missing. The value of 'database.name' will be used to select the DBCPService that has been " +
    +        "registered with that name. This will allow multiple DBCPServices to be defined and registered, and then selected " +
    +        "dynamically at runtime by tagging flow files with the appropriate 'database.name' attribute.")
    +@DynamicProperty(name = "The ", value = "JDBC property value", expressionLanguageScope = ExpressionLanguageScope.NONE,
    +        description = "")
    +public class DBCPConnectionPoolLookup extends AbstractControllerService implements DBCPService {
    --- End diff --
    
    This has the suffix "Lookup" but is really a Delegate in our terminology, right? I'm thinking of (was it Joey's?) DB Lookup CS, or whenever/wherever we get a LookupService impl that can access RDBMS tables...


---

[GitHub] nifi pull request #2735: NIFI-5229 Adding a DBCPService implementation that ...

Posted by mattyb149 <gi...@git.apache.org>.
GitHub user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2735#discussion_r191038313
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPoolLookup.java ---
    @@ -0,0 +1,140 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.dbcp;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnDisabled;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.sql.Connection;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +@Tags({ "dbcp", "jdbc", "database", "connection", "pooling", "store" })
    +@CapabilityDescription("Provides a DBCPService that can be used to dynamically select another DBCPService. This service " +
    +        "requires an attribute named 'database.name' to be passed in when asking for a connection, and will throw an exception " +
    +        "if the attribute is missing. The value of 'database.name' will be used to select the DBCPService that has been " +
    +        "registered with that name. This will allow multiple DBCPServices to be defined and registered, and then selected " +
    +        "dynamically at runtime by tagging flow files with the appropriate 'database.name' attribute.")
    +@DynamicProperty(name = "The ", value = "JDBC property value", expressionLanguageScope = ExpressionLanguageScope.NONE,
    +        description = "")
    +public class DBCPConnectionPoolLookup extends AbstractControllerService implements DBCPService {
    +
    +    public static final String DATABASE_NAME_ATTRIBUTE = "database.name";
    +
    +    private volatile Map<String,DBCPService> dbcpServiceMap;
    +
    +    @Override
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +                .name(propertyDescriptorName)
    +                .description("The DBCPService to return when database.name = '" + propertyDescriptorName + "'")
    +                .identifiesControllerService(DBCPService.class)
    +                .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +                .build();
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(ValidationContext context) {
    +        final List<ValidationResult> results = new ArrayList<>();
    +
    +        int numDefinedServices = 0;
    +        for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
    +            if (descriptor.isDynamic()) {
    +                numDefinedServices++;
    +            }
    +
    +            final String referencedId = context.getProperty(descriptor).getValue();
    +            if (this.getIdentifier().equals(referencedId)) {
    +                results.add(new ValidationResult.Builder()
    +                        .subject(descriptor.getDisplayName())
    +                        .explanation("the current service cannot be registered as a DBCPService to lookup")
    +                        .valid(false)
    +                        .build());
    +            }
    +        }
    +
    +        if (numDefinedServices == 0) {
    +            results.add(new ValidationResult.Builder()
    +                    .subject(this.getClass().getSimpleName())
    +                    .explanation("at least one DBCPService must be defined via dynamic properties")
    +                    .valid(false)
    +                    .build());
    +        }
    +
    +        return results;
    +    }
    +
    +    @OnEnabled
    +    public void onEnabled(final ConfigurationContext context) {
    +        final Map<String,DBCPService> serviceMap = new HashMap<>();
    +
    +        for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
    +            if (descriptor.isDynamic()) {
    +                final DBCPService dbcpService = context.getProperty(descriptor).asControllerService(DBCPService.class);
    +                serviceMap.put(descriptor.getName(), dbcpService);
    +            }
    +        }
    +
    +        dbcpServiceMap = Collections.unmodifiableMap(serviceMap);
    +    }
    +
    +    @OnDisabled
    +    public void onDisabled() {
    +        dbcpServiceMap = null;
    +    }
    +
    +    @Override
    +    public Connection getConnection() throws ProcessException {
    +        throw new UnsupportedOperationException("Cannot lookup DBCPConnectionPool without attributes");
    --- End diff --
    
    Should there be a property or something for a default connection pool?
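
One way to picture the default-pool idea raised above is the minimal sketch below. It is not part of the PR and does not use the NiFi API: `DefaultPoolSketch`, its constructor arguments, and the use of `Supplier<String>` as a stand-in for a connection pool are all hypothetical, chosen only so the example runs without NiFi or a database. The assumption is that an attribute-less request falls back to an optional, pre-registered default and otherwise still fails as it does in the diff.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical illustration only: a Supplier<String> stands in for a pool,
// and the returned String stands in for a java.sql.Connection.
final class DefaultPoolSketch {

    private final Map<String, Supplier<String>> pools;
    private final String defaultPoolName; // null when no default is configured

    DefaultPoolSketch(final Map<String, Supplier<String>> pools, final String defaultPoolName) {
        // Reject a default that does not refer to a registered pool.
        if (defaultPoolName != null && !pools.containsKey(defaultPoolName)) {
            throw new IllegalArgumentException("Default pool '" + defaultPoolName + "' is not registered");
        }
        this.pools = new HashMap<>(pools);
        this.defaultPoolName = defaultPoolName;
    }

    // Attribute-less request: use the default if one exists, otherwise fail.
    String connect() {
        if (defaultPoolName == null) {
            throw new UnsupportedOperationException("Cannot select a pool without attributes");
        }
        return pools.get(defaultPoolName).get();
    }

    public static void main(final String[] args) {
        final Map<String, Supplier<String>> registered = new HashMap<>();
        registered.put("a", () -> "connection from pool a");
        registered.put("b", () -> "connection from pool b");

        final DefaultPoolSketch withDefault = new DefaultPoolSketch(registered, "a");
        System.out.println(withDefault.connect());
    }
}
```

Whether such a fallback is desirable is a design question for the thread: a default makes the service usable by callers that pass no attributes, at the cost of silently routing untagged flow files to one database.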


---

[GitHub] nifi issue #2735: NIFI-5229 Adding a DBCPService implementation that can loo...

Posted by mattyb149 <gi...@git.apache.org>.
GitHub user mattyb149 commented on the issue:

    https://github.com/apache/nifi/pull/2735
  
    @MikeThomsen could you include a blurb on any tests (functional, etc. outside of automated stuff) you ran? It's good for posterity :D I and many others are looking forward to this, so just wanted to make sure we had a record of the non-unit-test results, thanks in advance!


---

[GitHub] nifi pull request #2735: NIFI-5229 Adding a DBCPService implementation that ...

Posted by mattyb149 <gi...@git.apache.org>.
GitHub user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2735#discussion_r191038355
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/TestDBCPConnectionPoolLookup.java ---
    @@ -0,0 +1,182 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.dbcp;
    +
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +import java.sql.Connection;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotNull;
    +import static org.junit.Assert.assertTrue;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.when;
    +
    +public class TestDBCPConnectionPoolLookup {
    +
    +    private MockConnection connectionA;
    +    private MockConnection connectionB;
    +
    +    private MockDBCPService dbcpServiceA;
    +    private MockDBCPService dbcpServiceB;
    +
    +    private DBCPService dbcpLookupService;
    +    private TestRunner runner;
    +
    +    @Before
    +    public void setup() throws InitializationException {
    +        connectionA = mock(MockConnection.class);
    +        when(connectionA.getName()).thenReturn("A");
    +
    +        connectionB = mock(MockConnection.class);
    +        when(connectionB.getName()).thenReturn("B");
    +
    +        dbcpServiceA = new MockDBCPService(connectionA);
    +        dbcpServiceB = new MockDBCPService(connectionB);
    +
    +        dbcpLookupService = new DBCPConnectionPoolLookup();
    +
    +        runner = TestRunners.newTestRunner(TestProcessor.class);
    +
    +        final String dbcpServiceAIdentifier = "dbcp-a";
    +        runner.addControllerService(dbcpServiceAIdentifier, dbcpServiceA);
    +
    +        final String dbcpServiceBIdentifier = "dbcp-b";
    +        runner.addControllerService(dbcpServiceBIdentifier, dbcpServiceB);
    +
    +        runner.addControllerService("dbcp-lookup", dbcpLookupService);
    +        runner.setProperty(dbcpLookupService, "a", dbcpServiceAIdentifier);
    +        runner.setProperty(dbcpLookupService, "b", dbcpServiceBIdentifier);
    +
    +        runner.enableControllerService(dbcpServiceA);
    +        runner.enableControllerService(dbcpServiceB);
    +        runner.enableControllerService(dbcpLookupService);
    +
    +    }
    +
    +    @Test
    +    public void testLookupServiceA() {
    +        final Map<String,String> attributes = new HashMap<>();
    +        attributes.put(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "a");
    +
    +        final Connection connection = dbcpLookupService.getConnection(attributes);
    +        assertNotNull(connection);
    +        assertTrue(connection instanceof MockConnection);
    +
    +        final MockConnection mockConnection = (MockConnection)connection;
    +        assertEquals(connectionA.getName(), mockConnection.getName());
    +    }
    +
    +    @Test
    +    public void testLookupServiceB() {
    --- End diff --
    
    Our other "unit" tests use Derby. What about pointing service A at something that won't connect and service B at a Derby database, then making a valid query? It's more of an integration test, but in this area it's not without precedent :)


---

[GitHub] nifi pull request #2735: NIFI-5229 Adding a DBCPService implementation that ...

Posted by MikeThomsen <gi...@git.apache.org>.
GitHub user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2735#discussion_r191044793
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/TestDBCPConnectionPoolLookup.java ---
    @@ -0,0 +1,182 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.dbcp;
    +
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +import java.sql.Connection;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotNull;
    +import static org.junit.Assert.assertTrue;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.when;
    +
    +public class TestDBCPConnectionPoolLookup {
    +
    +    private MockConnection connectionA;
    +    private MockConnection connectionB;
    +
    +    private MockDBCPService dbcpServiceA;
    +    private MockDBCPService dbcpServiceB;
    +
    +    private DBCPService dbcpLookupService;
    +    private TestRunner runner;
    +
    +    @Before
    +    public void setup() throws InitializationException {
    +        connectionA = mock(MockConnection.class);
    +        when(connectionA.getName()).thenReturn("A");
    +
    +        connectionB = mock(MockConnection.class);
    +        when(connectionB.getName()).thenReturn("B");
    +
    +        dbcpServiceA = new MockDBCPService(connectionA);
    +        dbcpServiceB = new MockDBCPService(connectionB);
    +
    +        dbcpLookupService = new DBCPConnectionPoolLookup();
    +
    +        runner = TestRunners.newTestRunner(TestProcessor.class);
    +
    +        final String dbcpServiceAIdentifier = "dbcp-a";
    +        runner.addControllerService(dbcpServiceAIdentifier, dbcpServiceA);
    +
    +        final String dbcpServiceBIdentifier = "dbcp-b";
    +        runner.addControllerService(dbcpServiceBIdentifier, dbcpServiceB);
    +
    +        runner.addControllerService("dbcp-lookup", dbcpLookupService);
    +        runner.setProperty(dbcpLookupService, "a", dbcpServiceAIdentifier);
    +        runner.setProperty(dbcpLookupService, "b", dbcpServiceBIdentifier);
    +
    +        runner.enableControllerService(dbcpServiceA);
    +        runner.enableControllerService(dbcpServiceB);
    +        runner.enableControllerService(dbcpLookupService);
    +
    +    }
    +
    +    @Test
    +    public void testLookupServiceA() {
    +        final Map<String,String> attributes = new HashMap<>();
    +        attributes.put(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "a");
    +
    +        final Connection connection = dbcpLookupService.getConnection(attributes);
    +        assertNotNull(connection);
    +        assertTrue(connection instanceof MockConnection);
    +
    +        final MockConnection mockConnection = (MockConnection)connection;
    +        assertEquals(connectionA.getName(), mockConnection.getName());
    +    }
    +
    +    @Test
    +    public void testLookupServiceB() {
    --- End diff --
    
    @mattyb149 Since it's a delegate, I focused on reviewing the unit tests because I felt that that would be enough here. The unit tests look correct and thorough to me.


---

[GitHub] nifi pull request #2735: NIFI-5229 Adding a DBCPService implementation that ...

Posted by bbende <gi...@git.apache.org>.
GitHub user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2735#discussion_r191057922
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/TestDBCPConnectionPoolLookup.java ---
    @@ -0,0 +1,182 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.dbcp;
    +
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +import java.sql.Connection;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotNull;
    +import static org.junit.Assert.assertTrue;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.when;
    +
    +public class TestDBCPConnectionPoolLookup {
    +
    +    private MockConnection connectionA;
    +    private MockConnection connectionB;
    +
    +    private MockDBCPService dbcpServiceA;
    +    private MockDBCPService dbcpServiceB;
    +
    +    private DBCPService dbcpLookupService;
    +    private TestRunner runner;
    +
    +    @Before
    +    public void setup() throws InitializationException {
    +        connectionA = mock(MockConnection.class);
    +        when(connectionA.getName()).thenReturn("A");
    +
    +        connectionB = mock(MockConnection.class);
    +        when(connectionB.getName()).thenReturn("B");
    +
    +        dbcpServiceA = new MockDBCPService(connectionA);
    +        dbcpServiceB = new MockDBCPService(connectionB);
    +
    +        dbcpLookupService = new DBCPConnectionPoolLookup();
    +
    +        runner = TestRunners.newTestRunner(TestProcessor.class);
    +
    +        final String dbcpServiceAIdentifier = "dbcp-a";
    +        runner.addControllerService(dbcpServiceAIdentifier, dbcpServiceA);
    +
    +        final String dbcpServiceBIdentifier = "dbcp-b";
    +        runner.addControllerService(dbcpServiceBIdentifier, dbcpServiceB);
    +
    +        runner.addControllerService("dbcp-lookup", dbcpLookupService);
    +        runner.setProperty(dbcpLookupService, "a", dbcpServiceAIdentifier);
    +        runner.setProperty(dbcpLookupService, "b", dbcpServiceBIdentifier);
    +
    +        runner.enableControllerService(dbcpServiceA);
    +        runner.enableControllerService(dbcpServiceB);
    +        runner.enableControllerService(dbcpLookupService);
    +
    +    }
    +
    +    @Test
    +    public void testLookupServiceA() {
    +        final Map<String,String> attributes = new HashMap<>();
    +        attributes.put(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "a");
    +
    +        final Connection connection = dbcpLookupService.getConnection(attributes);
    +        assertNotNull(connection);
    +        assertTrue(connection instanceof MockConnection);
    +
    +        final MockConnection mockConnection = (MockConnection)connection;
    +        assertEquals(connectionA.getName(), mockConnection.getName());
    +    }
    +
    +    @Test
    +    public void testLookupServiceB() {
    --- End diff --
    
    I felt like the main thing to test here was the logic of selecting/returning the correct connection... whether or not that connection can do something is specific to the impl it came from which I figured we already had tests for.
    
    FWIW the live flow I tested against had DBCPConnectionPools for MySQL and Postgres, and then sent flow files with the database.name attribute into a single ExecuteSQL and verified the SQL was executed against the correct DB.
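
The selection logic described above (pick the delegate whose registered name matches the `database.name` attribute, and fail if the attribute is missing) can be sketched roughly as follows. This is an illustration, not the code from the PR: the attribute-based lookup method is not shown in the quoted diff, so `PoolSelectorSketch`, the `Supplier<String>` stand-in for a pool, and the choice of `IllegalStateException` are assumptions made so the example runs without NiFi or a real database.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for the delegating service: each "pool" is a
// Supplier<String>, and the String it returns stands in for a Connection.
final class PoolSelectorSketch {

    static final String DATABASE_NAME_ATTRIBUTE = "database.name";

    private final Map<String, Supplier<String>> pools;

    PoolSelectorSketch(final Map<String, Supplier<String>> pools) {
        // Defensive, read-only copy, similar in spirit to the unmodifiable map built in onEnabled.
        this.pools = Collections.unmodifiableMap(new HashMap<>(pools));
    }

    String connect(final Map<String, String> attributes) {
        final String name = attributes.get(DATABASE_NAME_ATTRIBUTE);
        if (name == null || name.trim().isEmpty()) {
            // Assumed exception type; the source only says an exception is thrown.
            throw new IllegalStateException("Attributes must contain '" + DATABASE_NAME_ATTRIBUTE + "'");
        }
        final Supplier<String> pool = pools.get(name);
        if (pool == null) {
            throw new IllegalStateException("No pool registered for '" + name + "'");
        }
        return pool.get();
    }

    public static void main(final String[] args) {
        final Map<String, Supplier<String>> registered = new HashMap<>();
        registered.put("mysql", () -> "connection from mysql pool");
        registered.put("postgres", () -> "connection from postgres pool");
        final PoolSelectorSketch selector = new PoolSelectorSketch(registered);

        final Map<String, String> attributes = new HashMap<>();
        attributes.put(DATABASE_NAME_ATTRIBUTE, "postgres");
        System.out.println(selector.connect(attributes));
    }
}
```

Because the selector only routes requests, a test of it needs to check which delegate was chosen rather than whether that delegate can reach a database, which matches the reasoning given for the mock-based unit tests.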


---

[GitHub] nifi pull request #2735: NIFI-5229 Adding a DBCPService implementation that ...

Posted by bbende <gi...@git.apache.org>.
GitHub user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2735#discussion_r191057746
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPoolLookup.java ---
    @@ -0,0 +1,140 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.dbcp;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnDisabled;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.sql.Connection;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +@Tags({ "dbcp", "jdbc", "database", "connection", "pooling", "store" })
    +@CapabilityDescription("Provides a DBCPService that can be used to dynamically select another DBCPService. This service " +
    +        "requires an attribute named 'database.name' to be passed in when asking for a connection, and will throw an exception " +
    +        "if the attribute is missing. The value of 'database.name' will be used to select the DBCPService that has been " +
    +        "registered with that name. This will allow multiple DBCPServices to be defined and registered, and then selected " +
    +        "dynamically at runtime by tagging flow files with the appropriate 'database.name' attribute.")
    +@DynamicProperty(name = "The ", value = "JDBC property value", expressionLanguageScope = ExpressionLanguageScope.NONE,
    +        description = "")
    +public class DBCPConnectionPoolLookup extends AbstractControllerService implements DBCPService {
    --- End diff --
    
    Good point. I can see the term "lookup" causing confusion, and I'd be fine with renaming. What is your preferred name?
    
    DelegateDBCPConnectionPool
    DelegatingDBCPConnectionPool
    DBCPConnectionPoolDelegate
    
    ?


---

Posted by bbende <gi...@git.apache.org>.
Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2735#discussion_r191057795
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPoolLookup.java ---
    @@ -0,0 +1,140 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.dbcp;
    +
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnDisabled;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.sql.Connection;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +@Tags({ "dbcp", "jdbc", "database", "connection", "pooling", "store" })
    +@CapabilityDescription("Provides a DBCPService that can be used to dynamically select another DBCPService. This service " +
    +        "requires an attribute named 'database.name' to be passed in when asking for a connection, and will throw an exception " +
    +        "if the attribute is missing. The value of 'database.name' will be used to select the DBCPService that has been " +
    +        "registered with that name. This will allow multiple DBCPServices to be defined and registered, and then selected " +
    +        "dynamically at runtime by tagging flow files with the appropriate 'database.name' attribute.")
    +@DynamicProperty(name = "The ", value = "JDBC property value", expressionLanguageScope = ExpressionLanguageScope.NONE,
    +        description = "")
    +public class DBCPConnectionPoolLookup extends AbstractControllerService implements DBCPService {
    +
    +    public static final String DATABASE_NAME_ATTRIBUTE = "database.name";
    +
    +    private volatile Map<String,DBCPService> dbcpServiceMap;
    +
    +    @Override
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +                .name(propertyDescriptorName)
    +                .description("The DBCPService to return when database.name = '" + propertyDescriptorName + "'")
    +                .identifiesControllerService(DBCPService.class)
    +                .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
    +                .build();
    +    }
    +
    +    @Override
    +    protected Collection<ValidationResult> customValidate(ValidationContext context) {
    +        final List<ValidationResult> results = new ArrayList<>();
    +
    +        int numDefinedServices = 0;
    +        for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
    +            if (descriptor.isDynamic()) {
    +                numDefinedServices++;
    +            }
    +
    +            final String referencedId = context.getProperty(descriptor).getValue();
    +            if (this.getIdentifier().equals(referencedId)) {
    +                results.add(new ValidationResult.Builder()
    +                        .subject(descriptor.getDisplayName())
    +                        .explanation("the current service cannot be registered as a DBCPService to lookup")
    +                        .valid(false)
    +                        .build());
    +            }
    +        }
    +
    +        if (numDefinedServices == 0) {
    +            results.add(new ValidationResult.Builder()
    +                    .subject(this.getClass().getSimpleName())
    +                    .explanation("at least one DBCPService must be defined via dynamic properties")
    +                    .valid(false)
    +                    .build());
    +        }
    +
    +        return results;
    +    }
    +
    +    @OnEnabled
    +    public void onEnabled(final ConfigurationContext context) {
    +        final Map<String,DBCPService> serviceMap = new HashMap<>();
    +
    +        for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
    +            if (descriptor.isDynamic()) {
    +                final DBCPService dbcpService = context.getProperty(descriptor).asControllerService(DBCPService.class);
    +                serviceMap.put(descriptor.getName(), dbcpService);
    +            }
    +        }
    +
    +        dbcpServiceMap = Collections.unmodifiableMap(serviceMap);
    +    }
    +
    +    @OnDisabled
    +    public void onDisabled() {
    +        dbcpServiceMap = null;
    +    }
    +
    +    @Override
    +    public Connection getConnection() throws ProcessException {
    +        throw new UnsupportedOperationException("Cannot lookup DBCPConnectionPool without attributes");
    --- End diff --
    
    I'm not opposed to the idea. Would it be an optional property or a required one?
    
    I guess if it's optional, then getConnection() can use the default when one is provided and throw an exception otherwise. Likewise, getConnection(attributes) can fall back to the default when the database.name attribute is missing, or throw an exception if no default is provided.
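    
    Roughly, the optional-default behavior described above could look like the sketch below. It is only an illustration and not the PR's code: DefaultFallbackSketch and StubService are hypothetical names, a String stands in for java.sql.Connection, the exception types are assumptions, and a null defaultService models "no default configured".

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of an optional default service; not the PR's implementation.
class DefaultFallbackSketch {

    // Stand-in for DBCPService; a String plays the role of java.sql.Connection.
    interface StubService {
        String getConnection();
    }

    static final String DATABASE_NAME_ATTRIBUTE = "database.name";

    private final Map<String, StubService> services;
    private final StubService defaultService; // null when no default is configured

    DefaultFallbackSketch(final Map<String, StubService> services, final StubService defaultService) {
        this.services = services;
        this.defaultService = defaultService;
    }

    // No attributes available: use the default if there is one, otherwise fail.
    String getConnection() {
        if (defaultService == null) {
            throw new UnsupportedOperationException("No attributes and no default service configured");
        }
        return defaultService.getConnection();
    }

    // Attributes available: select by database.name, falling back to the default when it is missing.
    String getConnection(final Map<String, String> attributes) {
        final String name = attributes.get(DATABASE_NAME_ATTRIBUTE);
        if (name == null || name.trim().isEmpty()) {
            return getConnection(); // default, or an exception if none is configured
        }
        final StubService service = services.get(name);
        if (service == null) {
            // Assumption: an unknown name is an error rather than a reason to use the default.
            throw new IllegalStateException("No service registered for '" + name + "'");
        }
        return service.getConnection();
    }

    public static void main(final String[] args) {
        final Map<String, StubService> services = new HashMap<>();
        services.put("mysql", () -> "mysql-connection");
        final DefaultFallbackSketch lookup = new DefaultFallbackSketch(services, () -> "default-connection");

        // No database.name attribute, so the default stub is used.
        System.out.println(lookup.getConnection(new HashMap<String, String>()));
    }
}
```

    One open question the sketch takes a side on: a database.name value that matches no registered service still throws instead of quietly using the default, since silently routing to a different database seems riskier than failing.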


---