You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jp...@apache.org on 2016/07/19 17:16:56 UTC

nifi git commit: NIFI-2156: Add ListDatabaseTables processor

Repository: nifi
Updated Branches:
  refs/heads/master 8e2663c54 -> f1ba24032


NIFI-2156: Add ListDatabaseTables processor

This closes #642

Signed-off-by: jpercivall <jo...@yahoo.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f1ba2403
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f1ba2403
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f1ba2403

Branch: refs/heads/master
Commit: f1ba2403265b7f8b170862039212c341bb9e352c
Parents: 8e2663c
Author: Matt Burgess <ma...@apache.org>
Authored: Wed Jul 6 22:34:37 2016 -0400
Committer: jpercivall <jo...@yahoo.com>
Committed: Tue Jul 19 13:00:43 2016 -0400

----------------------------------------------------------------------
 .../processors/standard/ListDatabaseTables.java | 317 +++++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../standard/TestListDatabaseTables.java        | 247 +++++++++++++++
 3 files changed, 565 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/f1ba2403/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java
new file mode 100644
index 0000000..d594913
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A processor to retrieve a list of tables (and their metadata) from a database connection
+ */
+@TriggerSerially
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@Tags({"sql", "list", "jdbc", "table", "database"})
+@CapabilityDescription("Generates a set of flow files, each containing attributes corresponding to metadata about a table from a database connection. Once "
+        + "metadata about a table has been fetched, it will not be fetched again until the Refresh Interval (if set) has elapsed, or until state has been "
+        + "manually cleared.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "db.table.name", description = "Contains the name of a database table from the connection"),
+        @WritesAttribute(attribute = "db.table.catalog", description = "Contains the name of the catalog to which the table belongs (may be null)"),
+        @WritesAttribute(attribute = "db.table.schema", description = "Contains the name of the schema to which the table belongs (may be null)"),
+        @WritesAttribute(attribute = "db.table.fullname", description = "Contains the fully-qualifed table name (possibly including catalog, schema, etc.)"),
+        @WritesAttribute(attribute = "db.table.type",
+                description = "Contains the type of the database table from the connection. Typical types are \"TABLE\", \"VIEW\", \"SYSTEM TABLE\", "
+                        + "\"GLOBAL TEMPORARY\", \"LOCAL TEMPORARY\", \"ALIAS\", \"SYNONYM\""),
+        @WritesAttribute(attribute = "db.table.remarks", description = "Contains the name of a database table from the connection"),
+        @WritesAttribute(attribute = "db.table.count", description = "Contains the number of rows in the table")
+})
+@Stateful(scopes = {Scope.CLUSTER}, description = "After performing a listing of tables, the timestamp of the query is stored. "
+        + "This allows the Processor to not re-list tables the next time that the Processor is run. Specifying the refresh interval in the processor properties will "
+        + "indicate that when the processor detects the interval has elapsed, the state will be reset and tables will be re-listed as a result. "
+        + "This processor is meant to be run on the primary node only.")
+public class ListDatabaseTables extends AbstractProcessor {
+
+    // Attribute names
+    public static final String DB_TABLE_NAME = "db.table.name";
+    public static final String DB_TABLE_CATALOG = "db.table.catalog";
+    public static final String DB_TABLE_SCHEMA = "db.table.schema";
+    public static final String DB_TABLE_FULLNAME = "db.table.fullname";
+    public static final String DB_TABLE_TYPE = "db.table.type";
+    public static final String DB_TABLE_REMARKS = "db.table.remarks";
+    public static final String DB_TABLE_COUNT = "db.table.count";
+
+    // Relationships
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("All FlowFiles that are received are routed to success")
+            .build();
+
+    // Property descriptors
+    public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
+            .name("list-db-tables-db-connection")
+            .displayName("Database Connection Pooling Service")
+            .description("The Controller Service that is used to obtain connection to database")
+            .required(true)
+            .identifiesControllerService(DBCPService.class)
+            .build();
+
+    public static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
+            .name("list-db-tables-catalog")
+            .displayName("Catalog")
+            .description("The name of a catalog from which to list database tables. The name must match the catalog name as it is stored in the database. "
+                    + "If the property is not set, the catalog name will not be used to narrow the search for tables. If the property is set to an empty string, "
+                    + "tables without a catalog will be listed.")
+            .required(false)
+            .addValidator(Validator.VALID)
+            .build();
+
+    public static final PropertyDescriptor SCHEMA_PATTERN = new PropertyDescriptor.Builder()
+            .name("list-db-tables-schema-pattern")
+            .displayName("Schema Pattern")
+            .description("A pattern for matching schemas in the database. Within a pattern, \"%\" means match any substring of 0 or more characters, "
+                    + "and \"_\" means match any one character. The pattern must match the schema name as it is stored in the database. "
+                    + "If the property is not set, the schema name will not be used to narrow the search for tables. If the property is set to an empty string, "
+                    + "tables without a schema will be listed.")
+            .required(false)
+            .addValidator(Validator.VALID)
+            .build();
+
+    public static final PropertyDescriptor TABLE_NAME_PATTERN = new PropertyDescriptor.Builder()
+            .name("list-db-tables-name-pattern")
+            .displayName("Table Name Pattern")
+            .description("A pattern for matching tables in the database. Within a pattern, \"%\" means match any substring of 0 or more characters, "
+                    + "and \"_\" means match any one character. The pattern must match the table name as it is stored in the database. "
+                    + "If the property is not set, all tables will be retrieved.")
+            .required(false)
+            .addValidator(Validator.VALID)
+            .build();
+
+    public static final PropertyDescriptor TABLE_TYPES = new PropertyDescriptor.Builder()
+            .name("list-db-tables-types")
+            .displayName("Table Types")
+            .description("A comma-separated list of table types to include. For example, some databases support TABLE and VIEW types. If the property is not set, "
+                    + "tables of all types will be returned.")
+            .required(false)
+            .defaultValue("TABLE")
+            .addValidator(Validator.VALID)
+            .build();
+
+    public static final PropertyDescriptor INCLUDE_COUNT = new PropertyDescriptor.Builder()
+            .name("list-db-include-count")
+            .displayName("Include Count")
+            .description("Whether to include the table's row count as a flow file attribute. This affects performance as a database query will be generated "
+                    + "for each table in the retrieved list.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    public static final PropertyDescriptor REFRESH_INTERVAL = new PropertyDescriptor.Builder()
+            .name("list-db-refresh-interval")
+            .displayName("Refresh Interval")
+            .description("The amount of time to elapse before resetting the processor state, thereby causing all current tables to be listed. "
+                    + "During this interval, the processor may continue to run, but tables that have already been listed will not be re-listed. However new/added "
+                    + "tables will be listed as the processor runs. A value of zero means the state will never be automatically reset, the user must "
+                    + "Clear State manually.")
+            .required(true)
+            .defaultValue("0 sec")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    private static final List<PropertyDescriptor> propertyDescriptors;
+    private static final Set<Relationship> relationships;
+
+    /*
+     * Will ensure that the list of property descriptors is build only once.
+     * Will also create a Set of relationships
+     */
+    static {
+        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+        _propertyDescriptors.add(DBCP_SERVICE);
+        _propertyDescriptors.add(CATALOG);
+        _propertyDescriptors.add(SCHEMA_PATTERN);
+        _propertyDescriptors.add(TABLE_NAME_PATTERN);
+        _propertyDescriptors.add(TABLE_TYPES);
+        _propertyDescriptors.add(INCLUDE_COUNT);
+        _propertyDescriptors.add(REFRESH_INTERVAL);
+        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
+
+        Set<Relationship> _relationships = new HashSet<>();
+        _relationships.add(REL_SUCCESS);
+        relationships = Collections.unmodifiableSet(_relationships);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final ComponentLog logger = getLogger();
+        final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
+        final String catalog = context.getProperty(CATALOG).getValue();
+        final String schemaPattern = context.getProperty(SCHEMA_PATTERN).getValue();
+        final String tableNamePattern = context.getProperty(TABLE_NAME_PATTERN).getValue();
+        final String[] tableTypes = context.getProperty(TABLE_TYPES).isSet()
+                ? context.getProperty(TABLE_TYPES).getValue().split("\\s*,\\s*")
+                : null;
+        final boolean includeCount = context.getProperty(INCLUDE_COUNT).asBoolean();
+        final long refreshInterval = context.getProperty(REFRESH_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
+
+        final StateManager stateManager = context.getStateManager();
+        final StateMap stateMap;
+        final Map<String, String> stateMapProperties;
+        try {
+            stateMap = stateManager.getState(Scope.CLUSTER);
+            stateMapProperties = new HashMap<>(stateMap.toMap());
+        } catch (IOException ioe) {
+            throw new ProcessException(ioe);
+        }
+
+        try (final Connection con = dbcpService.getConnection()) {
+
+            DatabaseMetaData dbMetaData = con.getMetaData();
+            ResultSet rs = dbMetaData.getTables(catalog, schemaPattern, tableNamePattern, tableTypes);
+            while (rs.next()) {
+                final String tableCatalog = rs.getString(1);
+                final String tableSchema = rs.getString(2);
+                final String tableName = rs.getString(3);
+                final String tableType = rs.getString(4);
+                final String tableRemarks = rs.getString(5);
+
+                // Build fully-qualified name
+                String fqn = Stream.of(tableCatalog, tableSchema, tableName)
+                        .filter(segment -> !StringUtils.isEmpty(segment))
+                        .collect(Collectors.joining("."));
+
+                String lastTimestampForTable = stateMapProperties.get(fqn);
+                boolean refreshTable = true;
+                try {
+                    // Refresh state if the interval has elapsed
+                    long lastRefreshed = -1;
+                    final long currentTime = System.currentTimeMillis();
+                    if (!StringUtils.isEmpty(lastTimestampForTable)) {
+                        lastRefreshed = Long.parseLong(lastTimestampForTable);
+                    }
+                    if (lastRefreshed == -1 || (refreshInterval > 0 && currentTime >= (lastRefreshed + refreshInterval))) {
+                        stateMapProperties.remove(lastTimestampForTable);
+                    } else {
+                        refreshTable = false;
+                    }
+                } catch (final NumberFormatException nfe) {
+                    getLogger().error("Failed to retrieve observed last table fetches from the State Manager. Will not perform "
+                            + "query until this is accomplished.", nfe);
+                    context.yield();
+                    return;
+                }
+                if (refreshTable) {
+                    FlowFile flowFile = session.create();
+                    logger.info("Found {}: {}", new Object[]{tableType, fqn});
+                    if (includeCount) {
+                        try (Statement st = con.createStatement()) {
+                            final String countQuery = "SELECT COUNT(1) FROM " + fqn;
+
+                            logger.debug("Executing query: {}", new Object[]{countQuery});
+                            ResultSet countResult = st.executeQuery(countQuery);
+                            if (countResult.next()) {
+                                flowFile = session.putAttribute(flowFile, DB_TABLE_COUNT, Long.toString(countResult.getLong(1)));
+                            }
+                        } catch (SQLException se) {
+                            logger.error("Couldn't get row count for {}", new Object[]{fqn});
+                            session.remove(flowFile);
+                            continue;
+                        }
+                    }
+                    if (tableCatalog != null) {
+                        flowFile = session.putAttribute(flowFile, DB_TABLE_CATALOG, tableCatalog);
+                    }
+                    if (tableSchema != null) {
+                        flowFile = session.putAttribute(flowFile, DB_TABLE_SCHEMA, tableSchema);
+                    }
+                    flowFile = session.putAttribute(flowFile, DB_TABLE_NAME, tableName);
+                    flowFile = session.putAttribute(flowFile, DB_TABLE_FULLNAME, fqn);
+                    flowFile = session.putAttribute(flowFile, DB_TABLE_TYPE, tableType);
+                    if (tableRemarks != null) {
+                        flowFile = session.putAttribute(flowFile, DB_TABLE_REMARKS, tableRemarks);
+                    }
+
+                    String transitUri;
+                    try {
+                        transitUri = dbMetaData.getURL();
+                    } catch (SQLException sqle) {
+                        transitUri = "<unknown>";
+                    }
+                    session.getProvenanceReporter().receive(flowFile, transitUri);
+                    session.transfer(flowFile, REL_SUCCESS);
+                    stateMapProperties.put(fqn, Long.toString(System.currentTimeMillis()));
+                }
+            }
+            // Update the timestamps for listed tables
+            if (stateMap.getVersion() == -1) {
+                stateManager.setState(stateMapProperties, Scope.CLUSTER);
+            } else {
+                stateManager.replace(stateMap, stateMapProperties, Scope.CLUSTER);
+            }
+
+        } catch (final SQLException | IOException e) {
+            throw new ProcessException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/f1ba2403/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 2ff466e..d23d88d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -46,6 +46,7 @@ org.apache.nifi.processors.standard.JoltTransformJSON
 org.apache.nifi.processors.standard.GenerateTableFetch
 org.apache.nifi.processors.standard.GetJMSQueue
 org.apache.nifi.processors.standard.GetJMSTopic
+org.apache.nifi.processors.standard.ListDatabaseTables
 org.apache.nifi.processors.standard.ListFile
 org.apache.nifi.processors.standard.ListenHTTP
 org.apache.nifi.processors.standard.ListenRELP

http://git-wip-us.apache.org/repos/asf/nifi/blob/f1ba2403/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListDatabaseTables.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListDatabaseTables.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListDatabaseTables.java
new file mode 100644
index 0000000..1ff39f7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListDatabaseTables.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.SQLNonTransientConnectionException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Unit tests for ListDatabaseTables processor.
+ */
+public class TestListDatabaseTables {
+
+    TestRunner runner;
+    ListDatabaseTables processor;
+
+    private final static String DB_LOCATION = "target/db_ldt";
+
+    @BeforeClass
+    public static void setupBeforeClass() throws IOException {
+        System.setProperty("derby.stream.error.file", "target/derby.log");
+
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        try {
+            FileUtils.deleteFile(dbLocation, true);
+        } catch (IOException ioe) {
+            // Do nothing, may not have existed
+        }
+    }
+
+    @AfterClass
+    public static void cleanUpAfterClass() throws Exception {
+        try {
+            DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";shutdown=true");
+        } catch (SQLNonTransientConnectionException e) {
+            // Do nothing, this is what happens at Derby shutdown
+        }
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        try {
+            FileUtils.deleteFile(dbLocation, true);
+        } catch (IOException ioe) {
+            // Do nothing, may not have existed
+        }
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        processor = new ListDatabaseTables();
+        final DBCPService dbcp = new DBCPServiceSimpleImpl();
+        final Map<String, String> dbcpProperties = new HashMap<>();
+
+        runner = TestRunners.newTestRunner(ListDatabaseTables.class);
+        runner.addControllerService("dbcp", dbcp, dbcpProperties);
+        runner.enableControllerService(dbcp);
+        runner.setProperty(ListDatabaseTables.DBCP_SERVICE, "dbcp");
+    }
+
+    @Test
+    public void testListTablesNoCount() throws Exception {
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_TABLE1");
+            stmt.execute("drop table TEST_TABLE2");
+        } catch (final SQLException sqle) {
+            // Do nothing, may not have existed
+        }
+
+        stmt.execute("create table TEST_TABLE1 (id integer not null, val1 integer, val2 integer, constraint my_pk1 primary key (id))");
+        stmt.execute("create table TEST_TABLE2 (id integer not null, val1 integer, val2 integer, constraint my_pk2 primary key (id))");
+
+        runner.run();
+        runner.assertTransferCount(ListDatabaseTables.REL_SUCCESS, 2);
+        // Already got these tables, shouldn't get them again
+        runner.clearTransferState();
+        runner.run();
+        runner.assertTransferCount(ListDatabaseTables.REL_SUCCESS, 0);
+    }
+
+    @Test
+    public void testListTablesWithCount() throws Exception {
+        runner.setProperty(ListDatabaseTables.INCLUDE_COUNT, "true");
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_TABLE1");
+            stmt.execute("drop table TEST_TABLE2");
+        } catch (final SQLException sqle) {
+            // Do nothing, may not have existed
+        }
+
+        stmt.execute("create table TEST_TABLE1 (id integer not null, val1 integer, val2 integer, constraint my_pk1 primary key (id))");
+        stmt.execute("insert into TEST_TABLE1 (id, val1, val2) VALUES (0, NULL, 1)");
+        stmt.execute("insert into TEST_TABLE1 (id, val1, val2) VALUES (1, 1, 1)");
+        stmt.execute("create table TEST_TABLE2 (id integer not null, val1 integer, val2 integer, constraint my_pk2 primary key (id))");
+
+        runner.run();
+        runner.assertTransferCount(ListDatabaseTables.REL_SUCCESS, 2);
+        List<MockFlowFile> results = runner.getFlowFilesForRelationship(ListDatabaseTables.REL_SUCCESS);
+        assertEquals("2", results.get(0).getAttribute(ListDatabaseTables.DB_TABLE_COUNT));
+        assertEquals("0", results.get(1).getAttribute(ListDatabaseTables.DB_TABLE_COUNT));
+
+    }
+
+    @Test
+    public void testListTablesAfterRefresh() throws Exception {
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_TABLE1");
+            stmt.execute("drop table TEST_TABLE2");
+        } catch (final SQLException sqle) {
+            // Do nothing, may not have existed
+        }
+
+        stmt.execute("create table TEST_TABLE1 (id integer not null, val1 integer, val2 integer, constraint my_pk1 primary key (id))");
+        stmt.execute("insert into TEST_TABLE1 (id, val1, val2) VALUES (0, NULL, 1)");
+        stmt.execute("insert into TEST_TABLE1 (id, val1, val2) VALUES (1, 1, 1)");
+        stmt.execute("create table TEST_TABLE2 (id integer not null, val1 integer, val2 integer, constraint my_pk2 primary key (id))");
+        stmt.close();
+
+        runner.setProperty(ListDatabaseTables.INCLUDE_COUNT, "true");
+        runner.setProperty(ListDatabaseTables.REFRESH_INTERVAL, "100 millis");
+        runner.run();
+        runner.assertTransferCount(ListDatabaseTables.REL_SUCCESS, 2);
+        List<MockFlowFile> results = runner.getFlowFilesForRelationship(ListDatabaseTables.REL_SUCCESS);
+        assertEquals("2", results.get(0).getAttribute(ListDatabaseTables.DB_TABLE_COUNT));
+        assertEquals("0", results.get(1).getAttribute(ListDatabaseTables.DB_TABLE_COUNT));
+        runner.clearTransferState();
+        runner.run();
+        runner.assertTransferCount(ListDatabaseTables.REL_SUCCESS, 0);
+
+        // Now wait longer than 100 millis and assert the refresh has happened (the two tables are re-listed)
+        Thread.sleep(200);
+        runner.run();
+        runner.assertTransferCount(ListDatabaseTables.REL_SUCCESS, 2);
+    }
+
+    @Test
+    public void testListTablesMultipleRefresh() throws Exception {
+
+        // load test data to database
+        final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
+        Statement stmt = con.createStatement();
+
+        try {
+            stmt.execute("drop table TEST_TABLE1");
+            stmt.execute("drop table TEST_TABLE2");
+        } catch (final SQLException sqle) {
+            // Do nothing, may not have existed
+        }
+
+        stmt.execute("create table TEST_TABLE1 (id integer not null, val1 integer, val2 integer, constraint my_pk1 primary key (id))");
+        stmt.execute("insert into TEST_TABLE1 (id, val1, val2) VALUES (0, NULL, 1)");
+        stmt.execute("insert into TEST_TABLE1 (id, val1, val2) VALUES (1, 1, 1)");
+
+        runner.setProperty(ListDatabaseTables.INCLUDE_COUNT, "true");
+        runner.setProperty(ListDatabaseTables.REFRESH_INTERVAL, "200 millis");
+        runner.run();
+        runner.assertTransferCount(ListDatabaseTables.REL_SUCCESS, 1);
+        List<MockFlowFile> results = runner.getFlowFilesForRelationship(ListDatabaseTables.REL_SUCCESS);
+        assertEquals("2", results.get(0).getAttribute(ListDatabaseTables.DB_TABLE_COUNT));
+        runner.clearTransferState();
+
+        // Add another table immediately, the first table should not be listed again but the second should
+        stmt.execute("create table TEST_TABLE2 (id integer not null, val1 integer, val2 integer, constraint my_pk2 primary key (id))");
+        stmt.close();
+        runner.run();
+        runner.assertTransferCount(ListDatabaseTables.REL_SUCCESS, 1);
+        results = runner.getFlowFilesForRelationship(ListDatabaseTables.REL_SUCCESS);
+        assertEquals("0", results.get(0).getAttribute(ListDatabaseTables.DB_TABLE_COUNT));
+        runner.clearTransferState();
+
+        // Now wait longer than the refresh interval and assert the refresh has happened (i.e. the two tables are re-listed)
+        Thread.sleep(500);
+        runner.run();
+        runner.assertTransferCount(ListDatabaseTables.REL_SUCCESS, 2);
+    }
+
+    /**
+     * Simple implementation only for ListDatabaseTables processor testing.
+     */
+    private class DBCPServiceSimpleImpl extends AbstractControllerService implements DBCPService {
+
+        @Override
+        public String getIdentifier() {
+            return "dbcp";
+        }
+
+        @Override
+        public Connection getConnection() throws ProcessException {
+            try {
+                Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+                return DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true");
+            } catch (final Exception e) {
+                throw new ProcessException("getConnection failed: " + e);
+            }
+        }
+    }
+}
\ No newline at end of file