Posted to users@nifi.apache.org by Ramaninder Singh Jhajj <jh...@gmail.com> on 2018/03/14 00:30:36 UTC

Issues with QueryDatabaseTable processor's State

Hello Everyone,

I am facing an issue with the QueryDatabaseTable processor.

I have 4 identical MySQL database tables on 4 different AWS instances. The
structure is the same but the data is different. What I am trying to do is
use 4 QueryDatabaseTable processors to fetch the data from all 4 instances,
process it further, and store it in Elasticsearch.


This is the structure of the flow. Now my issue is:
*When any one of the processors runs, it stores the "Maximum-value Columns"
value in the state as shown below, and this state is global in the cluster.*

[Image not preserved in this plain-text archive.]

*When a second QueryDatabaseTable processor runs, it overwrites the state
value written by the first. I am having trouble maintaining state for all 4
processors. The processors run without errors, but the data being fetched
is not consistent because the "id" column value gets overwritten in the
state.*

Is there any solution to this problem? I need to do an incremental fetch
from all 4 identical tables on 4 instances in a single flow and a single
cluster.

Please let me know if anyone has faced a similar problem and if there is a
solution for it.

Kind Regards,
Raman

Re: Issues with QueryDatabaseTable processor's State

Posted by Matt Burgess <ma...@apache.org>.
Marcio,

These were choices based on user experience (UX); there's always a tradeoff
between ease of use and flexibility. We could offer a property for a state
prefix (one that supports Expression Language), so please feel free to write
a Jira for that improvement. The original idea was just to specify a column
whose value would always increase, so that future executions of the SQL
statement would only grab new rows.
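
A minimal sketch of the incremental-fetch idea combined with a state prefix,
using Python's built-in sqlite3 module as a stand-in for the source
databases. The function name, the key layout, and the prefix parameter are
hypothetical; this is not NiFi's implementation, and no such property is
confirmed to exist in this thread.

```python
import sqlite3


def fetch_new_rows(conn, table, max_col, state, prefix=""):
    """Fetch rows whose max_col is above the stored maximum and update the state."""
    key = f"{prefix}{table}.{max_col}"  # hypothetical key layout, not NiFi's
    last_seen = state.get(key)
    sql = f"SELECT {max_col}, payload FROM {table}"
    params = ()
    if last_seen is not None:
        sql += f" WHERE {max_col} > ?"
        params = (last_seen,)
    rows = conn.execute(sql + f" ORDER BY {max_col}", params).fetchall()
    if rows:
        state[key] = rows[-1][0]  # rows are ordered, so the last one holds the maximum
    return rows


def make_db(ids):
    """Create an in-memory database that plays the role of one remote instance."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, payload TEXT)")
    conn.executemany("INSERT INTO events VALUES (?, ?)", [(i, f"row-{i}") for i in ids])
    return conn


state = {}  # one shared state map, as in the situation described in this thread
db_a = make_db([1, 2, 3])
db_b = make_db([1, 2, 3, 4, 5, 6, 7])

first_a = fetch_new_rows(db_a, "events", "id", state, prefix="host-a.")
first_b = fetch_new_rows(db_b, "events", "id", state, prefix="host-b.")
db_a.execute("INSERT INTO events VALUES (4, 'row-4')")
second_a = fetch_new_rows(db_a, "events", "id", state, prefix="host-a.")
```

With distinct prefixes the two sources keep separate maximums, so the second
fetch from db_a still sees the new row. With an empty prefix both sources
would share the key "events.id", and the larger maximum from db_b would hide
that row.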

Re: arbitrary queries, that has been proposed and a solution submitted [1].
In general, a popular idea seems to be that you can provide whatever
query you want, along with whatever columns you want to keep track of, and
somehow we'd always be able to generate the appropriate SQL, even with all
the discrepancies between drivers, dialects, etc. It seems easy to
implement (from a user's perspective), but once you have to touch the SQL
you need to parse it, and then you need to consider the dialects, etc.,
which is a common problem for many software projects. Otherwise the user
has to write the appropriate SQL and somehow specify which state is
persisted for the next run. I'm not saying it can't be done, but as you say
it is meant for more advanced users.
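
As a rough illustration of the second option (the user writes the SQL and
names the state to persist), here is a sketch using Python's sqlite3 module.
The function run_tracked_query, the :last_max placeholder name, and the
state key are all assumptions made for this example; they do not describe
the pull request in [1] or any existing NiFi processor.

```python
import sqlite3


def run_tracked_query(conn, sql, tracked_column, state, state_key, initial=0):
    """Run user-written SQL, binding :last_max, and persist the new maximum."""
    last_max = state.get(state_key, initial)
    cursor = conn.execute(sql, {"last_max": last_max})
    column_names = [d[0] for d in cursor.description]
    rows = cursor.fetchall()
    if rows:
        index = column_names.index(tracked_column)
        # The tool never parses the SQL; it only inspects the result set.
        state[state_key] = max(row[index] for row in rows)
    return rows


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, status TEXT)")
# Rows are inserted out of order on purpose.
conn.executemany("INSERT INTO orders VALUES (?, ?)", [(3, "new"), (1, "new"), (2, "done")])

# The user owns the whole statement, including ORDER BY and dialect details.
user_sql = "SELECT id, status FROM orders WHERE id > :last_max ORDER BY id"
state = {}
first = run_tracked_query(conn, user_sql, "id", state, "orders-feed")
second = run_tracked_query(conn, user_sql, "id", state, "orders-feed")
```

The design choice here is that the tool only binds a value and reads a
column from the results, which avoids SQL parsing but leaves correctness of
the statement entirely to the user.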

Basically, I think you're describing the convergence of ExecuteSQL with
QueryDatabaseTable, and IMO it might be better to have a third
processor that can do both, for advanced users who don't mind fiddling
with configuration when things go wrong. The existing processors are for
the general user, offering some incremental-fetch / layman's-CDC
capabilities that cover a large number of basic-to-intermediate use cases.

Regards,
Matt

[1] https://github.com/apache/nifi/pull/2162

On Tue, Mar 13, 2018 at 10:33 PM, Márcio Faria <fa...@ymail.com>
wrote:

> Matt,
>
> Just curious: Why is it necessary to identify that global state using
> catalog, schema, table or column names? Those are database concepts. As a
> user, I'd prefer to have some flexibility here. Surely the component needs
> to know how to identify the database object, but beyond that, we should be
> free to choose how to identify that "global" state ourselves. Would that be
> feasible, maybe as an optional attribute that by default would be set to
> the database.table.column string?
>
> I have another example, this time regarding the "where" attribute of
> QueryDatabaseTable. Simply put, it's not enough. I have data that for some
> reason was originally inserted out of order into a source table (or the
> database just decided to show it like that), and without an "order by"
> clause I can't avoid having gaps in the data I'm trying to extract from it
> now. It's not a total blocker, since I could use a little SQL injection to
> work around the limitation, but it should not be like that. What if one
> needs a more elaborate "select", for example? It would be far preferable to
> trust the user and let them supply the whole SQL command, with placeholders
> of course, so NiFi could properly set the values under its control. An
> optional attribute that would take precedence over the "where" when
> provided would be enough. I'd expect that kind of thing to be not so hard
> to implement, and it would make the processor much more powerful for more
> advanced users without complicating the lives of those who are happy with a
> simpler configuration.
>
> In general, when dealing with SQL and databases, the fewer assumptions we
> make about how the component is going to be used, the better. SQL is very
> powerful, and its power should be fully available to this kind of
> processor, IMHO.
>
> What do you think?
>
> Thank you,
>
> Marcio
>
>
> On Tuesday, March 13, 2018 8:56 PM, Matt Burgess <ma...@apache.org>
> wrote:
>
>
> Raman,
>
> Originally, we stored state based only on column names, which could
> obviously cause problems when you have two different tables with the same
> column name. However, this was because the original DB fetch processors
> (e.g., QueryDatabaseTable and GenerateTableFetch) did not accept incoming
> flow files and thus acted on a single table. Then we added the ability for
> GenerateTableFetch to accept incoming flow files, and realized we should be
> storing state based on at least table name + column name, since GTF might
> get multiple tables in. I believe you are running into the issue where
> we'd need to qualify the state based on database name + table name + column
> name, so please feel free to write up a Jira for that improvement. The
> cutoff at table name was a tradeoff against complexity, as a database might
> not be fully qualified by its name either (imagine multiple hosts with
> replicated DBs inside; how would we know that two hosts don't point at
> the same place?).
>
> For your use case, I think we'd need to store state by the aforementioned
> "better-qualified" key, but that might be the limit of our name
> qualification. We will have to deal with backwards compatibility as we did
> before when we added table names, but since we have precedent I wouldn't
> think it would be too difficult to implement.
>
> As a workaround, you might try swapping QueryDatabaseTable for
> GenerateTableFetch, and distributing the flow files for a
> particular DB to a particular instance of ExecuteSQL to actually fetch the
> rows. You should be able to use RouteOnAttribute for this, assuming your
> table name is an attribute on the flow file.
>
> Regards,
> Matt

Re: Issues with QueryDatabaseTable processor's State

Posted by Márcio Faria <fa...@ymail.com>.
Matt,

Just curious: Why is it necessary to identify that global state using
catalog, schema, table or column names? Those are database concepts. As a
user, I'd prefer to have some flexibility here. Surely the component needs
to know how to identify the database object, but beyond that, we should be
free to choose how to identify that "global" state ourselves. Would that be
feasible, maybe as an optional attribute that by default would be set to
the database.table.column string?

I have another example, this time regarding the "where" attribute of
QueryDatabaseTable. Simply put, it's not enough. I have data that for some
reason was originally inserted out of order into a source table (or the
database just decided to show it like that), and without an "order by"
clause I can't avoid having gaps in the data I'm trying to extract from it
now. It's not a total blocker, since I could use a little SQL injection to
work around the limitation, but it should not be like that. What if one
needs a more elaborate "select", for example? It would be far preferable to
trust the user and let them supply the whole SQL command, with placeholders
of course, so NiFi could properly set the values under its control. An
optional attribute that would take precedence over the "where" when
provided would be enough. I'd expect that kind of thing to be not so hard
to implement, and it would make the processor much more powerful for more
advanced users without complicating the lives of those who are happy with a
simpler configuration.

In general, when dealing with SQL and databases, the fewer assumptions we
make about how the component is going to be used, the better. SQL is very
powerful, and its power should be fully available to this kind of
processor, IMHO.

What do you think?

Thank you,

Marcio

Re: Issues with QueryDatabaseTable processor's State

Posted by Matt Burgess <ma...@apache.org>.
Raman,

Originally, we stored state based only on column names, which could
obviously cause problems when you have two different tables with the same
column name. However, this was because the original DB fetch processors
(e.g., QueryDatabaseTable and GenerateTableFetch) did not accept incoming
flow files and thus acted on a single table. Then we added the ability for
GenerateTableFetch to accept incoming flow files, and realized we should be
storing state based on at least table name + column name, since GTF might
get multiple tables in. I believe you are running into the issue where
we'd need to qualify the state based on database name + table name + column
name, so please feel free to write up a Jira for that improvement. The
cutoff at table name was a tradeoff against complexity, as a database might
not be fully qualified by its name either (imagine multiple hosts with
replicated DBs inside; how would we know that two hosts don't point at
the same place?).

For your use case, I think we'd need to store state by the aforementioned
"better-qualified" key, but that might be the limit of our name
qualification. We will have to deal with backwards compatibility as we did
before when we added table names, but since we have precedent I wouldn't
think it would be too difficult to implement.
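
To make the qualification levels concrete, here is a toy model of the three
keying schemes mentioned above. The function names, the scheme labels, and
the dot-joined key format are assumptions chosen for illustration; NiFi's
actual state keys may look different.

```python
def state_key(scheme, database, table, column):
    """Build a state key under one of the qualification schemes discussed above."""
    parts = {
        "column": (column,),
        "table+column": (table, column),
        "database+table+column": (database, table, column),
    }[scheme]
    return ".".join(parts)


def distinct_keys(scheme, sources):
    """Return the set of keys produced for a list of (database, table, column)."""
    return {state_key(scheme, *source) for source in sources}


# Two instances whose databases have different names but identical tables.
different_names = [("shop_eu", "orders", "id"), ("shop_us", "orders", "id")]

# Two hosts holding databases with the same name: the caveat about a database
# not being fully qualified by its name.
same_names = [("shop", "orders", "id"), ("shop", "orders", "id")]

collides_by_table = distinct_keys("table+column", different_names)
separate_by_db = distinct_keys("database+table+column", different_names)
still_collides = distinct_keys("database+table+column", same_names)
```

In this model, table + column keys collapse to a single entry for identical
tables, adding the database name separates them, and identically named
databases on different hosts still share one key.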

As a workaround, you might try swapping QueryDatabaseTable for
GenerateTableFetch, and distributing the flow files for a
particular DB to a particular instance of ExecuteSQL to actually fetch the
rows. You should be able to use RouteOnAttribute for this, assuming your
table name is an attribute on the flow file.
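
The routing step of this workaround can be pictured with a small simulation.
This is plain Python, not NiFi configuration: the attribute name source.db,
the instance names, and the relationship names are hypothetical, and the
function only mimics the idea of sending each flow file to the branch whose
rule matches its attributes.

```python
def route(attributes, rules):
    """Return the names of all rules whose predicate matches, or ['unmatched']."""
    matched = [name for name, predicate in rules.items() if predicate(attributes)]
    return matched or ["unmatched"]


# One rule per downstream ExecuteSQL instance; "source.db" is an assumed
# attribute that identifies which database a flow file belongs to.
rules = {
    "to_executesql_1": lambda attrs: attrs.get("source.db") == "instance-1",
    "to_executesql_2": lambda attrs: attrs.get("source.db") == "instance-2",
}

flow_files = [
    {"source.db": "instance-1", "table": "orders"},
    {"source.db": "instance-2", "table": "orders"},
    {"table": "orders"},  # missing attribute, so no rule matches
]

destinations = [route(attrs, rules) for attrs in flow_files]
```

Because the tables here are identical, the table name alone would not tell
the branches apart, which is why this sketch routes on a separate attribute.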

Regards,
Matt

