You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@nifi.apache.org by Ryan H <ry...@gmail.com> on 2018/10/04 17:21:55 UTC

Dynamic Mapping

Hi All,

I have been working on an integration between two systems with NiFi in the
middle as the integration point. Basically when an event happens on
System-A, such as a record update, those changes need to be propagated to
System-B. For the first use case, I have set up a data flow that listens
for incoming requests from System-A, performs a mapping, the sends the
mapped data to System-B.

Generalized Flow for "Create_Event" (dumbed down significantly):
System-A "Create_Event" -> HandleHTTPRequest -> JoltTransformJSON ->
InvokeHTTP -> System-B "Create_Event"

This works great for the first case with a predefined mapping in
JoltTransformJSON. Now I want to generalize it so that the same data flow
can be used for all Create_Event's on System-A.

Here is where the issue comes in. There is a base schema for System-A that
has a base mapping to the base schema in System-B. Users of the System have
the ability to "extend" the base schema to add/remove fields and modify the
base mapping. So each time the Create_Event happens, the mapping that is
used should be the unique mapping spec associated to that user (call it a
GUID that comes along with the request).

The data flow is the exact same for all Create_Events, except for the
mapping, which will be unique to the user.

Does anyone know of a way to load up a different mapping to be used on a
per-request basis? I used JoltTransformJSON just as a proof of concept, so
it does not need to be a Jolt spec and can be modified to meet the needs of
whatever would work for this.

I started to look into schema registry, but kind of got lost a bit and
wasn't sure if it could be applied to this situation.

Any Help is, as always, greatly appreciated.


Cheers,

Ryan H.

RE: Dynamic Mapping

Posted by "Carlos Manuel Fernandes (DSI)" <ca...@cgd.pt>.
Hi Ryan,

If I understand well, you have a jolt map specification for each user. In that case you can put  the relation guid, map_spec  in some table of a external database.
After that in your flow  before JoltTransformJSON you create a custom processor to read the configuration from the table and transform the map_sec in an attribute  you can put in jolt specification property of JoltTransformJSON   processor,  using expression language.

I created a simple general processor for read from a relational database and transform the fields in attributes , I can  share with  you:


[cid:image001.png@01D45C17.2272F770]

In  pdaPool property you put the name of you DBCPConnectionPool.

The code of custom processor named executeSqlOnFlowV1.grovy:

import org.apache.nifi.processor.io.StreamCallback
import java.nio.charset.StandardCharsets
import org.apache.nifi.controller.ControllerService
import java.sql.DriverManager
import groovy.sql.Sql


def getConnFromPool(dbServiceName) {
       def lookup = context.controllerServiceLookup
       def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find { cs ->
             lookup.getControllerServiceName(cs) == dbServiceName
       }
       def conn = lookup.getControllerService(dbcpServiceId)?.getConnection()
       return conn
}


def writeFlowFile(flowFile, str) {
       flowFile = session.write(flowFile, {
             outputStream ->
             str.eachLine { line ->
                    outputStream.write(line.bytes)
                    outputStream.write('\n'.bytes)
             }
       } as OutputStreamCallback)

       return flowFile
}

//Main Program
def  pda,stmt,rows, cols=[]
def flowFile = session.get()
if (!flowFile) return

try {
       //create a DB Connection from Pool
       pda=new Sql( getConnFromPool(pdaPool.value))
       stmt=query.evaluateAttributeExpressions(flowFile).value.toString()

       log.info("executeQuery:${stmt}")

       //create new attributes which names are the columns names and the values are the values of the columns
       pda.firstRow(stmt, {meta-> (1..meta.columnCount).each {cols << meta.getColumnName(it)} })
       {
             cols.each {col->
               flowFile=session.putAttribute(flowFile,col,"${it.getAt(col)}")
             }
       }

       session.transfer(flowFile, REL_SUCCESS)

}
catch(Exception e) {
       log.error('executeQuery: error:', e)
       flowFile=writeFlowFile(flowFile,"Error executing query\n${e}\n${e.getStackTrace()}")
       session.transfer(flowFile, REL_FAILURE)
}
finally {
       pda?.close()
}


I hope this help.

Carlos


From: Ryan H [mailto:ryan.howell.development@gmail.com]
Sent: quinta-feira, 4 de outubro de 2018 18:22
To: users@nifi.apache.org
Subject: Dynamic Mapping

Hi All,

I have been working on an integration between two systems with NiFi in the middle as the integration point. Basically when an event happens on System-A, such as a record update, those changes need to be propagated to System-B. For the first use case, I have set up a data flow that listens for incoming requests from System-A, performs a mapping, the sends the mapped data to System-B.

Generalized Flow for "Create_Event" (dumbed down significantly):
System-A "Create_Event" -> HandleHTTPRequest -> JoltTransformJSON -> InvokeHTTP -> System-B "Create_Event"

This works great for the first case with a predefined mapping in JoltTransformJSON. Now I want to generalize it so that the same data flow can be used for all Create_Event's on System-A.

Here is where the issue comes in. There is a base schema for System-A that has a base mapping to the base schema in System-B. Users of the System have the ability to "extend" the base schema to add/remove fields and modify the base mapping. So each time the Create_Event happens, the mapping that is used should be the unique mapping spec associated to that user (call it a GUID that comes along with the request).

The data flow is the exact same for all Create_Events, except for the mapping, which will be unique to the user.

Does anyone know of a way to load up a different mapping to be used on a per-request basis? I used JoltTransformJSON just as a proof of concept, so it does not need to be a Jolt spec and can be modified to meet the needs of whatever would work for this.

I started to look into schema registry, but kind of got lost a bit and wasn't sure if it could be applied to this situation.

Any Help is, as always, greatly appreciated.


Cheers,

Ryan H.

Re: Dynamic Mapping

Posted by Matt Burgess <ma...@apache.org>.
Another thing I'd mention about JOLT is that it works by matching
"nodes" as it walks the "tree". That means in certain cases you can
include fields in the spec that may not match a node in one input, but
would match in another. So in the fullName case, you might be able to
write one spec that works for both examples, by matching the
"middleName" node along with the firstName and lastName nodes, and
building the fullName node using a (possible) match on middleName.
When middleName doesn't exist, it shouldn't contribute anything to the
fullName field.

Not to say that this will work for your case, but it is an interesting
feature of JOLT where you don't need exact schemas, because you're
writing the rules for each match. NiFi's Record API has some similar
interesting features, where you can often have an "overall" schema
that includes all possible fields, and if they don't exist in the
input/output, they will be ignored. Of course, both approaches have
their own caveats, but wanted to bring these up in case it helps you
for your flow.

Regards,
Matt

P.S.  NiFi 1.8.0 will have JoltTransformRecord, so you'll be able to
use JOLT on any input we have a Reader for, not just JSON.
On Thu, Oct 4, 2018 at 4:11 PM Bryan Bende <bb...@gmail.com> wrote:
>
> How are you currently defining schemas and transforms? or are you free
> to choose depending what works best for building the data flow?
>
> If you are willing to use JOLT to define the transform then you have a
> few options...
>
> 1) As Joe mentioned, you can send the GUID in a header which becomes a
> flow file attribute and use RouteOnAttribute to route to the
> appropriate JoltTransformJSON processor. This requires the processor
> is setup ahead of time for each GUID.
>
> 2) As Carlos mentioned, you can put all the JOLT specs somewhere like
> a relational database, or maybe the distributed map cache, and then
> build your flow to take the GUID from the flow file attributes and
> retrieve the JOLT spec into a flow file attribute, and then have a
> single JoltTransformJson that sets the Jolt Spec to a dynamic value
> from the flow file attribute.
>
> 3) If you don't want to store the JOLT specs somewhere, then you could
> actually put the spec in a header as part of the event creation that
> is sent over, the header then becomes a flow file attribute and would
> be referenced the same way as in #2. This may not be that great if the
> JOLT specs are very large.
>
> The record processors would work well if you had simple variations of
> the same schema, like System A has "first name, last name" and System
> B only wants "first name", but once you have more complex logic like
> your example where two fields have to be read as one field, etc, then
> it becomes hard to make the flow dynamic. You could apply that logic
> with UpdateRecord most likely, but you would need a pre-defined
> UpdateRecord for each transform, and it would likely not be as
> powerful as a JOLT transform.
>
>
> On Thu, Oct 4, 2018 at 3:14 PM Ryan H <ry...@gmail.com> wrote:
> >
> > Joe/Carlos,
> >
> > Thanks for the replies. Here is a little more information that hopefully clarifies the problem statement. We have System-A and System-B. Each time we bring on a new User, they get a new Organization (this will be the unique id) in both System-A and System-B. The new User will go in and set up their objects for their Organization (the schemas unique to the user). Say the first schema they set up is for a Customer object. They start with a base schema for a Customer that is predefined.
> >
> > The Base Schema and Mappings
> > System-A Base Schema
> > First_Name
> > Last_Name
> >
> > System-B Schema
> > Full_Name
> >
> > Base Mapping
> > First_Name + Last_Name -> Full_Name
> >
> > ----
> >
> > First User's Schema and mapping for their Organization:
> > System-A Organization 1
> > First_Name
> > Last_Name
> >
> > System-B Schema
> > Full_Name
> >
> > Organization 1 Mapping
> > First_Name + Last_Name -> Full_Name
> >
> > First user makes no changes to the schema or mapping.
> >
> > ----
> >
> > Second user comes onboard. They don't like the base schema, so they make theirs custom for their Organization.
> >
> > They do:
> > System-A Organization 2
> > First_Name
> > Middle_Name
> > Last_Name
> >
> > System-B Schema
> > Full_Name
> >
> > Organization 1 Mapping
> > First_Name + Middle_Name + Last_Name -> Full_Name
> >
> > I should mention that System-A is the only modifiable schema; System-B is fixed. The only thing that can change is the System-A schema and its mapping to System-B and will be unique to each Organization.
> >
> > Hope this clarifies a bit more.
> >
> > Cheers,
> >
> > Ryan H
> >
> >
> > On Thu, Oct 4, 2018 at 2:39 PM Joe Witt <jo...@gmail.com> wrote:
> >>
> >> Ryan
> >>
> >> I am not entirely sure I fully understand the scenario so read the
> >> following with a bit of caution.
> >>
> >> But the gist I think i'm reading is that system A generates data
> >> against a given schema referenceable with some guid say 'GUID1'.  For
> >> every GUID1 from System A there is a similar/but possibly different
> >> schema in System B referenceable again by some guid say GUID1_B.
> >>
> >> Are those base schemas you referenced say GUID1 and GUID1_B compatible
> >> in that all fields of import in GUID1 schema will be in GUID1_B
> >> schema?  As-in can you treat GUID1_B schemas as a superset of GUID1
> >> schemas?
> >>
> >> In any event, it is very possible that you can achieve this using the
> >> Record processors and Record oriented controller services.
> >>
> >> You can 'read' data from system A in NiFi using record readers that
> >> reference schemas from systemA and 'write' data using system B schemas
> >> in NiFi.
> >>
> >> The record oriented processors combined with their pluggable record
> >> readers and writers allows this to happen. The schema for the reader
> >> and writer can be different and the processors will map things over by
> >> field names/types.  That leaves the problem of 'how to access' the
> >> schema information so NiFi knows what to do during read/write phases.
> >> You can either ensure all these schemas are in the nifi schema
> >> registry and can be looked up by some well established name and naming
> >> scheme.  You could also do it via a SQL lookup if you have the schemas
> >> in some database (similar to what Carlos might be suggesting but
> >> without Jolt).  Or some other way..
> >>
> >> Now, if your mapping logic from System A schema to System B schema is
> >> more complex then you'll want something like JOLT most likely to help
> >> manage those transforms.
> >>
> >> You can also do some routing on the requests from System A to
> >> specialized JOLT mappers for each case of converting to System B
> >> schemas.  That will end up in a lot of config but it is likely you can
> >> paramaterize this well using versioned flows and expression langauge
> >> statements in the variable registry.
> >>
> >> Thanks
> >> Joe
> >> On Thu, Oct 4, 2018 at 1:35 PM Ryan H <ry...@gmail.com> wrote:
> >> >
> >> > Hi All,
> >> >
> >> > I have been working on an integration between two systems with NiFi in the middle as the integration point. Basically when an event happens on System-A, such as a record update, those changes need to be propagated to System-B. For the first use case, I have set up a data flow that listens for incoming requests from System-A, performs a mapping, the sends the mapped data to System-B.
> >> >
> >> > Generalized Flow for "Create_Event" (dumbed down significantly):
> >> > System-A "Create_Event" -> HandleHTTPRequest -> JoltTransformJSON -> InvokeHTTP -> System-B "Create_Event"
> >> >
> >> > This works great for the first case with a predefined mapping in JoltTransformJSON. Now I want to generalize it so that the same data flow can be used for all Create_Event's on System-A.
> >> >
> >> > Here is where the issue comes in. There is a base schema for System-A that has a base mapping to the base schema in System-B. Users of the System have the ability to "extend" the base schema to add/remove fields and modify the base mapping. So each time the Create_Event happens, the mapping that is used should be the unique mapping spec associated to that user (call it a GUID that comes along with the request).
> >> >
> >> > The data flow is the exact same for all Create_Events, except for the mapping, which will be unique to the user.
> >> >
> >> > Does anyone know of a way to load up a different mapping to be used on a per-request basis? I used JoltTransformJSON just as a proof of concept, so it does not need to be a Jolt spec and can be modified to meet the needs of whatever would work for this.
> >> >
> >> > I started to look into schema registry, but kind of got lost a bit and wasn't sure if it could be applied to this situation.
> >> >
> >> > Any Help is, as always, greatly appreciated.
> >> >
> >> >
> >> > Cheers,
> >> >
> >> > Ryan H.

Re: Dynamic Mapping

Posted by Bryan Bende <bb...@gmail.com>.
How are you currently defining schemas and transforms? or are you free
to choose depending what works best for building the data flow?

If you are willing to use JOLT to define the transform then you have a
few options...

1) As Joe mentioned, you can send the GUID in a header which becomes a
flow file attribute and use RouteOnAttribute to route to the
appropriate JoltTransformJSON processor. This requires the processor
is setup ahead of time for each GUID.

2) As Carlos mentioned, you can put all the JOLT specs somewhere like
a relational database, or maybe the distributed map cache, and then
build your flow to take the GUID from the flow file attributes and
retrieve the JOLT spec into a flow file attribute, and then have a
single JoltTransformJson that sets the Jolt Spec to a dynamic value
from the flow file attribute.

3) If you don't want to store the JOLT specs somewhere, then you could
actually put the spec in a header as part of the event creation that
is sent over, the header then becomes a flow file attribute and would
be referenced the same way as in #2. This may not be that great if the
JOLT specs are very large.

The record processors would work well if you had simple variations of
the same schema, like System A has "first name, last name" and System
B only wants "first name", but once you have more complex logic like
your example where two fields have to be read as one field, etc, then
it becomes hard to make the flow dynamic. You could apply that logic
with UpdateRecord most likely, but you would need a pre-defined
UpdateRecord for each transform, and it would likely not be as
powerful as a JOLT transform.


On Thu, Oct 4, 2018 at 3:14 PM Ryan H <ry...@gmail.com> wrote:
>
> Joe/Carlos,
>
> Thanks for the replies. Here is a little more information that hopefully clarifies the problem statement. We have System-A and System-B. Each time we bring on a new User, they get a new Organization (this will be the unique id) in both System-A and System-B. The new User will go in and set up their objects for their Organization (the schemas unique to the user). Say the first schema they set up is for a Customer object. They start with a base schema for a Customer that is predefined.
>
> The Base Schema and Mappings
> System-A Base Schema
> First_Name
> Last_Name
>
> System-B Schema
> Full_Name
>
> Base Mapping
> First_Name + Last_Name -> Full_Name
>
> ----
>
> First User's Schema and mapping for their Organization:
> System-A Organization 1
> First_Name
> Last_Name
>
> System-B Schema
> Full_Name
>
> Organization 1 Mapping
> First_Name + Last_Name -> Full_Name
>
> First user makes no changes to the schema or mapping.
>
> ----
>
> Second user comes onboard. They don't like the base schema, so they make theirs custom for their Organization.
>
> They do:
> System-A Organization 2
> First_Name
> Middle_Name
> Last_Name
>
> System-B Schema
> Full_Name
>
> Organization 1 Mapping
> First_Name + Middle_Name + Last_Name -> Full_Name
>
> I should mention that System-A is the only modifiable schema; System-B is fixed. The only thing that can change is the System-A schema and its mapping to System-B and will be unique to each Organization.
>
> Hope this clarifies a bit more.
>
> Cheers,
>
> Ryan H
>
>
> On Thu, Oct 4, 2018 at 2:39 PM Joe Witt <jo...@gmail.com> wrote:
>>
>> Ryan
>>
>> I am not entirely sure I fully understand the scenario so read the
>> following with a bit of caution.
>>
>> But the gist I think i'm reading is that system A generates data
>> against a given schema referenceable with some guid say 'GUID1'.  For
>> every GUID1 from System A there is a similar/but possibly different
>> schema in System B referenceable again by some guid say GUID1_B.
>>
>> Are those base schemas you referenced say GUID1 and GUID1_B compatible
>> in that all fields of import in GUID1 schema will be in GUID1_B
>> schema?  As-in can you treat GUID1_B schemas as a superset of GUID1
>> schemas?
>>
>> In any event, it is very possible that you can achieve this using the
>> Record processors and Record oriented controller services.
>>
>> You can 'read' data from system A in NiFi using record readers that
>> reference schemas from systemA and 'write' data using system B schemas
>> in NiFi.
>>
>> The record oriented processors combined with their pluggable record
>> readers and writers allows this to happen. The schema for the reader
>> and writer can be different and the processors will map things over by
>> field names/types.  That leaves the problem of 'how to access' the
>> schema information so NiFi knows what to do during read/write phases.
>> You can either ensure all these schemas are in the nifi schema
>> registry and can be looked up by some well established name and naming
>> scheme.  You could also do it via a SQL lookup if you have the schemas
>> in some database (similar to what Carlos might be suggesting but
>> without Jolt).  Or some other way..
>>
>> Now, if your mapping logic from System A schema to System B schema is
>> more complex then you'll want something like JOLT most likely to help
>> manage those transforms.
>>
>> You can also do some routing on the requests from System A to
>> specialized JOLT mappers for each case of converting to System B
>> schemas.  That will end up in a lot of config but it is likely you can
>> paramaterize this well using versioned flows and expression langauge
>> statements in the variable registry.
>>
>> Thanks
>> Joe
>> On Thu, Oct 4, 2018 at 1:35 PM Ryan H <ry...@gmail.com> wrote:
>> >
>> > Hi All,
>> >
>> > I have been working on an integration between two systems with NiFi in the middle as the integration point. Basically when an event happens on System-A, such as a record update, those changes need to be propagated to System-B. For the first use case, I have set up a data flow that listens for incoming requests from System-A, performs a mapping, the sends the mapped data to System-B.
>> >
>> > Generalized Flow for "Create_Event" (dumbed down significantly):
>> > System-A "Create_Event" -> HandleHTTPRequest -> JoltTransformJSON -> InvokeHTTP -> System-B "Create_Event"
>> >
>> > This works great for the first case with a predefined mapping in JoltTransformJSON. Now I want to generalize it so that the same data flow can be used for all Create_Event's on System-A.
>> >
>> > Here is where the issue comes in. There is a base schema for System-A that has a base mapping to the base schema in System-B. Users of the System have the ability to "extend" the base schema to add/remove fields and modify the base mapping. So each time the Create_Event happens, the mapping that is used should be the unique mapping spec associated to that user (call it a GUID that comes along with the request).
>> >
>> > The data flow is the exact same for all Create_Events, except for the mapping, which will be unique to the user.
>> >
>> > Does anyone know of a way to load up a different mapping to be used on a per-request basis? I used JoltTransformJSON just as a proof of concept, so it does not need to be a Jolt spec and can be modified to meet the needs of whatever would work for this.
>> >
>> > I started to look into schema registry, but kind of got lost a bit and wasn't sure if it could be applied to this situation.
>> >
>> > Any Help is, as always, greatly appreciated.
>> >
>> >
>> > Cheers,
>> >
>> > Ryan H.

Re: Dynamic Mapping

Posted by Ryan H <ry...@gmail.com>.
Joe/Carlos,

Thanks for the replies. Here is a little more information that hopefully
clarifies the problem statement. We have System-A and System-B. Each time
we bring on a new User, they get a new Organization (this will be the
unique id) in both System-A and System-B. The new User will go in and set
up their objects for their Organization (the schemas unique to the user).
Say the first schema they set up is for a Customer object. They start with
a base schema for a Customer that is predefined.

The Base Schema and Mappings
System-A Base Schema
First_Name
Last_Name

System-B Schema
Full_Name

Base Mapping
First_Name + Last_Name -> Full_Name

----

First User's Schema and mapping for their Organization:
System-A Organization 1
First_Name
Last_Name

System-B Schema
Full_Name

Organization 1 Mapping
First_Name + Last_Name -> Full_Name

First user makes no changes to the schema or mapping.

----

Second user comes onboard. They don't like the base schema, so they make
theirs custom for their Organization.

They do:
System-A Organization 2
First_Name
Middle_Name
Last_Name

System-B Schema
Full_Name

Organization 1 Mapping
First_Name + Middle_Name + Last_Name -> Full_Name

I should mention that System-A is the only modifiable schema; System-B is
fixed. The only thing that can change is the System-A schema and its
mapping to System-B and will be unique to each Organization.

Hope this clarifies a bit more.

Cheers,

Ryan H


On Thu, Oct 4, 2018 at 2:39 PM Joe Witt <jo...@gmail.com> wrote:

> Ryan
>
> I am not entirely sure I fully understand the scenario so read the
> following with a bit of caution.
>
> But the gist I think i'm reading is that system A generates data
> against a given schema referenceable with some guid say 'GUID1'.  For
> every GUID1 from System A there is a similar/but possibly different
> schema in System B referenceable again by some guid say GUID1_B.
>
> Are those base schemas you referenced say GUID1 and GUID1_B compatible
> in that all fields of import in GUID1 schema will be in GUID1_B
> schema?  As-in can you treat GUID1_B schemas as a superset of GUID1
> schemas?
>
> In any event, it is very possible that you can achieve this using the
> Record processors and Record oriented controller services.
>
> You can 'read' data from system A in NiFi using record readers that
> reference schemas from systemA and 'write' data using system B schemas
> in NiFi.
>
> The record oriented processors combined with their pluggable record
> readers and writers allows this to happen. The schema for the reader
> and writer can be different and the processors will map things over by
> field names/types.  That leaves the problem of 'how to access' the
> schema information so NiFi knows what to do during read/write phases.
> You can either ensure all these schemas are in the nifi schema
> registry and can be looked up by some well established name and naming
> scheme.  You could also do it via a SQL lookup if you have the schemas
> in some database (similar to what Carlos might be suggesting but
> without Jolt).  Or some other way..
>
> Now, if your mapping logic from System A schema to System B schema is
> more complex then you'll want something like JOLT most likely to help
> manage those transforms.
>
> You can also do some routing on the requests from System A to
> specialized JOLT mappers for each case of converting to System B
> schemas.  That will end up in a lot of config but it is likely you can
> paramaterize this well using versioned flows and expression langauge
> statements in the variable registry.
>
> Thanks
> Joe
> On Thu, Oct 4, 2018 at 1:35 PM Ryan H <ry...@gmail.com>
> wrote:
> >
> > Hi All,
> >
> > I have been working on an integration between two systems with NiFi in
> the middle as the integration point. Basically when an event happens on
> System-A, such as a record update, those changes need to be propagated to
> System-B. For the first use case, I have set up a data flow that listens
> for incoming requests from System-A, performs a mapping, the sends the
> mapped data to System-B.
> >
> > Generalized Flow for "Create_Event" (dumbed down significantly):
> > System-A "Create_Event" -> HandleHTTPRequest -> JoltTransformJSON ->
> InvokeHTTP -> System-B "Create_Event"
> >
> > This works great for the first case with a predefined mapping in
> JoltTransformJSON. Now I want to generalize it so that the same data flow
> can be used for all Create_Event's on System-A.
> >
> > Here is where the issue comes in. There is a base schema for System-A
> that has a base mapping to the base schema in System-B. Users of the System
> have the ability to "extend" the base schema to add/remove fields and
> modify the base mapping. So each time the Create_Event happens, the mapping
> that is used should be the unique mapping spec associated to that user
> (call it a GUID that comes along with the request).
> >
> > The data flow is the exact same for all Create_Events, except for the
> mapping, which will be unique to the user.
> >
> > Does anyone know of a way to load up a different mapping to be used on a
> per-request basis? I used JoltTransformJSON just as a proof of concept, so
> it does not need to be a Jolt spec and can be modified to meet the needs of
> whatever would work for this.
> >
> > I started to look into schema registry, but kind of got lost a bit and
> wasn't sure if it could be applied to this situation.
> >
> > Any Help is, as always, greatly appreciated.
> >
> >
> > Cheers,
> >
> > Ryan H.
>

Re: Dynamic Mapping

Posted by Joe Witt <jo...@gmail.com>.
Ryan

I am not entirely sure I fully understand the scenario so read the
following with a bit of caution.

But the gist I think i'm reading is that system A generates data
against a given schema referenceable with some guid say 'GUID1'.  For
every GUID1 from System A there is a similar/but possibly different
schema in System B referenceable again by some guid say GUID1_B.

Are those base schemas you referenced say GUID1 and GUID1_B compatible
in that all fields of import in GUID1 schema will be in GUID1_B
schema?  As-in can you treat GUID1_B schemas as a superset of GUID1
schemas?

In any event, it is very possible that you can achieve this using the
Record processors and Record oriented controller services.

You can 'read' data from system A in NiFi using record readers that
reference schemas from systemA and 'write' data using system B schemas
in NiFi.

The record oriented processors combined with their pluggable record
readers and writers allows this to happen. The schema for the reader
and writer can be different and the processors will map things over by
field names/types.  That leaves the problem of 'how to access' the
schema information so NiFi knows what to do during read/write phases.
You can either ensure all these schemas are in the nifi schema
registry and can be looked up by some well established name and naming
scheme.  You could also do it via a SQL lookup if you have the schemas
in some database (similar to what Carlos might be suggesting but
without Jolt).  Or some other way..

Now, if your mapping logic from System A schema to System B schema is
more complex then you'll want something like JOLT most likely to help
manage those transforms.

You can also do some routing on the requests from System A to
specialized JOLT mappers for each case of converting to System B
schemas.  That will end up in a lot of config but it is likely you can
paramaterize this well using versioned flows and expression langauge
statements in the variable registry.

Thanks
Joe
On Thu, Oct 4, 2018 at 1:35 PM Ryan H <ry...@gmail.com> wrote:
>
> Hi All,
>
> I have been working on an integration between two systems with NiFi in the middle as the integration point. Basically when an event happens on System-A, such as a record update, those changes need to be propagated to System-B. For the first use case, I have set up a data flow that listens for incoming requests from System-A, performs a mapping, the sends the mapped data to System-B.
>
> Generalized Flow for "Create_Event" (dumbed down significantly):
> System-A "Create_Event" -> HandleHTTPRequest -> JoltTransformJSON -> InvokeHTTP -> System-B "Create_Event"
>
> This works great for the first case with a predefined mapping in JoltTransformJSON. Now I want to generalize it so that the same data flow can be used for all Create_Event's on System-A.
>
> Here is where the issue comes in. There is a base schema for System-A that has a base mapping to the base schema in System-B. Users of the System have the ability to "extend" the base schema to add/remove fields and modify the base mapping. So each time the Create_Event happens, the mapping that is used should be the unique mapping spec associated to that user (call it a GUID that comes along with the request).
>
> The data flow is the exact same for all Create_Events, except for the mapping, which will be unique to the user.
>
> Does anyone know of a way to load up a different mapping to be used on a per-request basis? I used JoltTransformJSON just as a proof of concept, so it does not need to be a Jolt spec and can be modified to meet the needs of whatever would work for this.
>
> I started to look into schema registry, but kind of got lost a bit and wasn't sure if it could be applied to this situation.
>
> Any Help is, as always, greatly appreciated.
>
>
> Cheers,
>
> Ryan H.