Posted to issues@nifi.apache.org by mattyb149 <gi...@git.apache.org> on 2016/07/13 16:06:05 UTC

[GitHub] nifi pull request #642: NIFI-2156: Add ListDatabaseTables processor

GitHub user mattyb149 opened a pull request:

    https://github.com/apache/nifi/pull/642

    NIFI-2156: Add ListDatabaseTables processor

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mattyb149/nifi NIFI-2156

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/642.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #642
    
----
commit b33817d90cee0514deb4c9bd71b57852e0d8b02e
Author: Matt Burgess <ma...@apache.org>
Date:   2016-07-07T02:34:37Z

    NIFI-2156: Add ListDatabaseTables processor

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] nifi issue #642: NIFI-2156: Add ListDatabaseTables processor

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on the issue:

    https://github.com/apache/nifi/pull/642
  
    Yes, at one point I had a "Refresh Interval" property, but I think that was in another branch; I'll restore it. Also, any change to the properties currently resets the state (since the tables fetched may have changed), and I'm thinking of taking that part out. The Refresh Interval would cause all tables to be re-fetched, and/or the user could always clear the state manually. What do you think?
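    For reference, a property along those lines could be declared roughly as below. This is only a sketch: the property name, default value, and description are illustrative rather than the final implementation, and it assumes org.apache.nifi.processor.util.StandardValidators is available for the time-period validation.

        // Hypothetical "Refresh Interval" property (name/default are illustrative)
        public static final PropertyDescriptor REFRESH_INTERVAL = new PropertyDescriptor.Builder()
                .name("list-db-refresh-interval")
                .displayName("Refresh Interval")
                .description("The amount of time to elapse before resetting the processor state, causing all tables "
                        + "to be listed (and counted) again. A value of 0 means the state is never reset automatically.")
                .required(true)
                .defaultValue("0 sec")
                .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
                .build();

        // In onTrigger, the interval could then be read in milliseconds:
        // final long refreshInterval = context.getProperty(REFRESH_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);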


[GitHub] nifi pull request #642: NIFI-2156: Add ListDatabaseTables processor

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/642#discussion_r71348259
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java ---
    @@ -304,7 +304,6 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
                     }
                 }
                 // Update the last time the processor finished successfully
    --- End diff --
    
    I believe this can be removed


[GitHub] nifi pull request #642: NIFI-2156: Add ListDatabaseTables processor

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/642#discussion_r71093459
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java ---
    @@ -0,0 +1,304 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.Stateful;
    +import org.apache.nifi.annotation.behavior.TriggerSerially;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.components.state.Scope;
    +import org.apache.nifi.components.state.StateManager;
    +import org.apache.nifi.components.state.StateMap;
    +import org.apache.nifi.dbcp.DBCPService;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.util.StringUtils;
    +
    +import java.io.IOException;
    +import java.sql.Connection;
    +import java.sql.DatabaseMetaData;
    +import java.sql.ResultSet;
    +import java.sql.SQLException;
    +import java.sql.Statement;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.stream.Collectors;
    +import java.util.stream.Stream;
    +
    +/**
    + * A processor to retrieve a list of tables (and their metadata) from a database connection
    + */
    +@TriggerSerially
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@Tags({"sql", "list", "jdbc", "table", "database"})
    +@CapabilityDescription("Generates a set of flow files, each containing attributes corresponding to metadata about a table from a database connection.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "db.table.name", description = "Contains the name of a database table from the connection"),
    +        @WritesAttribute(attribute = "db.table.catalog", description = "Contains the name of the catalog to which the table belongs (may be null)"),
    +        @WritesAttribute(attribute = "db.table.schema", description = "Contains the name of the schema to which the table belongs (may be null)"),
    +        @WritesAttribute(attribute = "db.table.fullname", description = "Contains the fully-qualified table name (possibly including catalog, schema, etc.)"),
    +        @WritesAttribute(attribute = "db.table.type",
    +                description = "Contains the type of the database table from the connection. Typical types are \"TABLE\", \"VIEW\", \"SYSTEM TABLE\", "
    +                        + "\"GLOBAL TEMPORARY\", \"LOCAL TEMPORARY\", \"ALIAS\", \"SYNONYM\""),
    +        @WritesAttribute(attribute = "db.table.remarks", description = "Contains the remarks (comments) attached to the database table, if any"),
    +        @WritesAttribute(attribute = "db.table.count", description = "Contains the number of rows in the table")
    +})
    +@Stateful(scopes = {Scope.LOCAL}, description = "After performing a listing of tables, the timestamp of the query is stored. "
    +        + "This allows the Processor to not re-list tables the next time that the Processor is run. Changing any of the processor properties will "
    +        + "indicate that the processor should reset state and thus re-list the tables using the new configuration. This processor is meant to be "
    +        + "run on the primary node only.")
    +public class ListDatabaseTables extends AbstractProcessor {
    +
    +    // Attribute names
    +    public static final String DB_TABLE_NAME = "db.table.name";
    +    public static final String DB_TABLE_CATALOG = "db.table.catalog";
    +    public static final String DB_TABLE_SCHEMA = "db.table.schema";
    +    public static final String DB_TABLE_FULLNAME = "db.table.fullname";
    +    public static final String DB_TABLE_TYPE = "db.table.type";
    +    public static final String DB_TABLE_REMARKS = "db.table.remarks";
    +    public static final String DB_TABLE_COUNT = "db.table.count";
    +
    +    // Relationships
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("All FlowFiles that are created are routed to success")
    +            .build();
    +
    +    // Property descriptors
    +    public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
    +            .name("list-db-tables-db-connection")
    +            .displayName("Database Connection Pooling Service")
    +            .description("The Controller Service that is used to obtain a connection to the database")
    +            .required(true)
    +            .identifiesControllerService(DBCPService.class)
    +            .build();
    +
    +    public static final PropertyDescriptor CATALOG = new PropertyDescriptor.Builder()
    +            .name("list-db-tables-catalog")
    +            .displayName("Catalog")
    +            .description("The name of a catalog from which to list database tables. The name must match the catalog name as it is stored in the database. "
    +                    + "If the property is not set, the catalog name will not be used to narrow the search for tables. If the property is set to an empty string, "
    +                    + "tables without a catalog will be listed.")
    +            .required(false)
    +            .addValidator(Validator.VALID)
    +            .build();
    +
    +    public static final PropertyDescriptor SCHEMA_PATTERN = new PropertyDescriptor.Builder()
    +            .name("list-db-tables-schema-pattern")
    +            .displayName("Schema Pattern")
    +            .description("A pattern for matching schemas in the database. Within a pattern, \"%\" means match any substring of 0 or more characters, "
    +                    + "and \"_\" means match any one character. The pattern must match the schema name as it is stored in the database. "
    +                    + "If the property is not set, the schema name will not be used to narrow the search for tables. If the property is set to an empty string, "
    +                    + "tables without a schema will be listed.")
    +            .required(false)
    +            .addValidator(Validator.VALID)
    +            .build();
    +
    +    public static final PropertyDescriptor TABLE_NAME_PATTERN = new PropertyDescriptor.Builder()
    +            .name("list-db-tables-name-pattern")
    +            .displayName("Table Name Pattern")
    +            .description("A pattern for matching tables in the database. Within a pattern, \"%\" means match any substring of 0 or more characters, "
    +                    + "and \"_\" means match any one character. The pattern must match the table name as it is stored in the database. "
    +                    + "If the property is not set, all tables will be retrieved.")
    +            .required(false)
    +            .addValidator(Validator.VALID)
    +            .build();
    +
    +    public static final PropertyDescriptor TABLE_TYPES = new PropertyDescriptor.Builder()
    +            .name("list-db-tables-types")
    +            .displayName("Table Types")
    +            .description("A comma-separated list of table types to include. For example, some databases support TABLE and VIEW types. If the property is not set, "
    +                    + "tables of all types will be returned.")
    +            .required(false)
    +            .defaultValue("TABLE")
    +            .addValidator(Validator.VALID)
    +            .build();
    +
    +    public static final PropertyDescriptor INCLUDE_COUNT = new PropertyDescriptor.Builder()
    +            .name("list-db-include-count")
    +            .displayName("Include Count")
    +            .description("Whether to include the table's row count as a flow file attribute. This affects performance as a database query will be generated "
    +                    + "for each table in the retrieved list.")
    +            .required(true)
    +            .allowableValues("true", "false")
    +            .defaultValue("false")
    +            .build();
    +
    +    private static final List<PropertyDescriptor> propertyDescriptors;
    +    private static final Set<Relationship> relationships;
    +
    +    private boolean resetState = false;
    +
    +    /*
    +     * Will ensure that the list of property descriptors is built only once.
    +     * Will also create a Set of relationships
    +     */
    +    static {
    +        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
    +        _propertyDescriptors.add(DBCP_SERVICE);
    +        _propertyDescriptors.add(CATALOG);
    +        _propertyDescriptors.add(SCHEMA_PATTERN);
    +        _propertyDescriptors.add(TABLE_NAME_PATTERN);
    +        _propertyDescriptors.add(TABLE_TYPES);
    +        _propertyDescriptors.add(INCLUDE_COUNT);
    +        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
    +
    +        Set<Relationship> _relationships = new HashSet<>();
    +        _relationships.add(REL_SUCCESS);
    +        relationships = Collections.unmodifiableSet(_relationships);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return propertyDescriptors;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @OnScheduled
    +    public void setup(ProcessContext context) {
    +        try {
    +            if (resetState) {
    +                context.getStateManager().clear(Scope.LOCAL);
    +                resetState = false;
    +            }
    +        } catch (IOException ioe) {
    +            throw new ProcessException(ioe);
    +        }
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        final ComponentLog logger = getLogger();
    +        final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
    +        final String catalog = context.getProperty(CATALOG).getValue();
    +        final String schemaPattern = context.getProperty(SCHEMA_PATTERN).getValue();
    +        final String tableNamePattern = context.getProperty(TABLE_NAME_PATTERN).getValue();
    +        final String[] tableTypes = context.getProperty(TABLE_TYPES).isSet()
    +                ? context.getProperty(TABLE_TYPES).getValue().split("\\s*,\\s*")
    +                : null;
    +        final boolean includeCount = context.getProperty(INCLUDE_COUNT).asBoolean();
    +
    +        final StateManager stateManager = context.getStateManager();
    +        final StateMap stateMap;
    +        final Map<String, String> stateMapProperties;
    +        try {
    +            stateMap = stateManager.getState(Scope.LOCAL);
    +            stateMapProperties = new HashMap<>(stateMap.toMap());
    +        } catch (IOException ioe) {
    +            throw new ProcessException(ioe);
    +        }
    +
    +        try (final Connection con = dbcpService.getConnection()) {
    +
    +            DatabaseMetaData dbMetaData = con.getMetaData();
    +            ResultSet rs = dbMetaData.getTables(catalog, schemaPattern, tableNamePattern, tableTypes);
    +            while (rs.next()) {
    +                final String tableCatalog = rs.getString(1);
    +                final String tableSchema = rs.getString(2);
    +                final String tableName = rs.getString(3);
    +                final String tableType = rs.getString(4);
    +                final String tableRemarks = rs.getString(5);
    +
    +                // Build fully-qualified name
    +                String fqn = Stream.of(tableCatalog, tableSchema, tableName)
    +                        .filter(segment -> !StringUtils.isEmpty(segment))
    +                        .collect(Collectors.joining("."));
    +
    +                String fqTableName = stateMap.get(fqn);
    +                if (fqTableName == null) {
    +                    FlowFile flowFile = session.create();
    +                    logger.info("Found {}: {}", new Object[]{tableType, fqn});
    +                    if (includeCount) {
    +                        try (Statement st = con.createStatement()) {
    +                            final String countQuery = "SELECT COUNT(1) FROM " + fqn;
    +
    +                            logger.debug("Executing query: {}", new Object[]{countQuery});
    +                            ResultSet countResult = st.executeQuery(countQuery);
    +                            if (countResult.next()) {
    +                                flowFile = session.putAttribute(flowFile, DB_TABLE_COUNT, Long.toString(countResult.getLong(1)));
    +                            }
    +                        } catch (SQLException se) {
    +                            logger.error("Couldn't get row count for {}", new Object[]{fqn});
    +                            session.remove(flowFile);
    --- End diff --
    
    It's a data generator, so I'm not sure what counts as data loss. I could add a "failure" relationship, but I didn't see any precedent for transferring newly created flow files there.
    If you mean that an error getting the count shouldn't prevent the flow file (minus the count) from being transferred, then yeah, I can make that change. I just wasn't sure, from a UX perspective, whether the flow file should still be transferred when the user asks for a count and we can't get one.
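    For what it's worth, that alternative (keep the flow file and just skip the count attribute when the query fails) might look roughly like the sketch below; it is illustrative only, not necessarily the change that ends up in the PR:

        if (includeCount) {
            try (Statement st = con.createStatement()) {
                final String countQuery = "SELECT COUNT(1) FROM " + fqn;
                logger.debug("Executing query: {}", new Object[]{countQuery});
                try (ResultSet countResult = st.executeQuery(countQuery)) {
                    if (countResult.next()) {
                        flowFile = session.putAttribute(flowFile, DB_TABLE_COUNT, Long.toString(countResult.getLong(1)));
                    }
                }
            } catch (SQLException se) {
                // Log and fall through: the flow file is still transferred, just without db.table.count
                logger.warn("Couldn't get row count for {}; transferring without a count attribute", new Object[]{fqn}, se);
            }
        }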


[GitHub] nifi pull request #642: NIFI-2156: Add ListDatabaseTables processor

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/642#discussion_r71091358
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java ---
    [ ... quoted file contents trimmed; identical to the diff quoted earlier in this thread ... ]
    +        try (final Connection con = dbcpService.getConnection()) {
    +
    +            DatabaseMetaData dbMetaData = con.getMetaData();
    +            ResultSet rs = dbMetaData.getTables(catalog, schemaPattern, tableNamePattern, tableTypes);
    +            while (rs.next()) {
    +                final String tableCatalog = rs.getString(1);
    +                final String tableSchema = rs.getString(2);
    +                final String tableName = rs.getString(3);
    +                final String tableType = rs.getString(4);
    +                final String tableRemarks = rs.getString(5);
    +
    +                // Build fully-qualified name
    +                String fqn = Stream.of(tableCatalog, tableSchema, tableName)
    +                        .filter(segment -> !StringUtils.isEmpty(segment))
    +                        .collect(Collectors.joining("."));
    +
    +                String fqTableName = stateMap.get(fqn);
    +                if (fqTableName == null) {
    +                    FlowFile flowFile = session.create();
    +                    logger.info("Found {}: {}", new Object[]{tableType, fqn});
    +                    if (includeCount) {
    +                        try (Statement st = con.createStatement()) {
    +                            final String countQuery = "SELECT COUNT(1) FROM " + fqn;
    +
    +                            logger.debug("Executing query: {}", new Object[]{countQuery});
    +                            ResultSet countResult = st.executeQuery(countQuery);
    +                            if (countResult.next()) {
    +                                flowFile = session.putAttribute(flowFile, DB_TABLE_COUNT, Long.toString(countResult.getLong(1)));
    +                            }
    +                        } catch (SQLException se) {
    +                            logger.error("Couldn't get row count for {}", new Object[]{fqn});
    +                            session.remove(flowFile);
    --- End diff --
    
    Wouldn't removing a flowfile essentially skip whatever information would've been in that row, leading to data loss?


[GitHub] nifi pull request #642: NIFI-2156: Add ListDatabaseTables processor

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/642#discussion_r71249117
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java ---
    @@ -211,17 +213,37 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
                     ? context.getProperty(TABLE_TYPES).getValue().split("\\s*,\\s*")
                     : null;
             final boolean includeCount = context.getProperty(INCLUDE_COUNT).asBoolean();
    +        final long refreshInterval = context.getProperty(REFRESH_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
     
             final StateManager stateManager = context.getStateManager();
             final StateMap stateMap;
             final Map<String, String> stateMapProperties;
             try {
    -            stateMap = stateManager.getState(Scope.LOCAL);
    +            stateMap = stateManager.getState(Scope.CLUSTER);
                 stateMapProperties = new HashMap<>(stateMap.toMap());
             } catch (IOException ioe) {
                 throw new ProcessException(ioe);
             }
     
    +        try {
    +            // Refresh state if the interval has elapsed
    +            long lastRefreshed = -1;
    +            final long currentTime = System.currentTimeMillis();
    +            String lastTimestamp = stateMapProperties.get(this.getIdentifier());
    +            if (!StringUtils.isEmpty(lastTimestamp)) {
    +                lastRefreshed = Long.parseLong(lastTimestamp);
    +            }
    +            if (lastRefreshed > 0 && refreshInterval > 0 && currentTime >= (lastRefreshed + refreshInterval)) {
    +                stateManager.clear(Scope.CLUSTER);
    +                stateMapProperties.clear();
    --- End diff --
    
    Good point, will add.


[GitHub] nifi pull request #642: NIFI-2156: Add ListDatabaseTables processor

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/642#discussion_r71242383
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java ---
    @@ -211,17 +213,37 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
                     ? context.getProperty(TABLE_TYPES).getValue().split("\\s*,\\s*")
                     : null;
             final boolean includeCount = context.getProperty(INCLUDE_COUNT).asBoolean();
    +        final long refreshInterval = context.getProperty(REFRESH_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
     
             final StateManager stateManager = context.getStateManager();
             final StateMap stateMap;
             final Map<String, String> stateMapProperties;
             try {
    -            stateMap = stateManager.getState(Scope.LOCAL);
    +            stateMap = stateManager.getState(Scope.CLUSTER);
                 stateMapProperties = new HashMap<>(stateMap.toMap());
             } catch (IOException ioe) {
                 throw new ProcessException(ioe);
             }
     
    +        try {
    +            // Refresh state if the interval has elapsed
    +            long lastRefreshed = -1;
    +            final long currentTime = System.currentTimeMillis();
    +            String lastTimestamp = stateMapProperties.get(this.getIdentifier());
    +            if (!StringUtils.isEmpty(lastTimestamp)) {
    +                lastRefreshed = Long.parseLong(lastTimestamp);
    +            }
    +            if (lastRefreshed > 0 && refreshInterval > 0 && currentTime >= (lastRefreshed + refreshInterval)) {
    +                stateManager.clear(Scope.CLUSTER);
    +                stateMapProperties.clear();
    --- End diff --
    
    Why clear all the properties at once instead of having an expiration time that applies to each table? That way, if you set it to 5 minutes, the processor ends up reporting the count of each table every 5 minutes (for as long as the table still exists). You already store the timestamp under the FQN.
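    In code, that per-table expiration might look something like the sketch below (it assumes a refreshInterval already resolved to milliseconds and the existing FQN-to-timestamp entries in state; illustrative only):

        final long currentTime = System.currentTimeMillis();
        final String lastListedValue = stateMapProperties.get(fqn);
        boolean listTable = true;
        if (!StringUtils.isEmpty(lastListedValue)) {
            final long lastListed = Long.parseLong(lastListedValue);
            // Only re-list (and re-count) this table once its own entry has expired
            listTable = refreshInterval > 0 && currentTime >= lastListed + refreshInterval;
        }
        if (listTable) {
            // ... create the flow file and optionally fetch the count ...
            stateMapProperties.put(fqn, Long.toString(currentTime));
        }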


[GitHub] nifi issue #642: NIFI-2156: Add ListDatabaseTables processor

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on the issue:

    https://github.com/apache/nifi/pull/642
  
    Alright, after giving it some time and a cup of coffee I realize how off I was at first, lol. This processor reaches out to the DB asking for the tables. Then, for each table that isn't already stored in state, it creates a flowfile. If the processor is configured to give the count, it needs to send a SQL query asking for it. If that query fails, it will remove the flowfile it created and continue on to the next table. If successful, the FQN of the table will then be added to state (after queuing the flowfile to transfer).
    
    That realization makes my comment about data loss void (I was afraid the table would get stored in state after unsuccessfully getting the count).
    
    One new comment: would a user want to set an expiration for tables in state? That way they could get updates on the count of a table every X seconds/minutes. In its current form it will get the table once but never again. You're already storing the timestamp as the value, so it should be an easy addition.


[GitHub] nifi pull request #642: NIFI-2156: Add ListDatabaseTables processor

Posted by mattyb149 <gi...@git.apache.org>.
Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/642#discussion_r71091826
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java ---
    [ ... quoted file header trimmed; identical to the diff quoted earlier in this thread ... ]
    +@Stateful(scopes = {Scope.LOCAL}, description = "After performing a listing of tables, the timestamp of the query is stored. "
    --- End diff --
    
    Wasn't sure about that, but it makes sense to me :) Will change to cluster.
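    For the record, the change being agreed to here is just the scope on the annotation, roughly as below (the description string is abbreviated in this sketch):

        @Stateful(scopes = {Scope.CLUSTER}, description = "After performing a listing of tables, the timestamp of the query is stored "
                + "so that tables are not re-listed on subsequent runs, even if the primary node changes.")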


[GitHub] nifi pull request #642: NIFI-2156: Add ListDatabaseTables processor

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/642#discussion_r71347101
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java ---
    @@ -260,8 +241,27 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
                             .filter(segment -> !StringUtils.isEmpty(segment))
                             .collect(Collectors.joining("."));
     
    -                String fqTableName = stateMapProperties.get(fqn);
    -                if (fqTableName == null) {
    +                String lastTimestampForTable = stateMapProperties.get(fqn);
    +                boolean refreshTable = true;
    +                try {
    +                    // Refresh state if the interval has elapsed
    +                    long lastRefreshed = -1;
    +                    final long currentTime = System.currentTimeMillis();
    +                    if (!StringUtils.isEmpty(lastTimestampForTable)) {
    +                        lastRefreshed = Long.parseLong(lastTimestampForTable);
    +                    }
    +                    if (lastRefreshed == -1 || (refreshInterval > 0 && currentTime >= (lastRefreshed + refreshInterval))) {
    +                        stateMapProperties.remove(lastTimestampForTable);
    +                    } else {
    +                        refreshTable = false;
    +                    }
    +                } catch (final NumberFormatException nfe) {
    +                    getLogger().error("Failed to retrieve observed last table fetches from the State Manager. Will not perform "
    --- End diff --
    
    This exception really *shouldn't* happen, since we are setting the long ourselves, but if it does, the processor will fail entirely until the user clears its state. In addition to returning and yielding, it should probably clear the offending state entry (and log, in the error message, that this is happening and what the ramifications are). That will at least give the processor a chance to continue working if it ever reaches this state.
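    Something along these lines would do it; this is only a sketch, and it moves on to the next table instead of yielding and returning, so the cleaned-up map gets written back by the normal state update at the end of onTrigger:

        } catch (final NumberFormatException nfe) {
            getLogger().error("State contained an unparseable last-fetched timestamp for {}; removing that entry so the "
                    + "table can be listed again on a later run", new Object[]{fqn}, nfe);
            stateMapProperties.remove(fqn);
            continue; // skip just this table rather than failing the whole listing
        }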


[GitHub] nifi pull request #642: NIFI-2156: Add ListDatabaseTables processor

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/642


[GitHub] nifi issue #642: NIFI-2156: Add ListDatabaseTables processor

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on the issue:

    https://github.com/apache/nifi/pull/642
  
    +1
    
    Visually verified the code, and all comments were addressed. Ran a contrib check build and verified functionality in a standalone cluster hitting a MySQL DB. Thanks @mattyb149, I will squash and merge.


[GitHub] nifi pull request #642: NIFI-2156: Add ListDatabaseTables processor

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/642#discussion_r71372009
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java ---
    @@ -303,8 +303,8 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
                         stateMapProperties.put(fqn, Long.toString(System.currentTimeMillis()));
                     }
                 }
    -            // Update the last time the processor finished successfully
    -            stateManager.replace(stateMap, stateMapProperties, Scope.CLUSTER);
    +            // Update the timestamps for listed tables
    +            stateManager.setState(stateMapProperties, Scope.CLUSTER);
    --- End diff --
    
    I would not change this to setState. It will overwrite anything that is in state; if someone ends up running this clustered and not on the primary node only, it will blow that state away without warning.
    
    Instead, just check whether the prior map version was -1 and do a set in that case instead of a replace.
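    Roughly (a sketch only, assuming the stateMap fetched earlier in onTrigger):

        if (stateMap.getVersion() == -1) {
            // No state has been stored yet for this component, so do an initial set
            stateManager.setState(stateMapProperties, Scope.CLUSTER);
        } else {
            // Otherwise do an atomic replace so we don't clobber state written by anything else
            stateManager.replace(stateMap, stateMapProperties, Scope.CLUSTER);
        }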


[GitHub] nifi pull request #642: NIFI-2156: Add ListDatabaseTables processor

Posted by JPercivall <gi...@git.apache.org>.
Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/642#discussion_r71091290
  
    --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java ---
    [ ... quoted file header trimmed; identical to the diff quoted earlier in this thread ... ]
    +@Stateful(scopes = {Scope.LOCAL}, description = "After performing a listing of tables, the timestamp of the query is stored. "
    --- End diff --
    
    Shouldn't this be "cluster"? That way, when the primary node changes, it will keep the same listing of tables.

