Posted to dev@falcon.apache.org by "Srikanth Sundarrajan (JIRA)" <ji...@apache.org> on 2014/03/05 15:27:46 UTC

[jira] [Commented] (FALCON-267) Add CDC feature

    [ https://issues.apache.org/jira/browse/FALCON-267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13920883#comment-13920883 ] 

Srikanth Sundarrajan commented on FALCON-267:
---------------------------------------------

[~jb@nanthrax.net] and I had a chat about this. Reproducing the transcript with [~jb@nanthrax.net]'s permission so that others can chime in as well.
{code}
Jean-Baptiste Onofré: second question, about CDC and late-cut-off element
Srikanth Sundarrajan: yes
Jean-Baptiste Onofré: regarding in the sources, the late-cut-off element creates a job in Oozie to check if a feed has changed
Jean-Baptiste Onofré: right ?
Srikanth Sundarrajan: no it doesn't
Srikanth Sundarrajan: let me walk you through the details
Jean-Baptiste Onofré: yes please
Srikanth Sundarrajan: Some notes here: http://falcon.incubator.apache.org/docs/FalconArchitecture.html#Handling_late_input_data
Srikanth Sundarrajan: but to cover more details
Srikanth Sundarrajan: in the Falcon parent workflow (which is what is actually scheduled in Oozie; the user's workflow is set up as a child sub-flow within that parent workflow), there are two steps
Srikanth Sundarrajan: 1. record size (first step)
(process execution, retention or replication)

Srikanth Sundarrajan: record-size essentially is enabled only if the process or feed replication is enabled for late handling
Jean-Baptiste Onofré: ok
Srikanth Sundarrajan: if it is enabled, the record-size step walks through all the input data dirs and records their sizes in a separate file under the falcon working dir
Srikanth Sundarrajan: in the post processing, parent workflow sends a message to active-mq saying that the workflow is successful
Jean-Baptiste Onofré: ok, so you have the feed size at time T in the falcon working dir
Srikanth Sundarrajan: yes
Srikanth Sundarrajan: now falcon server listens to the active-mq message
Jean-Baptiste Onofré: so, we can compare at T+1 to see if it changed or not, right ?
Srikanth Sundarrajan: a little more involved there....
Jean-Baptiste Onofré: ok
Srikanth Sundarrajan: and when falcon server gets the message, if the process or feed replication for which we got the message is enabled for late handling, it puts the message for re-run into a delayed queue
Srikanth Sundarrajan: the delay is based on the late-rerun policy (periodic, exp-backoff, final etc)
Srikanth Sundarrajan: now when the delay expires falcon server checks the current sizes of all the feed input for that process / replication
Srikanth Sundarrajan: if anything has changed, falcon simply triggers a workflow re-run through oozie
Srikanth Sundarrajan: this keeps happening until the late cut-off of all the dependent feeds expires, at which point we know that we no longer need to consider changes
Srikanth Sundarrajan: makes sense ?
Jean-Baptiste Onofré: catcha
Jean-Baptiste Onofré: the use case is mostly replication, right ?
Srikanth Sundarrajan: also for process which is generating new data
Jean-Baptiste Onofré: yes, I see
Srikanth Sundarrajan: essentially it covers all cases
Srikanth Sundarrajan: there is no special handling for replication or any specific recipe for that matter
Jean-Baptiste Onofré: ok
Srikanth Sundarrajan: solution is very generic
Jean-Baptiste Onofré: as you said in Jira comment, maybe I can leverage late-cut off for CDC
Jean-Baptiste Onofré: my purpose for CDC is:
Srikanth Sundarrajan: absolutely
Srikanth Sundarrajan: from your description, it seemed like there is a lot of overlap
Jean-Baptiste Onofré: yes
Jean-Baptiste Onofré: the "missing" part would be the "gap" file in the falcon working directory
Jean-Baptiste Onofré: let's say
Jean-Baptiste Onofré: 1/ a user submits a feed with late-cut off
Jean-Baptiste Onofré: thanks to late-cut off, using the "size file" in falcon working directory, we can know if a feed has changed or not
Jean-Baptiste Onofré: 2/ if the feed changes, we send a message to an ActiveMQ topic containing a location in the falcon working dir that holds the gap file (what has changed on the feed)
Jean-Baptiste Onofré: can I change the schedule of the sub-workflow in Oozie ?
Srikanth Sundarrajan: what is a gap file ? can you elaborate. sorry if it's standard terminology.
Jean-Baptiste Onofré: no worries, it's my "wording" (it's probably not good)
Jean-Baptiste Onofré: the gap file is a file with a diff between two states of a feed
Jean-Baptiste Onofré: for instance, feed "in" has size 10000
Jean-Baptiste Onofré: the late-cut off runs (let's say every 10mn) and now the feed "in" has size 15000
Jean-Baptiste Onofré: the gap file would be the diff between feed "in" when its size was 10000 and now that the size is 15000
Jean-Baptiste Onofré: maybe my use case doesn't make sense
Jean-Baptiste Onofré: I'm just evaluating and thinking out loud ;)
Srikanth Sundarrajan: since the input has changed, the existing mechanism already identifies this change
Jean-Baptiste Onofré: oh ok
Srikanth Sundarrajan: It is not very clear to me, what is missing
Jean-Baptiste Onofré: so, let me try to summarize
Jean-Baptiste Onofré: 1/ if I create a feed "in" with a late-cut off element, when I submit this feed, I will have a job in Oozie which periodically checks the size of the feed, right ?
Srikanth Sundarrajan: does this feed have a replication or are you considering it purely as input for a process which generates new data based on this data "in"
Srikanth Sundarrajan: or is it both ?
Jean-Baptiste Onofré: no replication for now
Jean-Baptiste Onofré: let's say I have a MapReduce job that "populates" the location of the feed
Srikanth Sundarrajan: so we have only replication use case in consideration
Srikanth Sundarrajan: in that case, when the feed is "SCHEDULED", at submission nothing happens mind you
Jean-Baptiste Onofré: it's what I saw ;)
Srikanth Sundarrajan: upon schedule, it creates an oozie coord for replication
Srikanth Sundarrajan: that has 3 steps in the workflow
Jean-Baptiste Onofré: but replication is to replicate/sync between two clusters, right ?
Srikanth Sundarrajan: 1st step is record size, 2nd is sync/distributed copy/replication, 3rd is post processing
Srikanth Sundarrajan: yes replication is sync
Srikanth Sundarrajan: record size records saying "in" for time instance "T0" is say S0, then the replication happens and the notification comes to falcon server
Jean-Baptiste Onofré: catcha
Jean-Baptiste Onofré: for CDC, I'm considering only one cluster (no replication), focused on the feed itself
Srikanth Sundarrajan: then we are talking about no replication, where this feed's change has to impact the processes consuming this feed. correct ?
Jean-Baptiste Onofré: correct
Srikanth Sundarrajan: in which case, feed has no role to play
Srikanth Sundarrajan: upon feed submission or schedule, size is not recorded
Jean-Baptiste Onofré: ok
Srikanth Sundarrajan: size is actually recorded during that process execution
Srikanth Sundarrajan: so the responsibility for tracking what has changed lies with the process and not with the feed
Jean-Baptiste Onofré: hmmm, I see
Jean-Baptiste Onofré: so maybe we can provide a "CDC" process
Jean-Baptiste Onofré: taking the feed
Srikanth Sundarrajan: so you are interested in simply auditing changes to the feed, independent of its usage by any process or replication
Jean-Baptiste Onofré: right
Srikanth Sundarrajan: do I understand your ask correctly ?
Jean-Baptiste Onofré: it's pure Change Data Capture
Jean-Baptiste Onofré: the purpose is to "monitor" HDFS locations, detect changes on the locations, and send a message to a topic
Srikanth Sundarrajan: when the change is detected, what action need to be performed ?
Jean-Baptiste Onofré: the subscribers on the topic will receive a notification that a location changed
Jean-Baptiste Onofré: the action is the responsibility of the subscribers
Srikanth Sundarrajan: so simple notification to the subscribers
Jean-Baptiste Onofré: they can trigger a process, do what they want
Jean-Baptiste Onofré: yes
Srikanth Sundarrajan: now the question is. you mentioned that there is a M/R job that generates data for this feed. correct ?
Srikanth Sundarrajan: is that a falcon process ?
Jean-Baptiste Onofré: for instance, my demo would be to use a Camel route to get the notification, and send e-mail, etc
Jean-Baptiste Onofré: the M/R job is a pure job, not necessarily a falcon process (but it could be if it's easier)
Srikanth Sundarrajan: the reason I am asking is
Srikanth Sundarrajan: this is a feature that is already built into falcon
Srikanth Sundarrajan: Every falcon process on post-processing has an ability to send notification to both a falcon topic that the system listens to and a user topic
Srikanth Sundarrajan: this is different from the standard server topic
Jean-Baptiste Onofré: if it already exists in Falcon, and the "constraint" is that the M/R job has to be a falcon process, it's fine for me
Srikanth Sundarrajan: so essentially messages can go to two separate topics
Jean-Baptiste Onofré: but I need a process in any case right ?
Srikanth Sundarrajan: yes
Jean-Baptiste Onofré: that was my mistake, I was just on feed level
Srikanth Sundarrajan: no issues. So it looks like the existing features map well to your requirement
Srikanth Sundarrajan: except that the data generating job has to be mapped inside a process
Srikanth Sundarrajan: then it should work well
Srikanth Sundarrajan: I just realised that this info is not in the docs
Srikanth Sundarrajan: it might be useful to add them
Jean-Baptiste Onofré: yes, I will prepare some HowTo with detail like this
Jean-Baptiste Onofré: it could be interesting for the users
Srikanth Sundarrajan: great. that will be super helpful
Srikanth Sundarrajan: Do you mind if I shared this transcript on dev mailing list or you could share as well if that is ok
Jean-Baptiste Onofré: not at all !
Jean-Baptiste Onofré: I will prepare the CDC use case adding a job as a falcon process
Jean-Baptiste Onofré: is there any way to tune the late-cut off workflow interval ?
Jean-Baptiste Onofré: I saw in Oozie that the interval is very large
Jean-Baptiste Onofré: for instance, I would like to check the feed every 10mn
Srikanth Sundarrajan: that is customisable in the process definition
Jean-Baptiste Onofré: it's in the late-cut off value itself, right ?
Srikanth Sundarrajan: no
Srikanth Sundarrajan:
<xs:complexType name="late-process">
    <xs:sequence>
        <xs:element type="late-input" name="late-input" maxOccurs="unbounded" minOccurs="1">
            <xs:annotation>
                <xs:documentation>
                    For each input, defines the workflow that should be run when late data is detected
                </xs:documentation>
            </xs:annotation>
        </xs:element>
    </xs:sequence>
    <xs:attribute type="policy-type" name="policy" use="required"/>
    <xs:attribute type="frequency-type" name="delay" use="required"/>
</xs:complexType>
Srikanth Sundarrajan: here the policy-type can be periodic etc
Srikanth Sundarrajan: and frequency-type can be minutes(10)
Jean-Baptiste Onofré: ok
Jean-Baptiste Onofré: awesome
Jean-Baptiste Onofré: it makes lot of sense
Jean-Baptiste Onofré: thanks a lot, I'll go this way (I will work on it tomorrow; today, I would like to submit the patch about user ACL check at entity submission time)
Srikanth Sundarrajan: super;
Srikanth Sundarrajan: I will take a section of this doc and attach it to FALCON-267 where the CDC feature was discussed, instead of leaving it as a loose hanging thread. What do you think ?
Jean-Baptiste Onofré: +1
Jean-Baptiste Onofré: I will document about the CDC use case
Jean-Baptiste Onofré: and add documentation about the workflow/use cases that we discussed
Srikanth Sundarrajan: please do. thanks Jean.
{code}
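
For readers skimming the transcript, the late-data check described above (record input sizes, wait out the delay, compare, re-run if something changed and the late cut-off has not expired) can be illustrated roughly as follows. This is only a sketch of the idea, not Falcon's actual code: the function names, the dictionary-of-sizes representation and the plain numeric timestamps are all assumptions made for illustration, and the cut-off is simplified to a single expiry time (the latest one across the dependent feeds).

```python
from typing import Dict, List

# Illustrative sketch only; every name here is hypothetical, not a Falcon API.


def changed_inputs(recorded: Dict[str, int], current: Dict[str, int]) -> List[str]:
    """Return the input paths whose size differs from the recorded snapshot.

    A path that appears in only one of the two snapshots also counts as changed.
    """
    paths = set(recorded) | set(current)
    return sorted(p for p in paths if recorded.get(p) != current.get(p))


def should_rerun(recorded: Dict[str, int], current: Dict[str, int],
                 now: float, cutoff_expiry: float) -> bool:
    """Decide whether a re-run is warranted when the delay expires.

    Assumption: cutoff_expiry is the latest late cut-off expiry among the
    dependent feeds; once it has passed, changes are no longer considered.
    """
    if now >= cutoff_expiry:
        return False
    return bool(changed_inputs(recorded, current))


# Example from the chat: feed "in" recorded at size 10000, later seen at 15000.
recorded = {"/data/in": 10000}
current = {"/data/in": 15000}
rerun = should_rerun(recorded, current, now=10, cutoff_expiry=60)
```

With the sizes from the chat example, `rerun` is true while the cut-off is still open; the same comparison after the cut-off, or with unchanged sizes, yields no re-run.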

> Add CDC feature
> ---------------
>
>                 Key: FALCON-267
>                 URL: https://issues.apache.org/jira/browse/FALCON-267
>             Project: Falcon
>          Issue Type: New Feature
>            Reporter: Jean-Baptiste Onofré
>            Assignee: Jean-Baptiste Onofré
>
> I propose to add a Change Data Capture feature in Falcon.
> The idea is to be able to catch the change, firstly on HDFS files, and publish the identified gap to a messaging topic.
> It's what I would like to PoC:
> - in a feed definition, we add a <capture/> element defining the change check interval.
> - we create a coordinator in oozie which executes the following workflow at each capture interval
> - in the Falcon staging "capture" location on HDFS, we keep the first state of the feed. We compare (diff) the current content with the staging location, and write the diff in the Falcon staging. If the file is a binary, we can detect a change (using MD5 for instance) and the diff is the complete file (like in svn, git, etc).
> - if we have a diff, we publish a message in the Falcon "capture" topic (containing a set of JMS properties; the message body contains the link to the diff on HDFS, in the Falcon staging). The "stream" copy is overwritten by the new one.
> The purpose of this CDC is:
> 1/ thanks to the publication on the topic, to be able to use "external" tools to "react" when a change occurs. For instance, I plan to make a demo with an Apache Camel route (sending e-mails for example) when data changes.
> 2/ staying in falcon/oozie/hadoop, to be able to set up a pipeline triggered by data change: for instance, trigger a job when the data changes.
> The first PoC is HDFS/fs centric but I think we can do diff on HBase or Hive.
> Thoughts ?



--
This message was sent by Atlassian JIRA
(v6.2#6252)