Posted to user@hive.apache.org by atreju <n....@gmail.com> on 2010/06/08 20:54:01 UTC

How to apply RDBMS table updates and deletes into Hadoop

To generate smart output from base data we need to copy some base tables
from a relational database into Hadoop. Some of them are big. Dumping the
entire tables into Hadoop every day is not an option, since there are 30+
tables and each would take several hours.

The approach we took is to load a full dump of each table first. Then,
each day or every 4-6 hours, we pull only the inserts/updates/deletes
since the last copy from the RDBMS (based on a date field in the table).
Using Hive, we outer join + union the new data with the existing data and
write the result into a new file. For example, if there are 100 rows in
Hadoop and, since the last Hadoop copy, 3 records were inserted, 2 updated
and 1 deleted in the RDBMS, the Hive query takes the 97 unchanged rows +
3 inserts + 2 updates and writes them into a new file. The other
applications, like Pig or Hive, then pick the most recent file when
selecting/loading data from those base table data files.
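For concreteness, here is a minimal sketch of that merge in HiveQL; the
table and column names (base_snapshot, base_snapshot_new, delta, id, col1,
col2, is_deleted) are made up for illustration, not our real schema:

-- Unchanged rows are the ones with no matching key in the delta;
-- inserts and updates come from the delta itself; deleted rows are dropped.
INSERT OVERWRITE TABLE base_snapshot_new
SELECT u.id, u.col1, u.col2
FROM (
  SELECT b.id, b.col1, b.col2
  FROM base_snapshot b
  LEFT OUTER JOIN delta d ON (b.id = d.id)
  WHERE d.id IS NULL                 -- rows untouched since the last copy
  UNION ALL
  SELECT d.id, d.col1, d.col2
  FROM delta d
  WHERE d.is_deleted = 0             -- inserts and updates, minus deletes
) u;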

This logic works fine in lower environments for small tables. With
production data, for a table of about 30GB, the incremental re-generation
of the file in Hadoop still takes several hours. I tried a compressed
version and it took even longer. I am not convinced that this is the best
we can do to handle updates and deletes, since we had to re-write the 29GB
of unchanged data in the 30GB file into a new file... and this is not the
biggest table.

I am thinking this must be a problem for many companies. What other
approaches are there to apply updates and deletes on base tables to the
Hadoop data files?

We have 4 data nodes and are using version 0.20.3.

Thanks!

Re: How to apply RDBMS table updates and deletes into Hadoop

Posted by Sonal Goyal <so...@gmail.com>.
Hi Atreju,

You have a very valid use case here. Data changes in your database,
and you want to pull only the changes into Hadoop. Have you
considered query-based data retrieval from the RDBMS to Hadoop? As
you already have a date field in your tables which marks the changed
rows, you can query on that field and bring only the changed records
into Hadoop.
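For example, the extraction query against the source database could be as
simple as the following (the table and column names here are invented, and
the cut-off timestamp would come from your own bookkeeping of the last
import):

SELECT *
FROM source_table
WHERE last_modified_date > '2010-06-08 00:00:00';  -- time of previous import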

I have been working on an open source framework for incremental
updates and fetching such records to Hadoop. You can check

http://code.google.com/p/hiho/
http://code.google.com/p/hiho/wiki/DatabaseImportFAQ

If you have any questions or need any changes, please send me an
offline mail.

Thanks and Regards,
Sonal
www.meghsoft.com
http://in.linkedin.com/in/sonalgoyal



On Wed, Jun 9, 2010 at 4:39 AM, Yongqiang He <he...@gmail.com> wrote:
> Hi,
>
> I think hive’s join + transform could be helpful here.
>
> Thanks
> Yongqiang
> On 6/8/10 3:58 PM, "Aaron Kimball" <aa...@cloudera.com> wrote:
>
> I think that this might be the way to go. In general, folding updates and
> deletes into datasets is a difficult problem due to the append-only nature
> of datasets.
>
> Something that might help you here is to partition your tables in Hive based
> on some well-distributed key. Then if you have a relatively small number of
> partitions affected by an incremental import (perhaps more recently-imported
> records are more likely to be updated? in this case, partition the tables by
> the month/week you imported them?) you can only perform the fold-in of the
> new deltas on the affected partitions. This should be much faster than a
> full table scan.
>
> Have you seen the Sqoop tool? It handles imports and exports between HDFS
> (and Hive) and RDBMS systems --  but currently can only import new records
> (and subsequent INSERTs); it can't handle updates/deletes. Sqoop is
> available at http://github.com/cloudera/sqoop -- it doesn't run on Apache
> 0.20.3, but works on CDH (Cloudera's Distribution for Hadoop) and Hadoop
> 0.21/trunk.
>
> This sort of capability is something I'm really interested in adding to
> Sqoop. If you've got a well-run process for doing this, I'd really
> appreciate your help adding this feature :) Send me an email off-list if
> you're interested. At the very least, I'd urge you to try out the tool.
>
> Cheers,
> - Aaron Kimball
>
> On Tue, Jun 8, 2010 at 8:54 PM, atreju <n....@gmail.com> wrote:
>
> To generate smart output from base data we need to copy some base tables
> from relational database into Hadoop. Some of them are big. To dump the
> entire table into Hadoop everyday is not an option since there are like 30+
> tables and each would take several hours.
>
> The methodology that we approached is to get the entire table dump first.
> Then each day or every 4-6 hours get only insert/update/delete since the
> last copy from RDBMS (based on a date field in the table). Using Hive do
> outer join + union the new data with existing data and write into a new
> file. For example, if there are a 100 rows in Hadoop, and in RDBMS 3 records
> inserted, 2 records updated and 1 deleted since the last Hadoop copy, then
> the Hive query will get 97 of the not changed data + 3 inserts + 2 updates
> and write into a new file. The other applications like Pig or Hive will pick
> the most recent file to use when selecting/loading data from those base
> table data files.
>
> This logic is working fine in lower environments for small size tables. With
> production data, for about 30GB size table, the incremental re-generation of
> the file in Hadoop is still taking several hours. I tried using zipped
> version and it took even longer time. I am not convinced that this is the
> best we can do to handle updates and deletes since we had to re-write 29GB
> unchanged data of the 30GB file again into a new file. ...and this is not
> the biggest table.
>
> I am thinking that this should be problem for many companies. What are the
> other approaches to apply updates and deletes on base tables to the
> Hadoop data files?
>
> We have 4 data nodes and using version 20.3.
>
> Thanks!
>
>
>
>

Re: How to apply RDBMS table updates and deletes into Hadoop

Posted by Edward Capriolo <ed...@gmail.com>.
I do not know what made me think back to this thread but it is a
rather cool idea. I guess now NoSQL is more attractive for this, but
if you wanted to do it...

a1
pk  first  last     lastmod  deleted
1   A      lincoln  1        0
2   bob    barker   2        0

a2
pk  first  last     lastmod  deleted
1   A      Lincoln  4        0
2   X      X        6        1

SELECT TRANSFORM (pk, first, last, lastmod, deleted)
  USING 'latest_not_delete.pl'
  AS (pk1, first1, last1, lastmod1, deleted1)
FROM (SELECT pk, first, last, lastmod, deleted
      FROM (SELECT * FROM a1 UNION ALL SELECT * FROM a2) x
      DISTRIBUTE BY pk SORT BY pk, lastmod ASC) u;

Where 'latest_not_delete.pl' would be a script that, for each pk, reads the
rows it receives and emits only the most recent one that is not deleted.
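For comparison, a rough pure-HiveQL way to get a similar result without an
external script (an untested sketch against the toy a1/a2 tables above;
ties on lastmod would still need a tiebreaker):

SELECT u.pk, u.first, u.last, u.lastmod, u.deleted
FROM (SELECT * FROM a1 UNION ALL SELECT * FROM a2) u
JOIN (SELECT pk, MAX(lastmod) AS max_lastmod
      FROM (SELECT * FROM a1 UNION ALL SELECT * FROM a2) v
      GROUP BY pk) latest
  ON (u.pk = latest.pk AND u.lastmod = latest.max_lastmod)
WHERE u.deleted = 0;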

Has anyone ever tried something like this?

On Wed, Jun 9, 2010 at 6:32 PM, atreju <n....@gmail.com> wrote:
> As an ideal solution, I have a suggestion to Hive contributors to make it
> look like Hive is doing insert/update/delete:
>
>
> This will require a special type of table creation syntax. I will call it as
> "UPDATABLE TABLE". The table will have 3 special columns that are defined in
> the create script:
> 1. The primary key column. (let's say: col_pk)
> 2. BIGINT type date column that shows the ms from Jan 1st, 1970 to actual
> data manipulation date/time in RDMBS. (dml_date)
> 3. TINYINT or BOOLEAN type column that will store 0 if the record is deleted
> and 1 if it is inserted or updated. (dml_action)
>
> This will require the RDBMS table to have PK and last update date column and
> deletes recorded in some other table by pk and date.
>
> On Day 1, the entire table is put into Hadoop, with addition of 2 extra
> columns: dml_date (bigint) and dml_action.
>
> On Day 2, we first find the max of dml_date from Hive table. Then we query
> from RDBMS inserts/updates/deletes since that date/time and write into a
> file with the correct dml_date/dml_action. The file goes to the same folder
> that our Hive table is in.
>
> Right now, if on Day 1 we had 100 rows and on Day 2, 10 rows a regular Hive
> table would show 110 rows. But, since this is a special table (UPDATABLE
> TABLE), every time this table is queried in Hive, Hive first run a
> map-reduce that would find the most recent (max(dml_date)) row per pk (group
> by col_pk) that is not deleted (dml_action!=0) and use that output in the
> user's query. That is the big idea!!
>
> Hive can have Insert/Update/Delete commands that would do nothing but create
> a file with rows of manipulated data with correct date and action.
>
> There can be a special "flush" kind of command that runs the MR and replaces
> all files in the table directory with single file. That can run weekly,
> monthly or may be after each time dml data received from RDBMS.
>
> Sqoop can have Hive interface that saves certain table attributes like pk
> column, RDBMS connection info,... and with one command from Hive, the Hive
> table gets updated from RDBMS....
>
> What do you think?
>
>
>
> On Tue, Jun 8, 2010 at 3:58 PM, Aaron Kimball <aa...@cloudera.com> wrote:
>>
>> I think that this might be the way to go. In general, folding updates and
>> deletes into datasets is a difficult problem due to the append-only nature
>> of datasets.
>>
>> Something that might help you here is to partition your tables in Hive
>> based on some well-distributed key. Then if you have a relatively small
>> number of partitions affected by an incremental import (perhaps more
>> recently-imported records are more likely to be updated? in this case,
>> partition the tables by the month/week you imported them?) you can only
>> perform the fold-in of the new deltas on the affected partitions. This
>> should be much faster than a full table scan.
>>
>> Have you seen the Sqoop tool? It handles imports and exports between HDFS
>> (and Hive) and RDBMS systems --  but currently can only import new records
>> (and subsequent INSERTs); it can't handle updates/deletes. Sqoop is
>> available at http://github.com/cloudera/sqoop -- it doesn't run on Apache
>> 0.20.3, but works on CDH (Cloudera's Distribution for Hadoop) and Hadoop
>> 0.21/trunk.
>>
>> This sort of capability is something I'm really interested in adding to
>> Sqoop. If you've got a well-run process for doing this, I'd really
>> appreciate your help adding this feature :) Send me an email off-list if
>> you're interested. At the very least, I'd urge you to try out the tool.
>>
>> Cheers,
>> - Aaron Kimball
>>
>>
>> On Tue, Jun 8, 2010 at 8:54 PM, atreju <n....@gmail.com> wrote:
>>>
>>> To generate smart output from base data we need to copy some base tables
>>> from relational database into Hadoop. Some of them are big. To dump the
>>> entire table into Hadoop everyday is not an option since there are like 30+
>>> tables and each would take several hours.
>>>
>>> The methodology that we approached is to get the entire table dump first.
>>> Then each day or every 4-6 hours get only insert/update/delete since the
>>> last copy from RDBMS (based on a date field in the table). Using Hive do
>>> outer join + union the new data with existing data and write into a new
>>> file. For example, if there are a 100 rows in Hadoop, and in RDBMS 3 records
>>> inserted, 2 records updated and 1 deleted since the last Hadoop copy, then
>>> the Hive query will get 97 of the not changed data + 3 inserts + 2 updates
>>> and write into a new file. The other applications like Pig or Hive will pick
>>> the most recent file to use when selecting/loading data from those base
>>> table data files.
>>>
>>> This logic is working fine in lower environments for small size tables.
>>> With production data, for about 30GB size table, the incremental
>>> re-generation of the file in Hadoop is still taking several hours. I tried
>>> using zipped version and it took even longer time. I am not convinced that
>>> this is the best we can do to handle updates and deletes since we had to
>>> re-write 29GB unchanged data of the 30GB file again into a new file. ...and
>>> this is not the biggest table.
>>>
>>> I am thinking that this should be problem for many companies. What are
>>> the other approaches to apply updates and deletes on base tables to the
>>> Hadoop data files?
>>>
>>> We have 4 data nodes and using version 20.3.
>>>
>>> Thanks!
>>>
>>
>>
>

Re: How to apply RDBMS table updates and deletes into Hadoop

Posted by atreju <n....@gmail.com>.
Thank you for your response. I understand... Just a few points before I
accept that this is too complicated :)

The main idea is to keep different versions of data under the same table,
similar to HBase, but at the row level, and without having to make the older
versions accessible from Hive, only the most recent one. You just need to
create an access layer that works on the most recent version of each row. If
you can think of a different way of uniquely identifying a row (to know its
versions) and a timestamp (or counter, or version number) to know the most
recent one, it doesn't have to be the columns that I specified before. It
could be a separate file that you create in the background (which could also
serve as the index file!).

Oracle has ROWID for the physical location of a row and locks it before the
data manipulation. Hadoop has the advantage of cheap storage and map-reduce.
So why not use that: keep all versions of the changed data and use map-reduce
to pick the most recent one. Accessing the data can get slower over time as
versions accumulate, but that can be fixed with a flush or a full replication
of the data from time to time, in a maintenance window chosen by the end user.

Hive is a great tool for accessing and manipulating Hadoop files. You are
doing an amazing job, and I have no idea what complications you face each
day. Just disregard this if I am talking nonsense, and keep up the good work!

Cheers!



Atreju,

>
> Your work is great. Personally I would not get too tied up in the
> transactional side of hive. Once you start dealing with locking and
> concurrency the problem becomes tricky.
>
> We hivers have a long time tradition on 'punting' on complicated stuff we
> do not want to deal with. :) Thus we only have 'Insert Overwrite' no 'insert
> update' :)
>
> Again, I think you wrote a really cool application. It would make a great
> use case, blog post, or a stand alone application. Call it HiveMysqlRsync or
> something :). However you mention several requirements that are specific to
> your application timestamp and primary key. If you can abstract all your
> application specific logic it could make it's way into hive. But it might be
> a stand alone program because hive to rdbms replication might be a little
> out of scope.
>
> Edward
>

Re: How to apply RDBMS table updates and deletes into Hadoop

Posted by Edward Capriolo <ed...@gmail.com>.
On Wed, Jun 9, 2010 at 8:29 PM, atreju <n....@gmail.com> wrote:

> Insert/Update/Delete is nothing but "put" command for another file to the
> same directory. Only problem is during "flush" that would replace the files.
> I assume it would use the similar kind of logic of Hive's "insert overwrite"
> (create the file in a temporary space and replace the Hive file(s) when MR
> output is ready). Only for that "replace" (move command?) the flush has to
> talk to Namenode to wait for currently running MR jobs to finish and put
> others on hold until the file is replaced. That is of course the high level
> idea. I am not sure if it is practical.
>
>
> On Wed, Jun 9, 2010 at 4:56 PM, Ted Yu <yu...@gmail.com> wrote:
>
>> When hive is running the map-reduce job, how do we handle concurrent
>> update/deletion/insertion ?
>>
>>
Atreju,

Your work is great. Personally, I would not get too tied up in the
transactional side of Hive. Once you start dealing with locking and
concurrency, the problem becomes tricky.

We Hivers have a long-time tradition of 'punting' on complicated stuff we do
not want to deal with. :) Thus we only have 'insert overwrite', no 'insert
update'. :)

Again, I think you wrote a really cool application. It would make a great
use case, blog post, or stand-alone application. Call it HiveMysqlRsync or
something :). However, you mention several requirements that are specific to
your application (a timestamp and a primary key). If you can abstract away
your application-specific logic, it could make its way into Hive. But it
might stay a stand-alone program, because Hive-to-RDBMS replication might be
a little out of scope.

Edward

Re: How to apply RDBMS table updates and deletes into Hadoop

Posted by atreju <n....@gmail.com>.
Insert/Update/Delete is nothing but a "put" of another file into the same
directory. The only problem is the "flush" that replaces the files. I assume
it would use logic similar to Hive's "insert overwrite" (create the file in a
temporary space and replace the Hive file(s) when the MR output is ready).
Only for that "replace" (a move command?) would the flush have to talk to the
NameNode, wait for currently running MR jobs to finish, and put other jobs on
hold until the file is replaced. That is of course the high-level idea; I am
not sure if it is practical.
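The "put" half already exists today as a non-overwriting load into the
table's directory; a sketch with a made-up path and table name:

-- Append a delta file to the table's directory; no existing data is rewritten.
LOAD DATA INPATH '/staging/orders_delta_20100609' INTO TABLE orders_updatable;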


On Wed, Jun 9, 2010 at 4:56 PM, Ted Yu <yu...@gmail.com> wrote:

> When hive is running the map-reduce job, how do we handle concurrent
> update/deletion/insertion ?
>
>

Re: How to apply RDBMS table updates and deletes into Hadoop

Posted by Ted Yu <yu...@gmail.com>.
When Hive is running the map-reduce job, how do we handle concurrent
updates/deletions/insertions?

On Wed, Jun 9, 2010 at 3:32 PM, atreju <n....@gmail.com> wrote:

> As an ideal solution, I have a suggestion to Hive contributors to make it
> look like Hive is doing insert/update/delete:
>
>
> This will require a special type of table creation syntax. I will call it
> as
> "UPDATABLE TABLE". The table will have 3 special columns that are defined
> in
> the create script:
> 1. The primary key column. (let's say: col_pk)
> 2. BIGINT type date column that shows the ms from Jan 1st, 1970 to actual
> data manipulation date/time in RDMBS. (dml_date)
> 3. TINYINT or BOOLEAN type column that will store 0 if the record is
> deleted
> and 1 if it is inserted or updated. (dml_action)
>
> This will require the RDBMS table to have PK and last update date column
> and
> deletes recorded in some other table by pk and date.
>
> On Day 1, the entire table is put into Hadoop, with addition of 2 extra
> columns: dml_date (bigint) and dml_action.
>
> On Day 2, we first find the max of dml_date from Hive table. Then we query
> from RDBMS inserts/updates/deletes since that date/time and write into a
> file with the correct dml_date/dml_action. The file goes to the same folder
> that our Hive table is in.
>
> Right now, if on Day 1 we had 100 rows and on Day 2, 10 rows a regular Hive
> table would show 110 rows. But, since this is a special table (UPDATABLE
> TABLE), every time this table is queried in Hive, Hive first run a
> map-reduce that would find the most recent (max(dml_date)) row per pk
> (group
> by col_pk) that is not deleted (dml_action!=0) and use that output in the
> user's query. That is the big idea!!
>
> Hive can have Insert/Update/Delete commands that would do nothing but
> create
> a file with rows of manipulated data with correct date and action.
>
> There can be a special "flush" kind of command that runs the MR and
> replaces
> all files in the table directory with single file. That can run weekly,
> monthly or may be after each time dml data received from RDBMS.
>
> Sqoop can have Hive interface that saves certain table attributes like pk
> column, RDBMS connection info,... and with one command from Hive, the Hive
> table gets updated from RDBMS....
>
> What do you think?
>
>
>
> On Tue, Jun 8, 2010 at 3:58 PM, Aaron Kimball <aa...@cloudera.com> wrote:
>
> > I think that this might be the way to go. In general, folding updates and
> > deletes into datasets is a difficult problem due to the append-only
> nature
> > of datasets.
> >
> > Something that might help you here is to partition your tables in Hive
> > based on some well-distributed key. Then if you have a relatively small
> > number of partitions affected by an incremental import (perhaps more
> > recently-imported records are more likely to be updated? in this case,
> > partition the tables by the month/week you imported them?) you can only
> > perform the fold-in of the new deltas on the affected partitions. This
> > should be much faster than a full table scan.
> >
> > Have you seen the Sqoop tool? It handles imports and exports between HDFS
> > (and Hive) and RDBMS systems --  but currently can only import new
> records
> > (and subsequent INSERTs); it can't handle updates/deletes. Sqoop is
> > available at http://github.com/cloudera/sqoop -- it doesn't run on
> Apache
> > 0.20.3, but works on CDH (Cloudera's Distribution for Hadoop) and Hadoop
> > 0.21/trunk.
> >
> > This sort of capability is something I'm really interested in adding to
> > Sqoop. If you've got a well-run process for doing this, I'd really
> > appreciate your help adding this feature :) Send me an email off-list if
> > you're interested. At the very least, I'd urge you to try out the tool.
> >
> > Cheers,
> > - Aaron Kimball
> >
> >
> > On Tue, Jun 8, 2010 at 8:54 PM, atreju <n....@gmail.com> wrote:
> >
> >> To generate smart output from base data we need to copy some base tables
> >> from relational database into Hadoop. Some of them are big. To dump the
> >> entire table into Hadoop everyday is not an option since there are like
> 30+
> >> tables and each would take several hours.
> >>
> >> The methodology that we approached is to get the entire table dump
> first.
> >> Then each day or every 4-6 hours get only insert/update/delete since the
> >> last copy from RDBMS (based on a date field in the table). Using Hive do
> >> outer join + union the new data with existing data and write into a new
> >> file. For example, if there are a 100 rows in Hadoop, and in RDBMS 3
> records
> >> inserted, 2 records updated and 1 deleted since the last Hadoop copy,
> then
> >> the Hive query will get 97 of the not changed data + 3 inserts + 2
> updates
> >> and write into a new file. The other applications like Pig or Hive will
> pick
> >> the most recent file to use when selecting/loading data from those base
> >> table data files.
> >>
> >> This logic is working fine in lower environments for small size tables.
> >> With production data, for about 30GB size table, the incremental
> >> re-generation of the file in Hadoop is still taking several hours. I
> tried
> >> using zipped version and it took even longer time. I am not convinced
> that
> >> this is the best we can do to handle updates and deletes since we had to
> >> re-write 29GB unchanged data of the 30GB file again into a new file.
> ...and
> >> this is not the biggest table.
> >>
> >> I am thinking that this should be problem for many companies. What are
> the
> >> other approaches to apply updates and deletes on base tables to the
> >> Hadoop data files?
> >>
> >> We have 4 data nodes and using version 20.3.
> >>
> >> Thanks!
> >>
> >>
> >
> >
>

Re: How to apply RDBMS table updates and deletes into Hadoop

Posted by atreju <n....@gmail.com>.
As an ideal solution, I have a suggestion for the Hive contributors to make
it look like Hive is doing insert/update/delete:


This would require a special type of table creation syntax; I will call it an
"UPDATABLE TABLE". The table has 3 special columns that are defined in the
create script (a plain-Hive sketch of such a table follows the list):
1. The primary key column (let's say: col_pk).
2. A BIGINT date column holding the milliseconds from Jan 1st, 1970 to the
actual data manipulation date/time in the RDBMS (dml_date).
3. A TINYINT or BOOLEAN column that stores 0 if the record was deleted and 1
if it was inserted or updated (dml_action).
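The sketch below uses only table syntax that exists today; the UPDATABLE
keyword itself is the proposal, and the business columns are invented for
illustration:

CREATE TABLE customer_updatable (
  col_pk     BIGINT,    -- primary key carried over from the RDBMS
  name       STRING,    -- ...ordinary business columns...
  dml_date   BIGINT,    -- ms since Jan 1st, 1970 of the RDBMS insert/update/delete
  dml_action TINYINT    -- 0 = deleted, 1 = inserted or updated
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';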

This requires the RDBMS table to have a PK and a last-update-date column, and
deletes to be recorded in some other table by pk and date.

On Day 1, the entire table is put into Hadoop, with the addition of the 2
extra columns: dml_date (bigint) and dml_action.

On Day 2, we first find the max dml_date from the Hive table. Then we query
the RDBMS for the inserts/updates/deletes since that date/time and write them
into a file with the correct dml_date/dml_action. The file goes into the same
folder that our Hive table is in.

Right now, if on Day 1 we had 100 rows and on Day 2 we add 10 rows, a regular
Hive table would show 110 rows. But since this is a special table (UPDATABLE
TABLE), every time the table is queried in Hive, Hive first runs a map-reduce
that finds the most recent (max(dml_date)) row per pk (group by col_pk) that
is not deleted (dml_action != 0) and uses that output in the user's query.
That is the big idea!
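Roughly, the map-reduce Hive would run under the covers could look like the
following (using the hypothetical customer_updatable table sketched above;
ties on dml_date would need a tiebreaker):

-- Latest non-deleted version of each pk.
SELECT t.col_pk, t.name, t.dml_date, t.dml_action
FROM customer_updatable t
JOIN (SELECT col_pk, MAX(dml_date) AS max_dml_date
      FROM customer_updatable
      GROUP BY col_pk) latest
  ON (t.col_pk = latest.col_pk AND t.dml_date = latest.max_dml_date)
WHERE t.dml_action != 0;

The "flush" described below would then be little more than this same SELECT
wrapped in an INSERT OVERWRITE back into the table.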

Hive could have Insert/Update/Delete commands that do nothing but create a
file of the manipulated rows with the correct date and action.

There could be a special "flush" kind of command that runs the MR and
replaces all files in the table directory with a single file. That could run
weekly, monthly, or maybe each time DML data is received from the RDBMS.

Sqoop could have a Hive interface that saves certain table attributes (pk
column, RDBMS connection info, ...) so that, with one command from Hive, the
Hive table gets updated from the RDBMS...

What do you think?



On Tue, Jun 8, 2010 at 3:58 PM, Aaron Kimball <aa...@cloudera.com> wrote:

> I think that this might be the way to go. In general, folding updates and
> deletes into datasets is a difficult problem due to the append-only nature
> of datasets.
>
> Something that might help you here is to partition your tables in Hive
> based on some well-distributed key. Then if you have a relatively small
> number of partitions affected by an incremental import (perhaps more
> recently-imported records are more likely to be updated? in this case,
> partition the tables by the month/week you imported them?) you can only
> perform the fold-in of the new deltas on the affected partitions. This
> should be much faster than a full table scan.
>
> Have you seen the Sqoop tool? It handles imports and exports between HDFS
> (and Hive) and RDBMS systems --  but currently can only import new records
> (and subsequent INSERTs); it can't handle updates/deletes. Sqoop is
> available at http://github.com/cloudera/sqoop -- it doesn't run on Apache
> 0.20.3, but works on CDH (Cloudera's Distribution for Hadoop) and Hadoop
> 0.21/trunk.
>
> This sort of capability is something I'm really interested in adding to
> Sqoop. If you've got a well-run process for doing this, I'd really
> appreciate your help adding this feature :) Send me an email off-list if
> you're interested. At the very least, I'd urge you to try out the tool.
>
> Cheers,
> - Aaron Kimball
>
>
> On Tue, Jun 8, 2010 at 8:54 PM, atreju <n....@gmail.com> wrote:
>
>> To generate smart output from base data we need to copy some base tables
>> from relational database into Hadoop. Some of them are big. To dump the
>> entire table into Hadoop everyday is not an option since there are like 30+
>> tables and each would take several hours.
>>
>> The methodology that we approached is to get the entire table dump first.
>> Then each day or every 4-6 hours get only insert/update/delete since the
>> last copy from RDBMS (based on a date field in the table). Using Hive do
>> outer join + union the new data with existing data and write into a new
>> file. For example, if there are a 100 rows in Hadoop, and in RDBMS 3 records
>> inserted, 2 records updated and 1 deleted since the last Hadoop copy, then
>> the Hive query will get 97 of the not changed data + 3 inserts + 2 updates
>> and write into a new file. The other applications like Pig or Hive will pick
>> the most recent file to use when selecting/loading data from those base
>> table data files.
>>
>> This logic is working fine in lower environments for small size tables.
>> With production data, for about 30GB size table, the incremental
>> re-generation of the file in Hadoop is still taking several hours. I tried
>> using zipped version and it took even longer time. I am not convinced that
>> this is the best we can do to handle updates and deletes since we had to
>> re-write 29GB unchanged data of the 30GB file again into a new file. ...and
>> this is not the biggest table.
>>
>> I am thinking that this should be problem for many companies. What are the
>> other approaches to apply updates and deletes on base tables to the
>> Hadoop data files?
>>
>> We have 4 data nodes and using version 20.3.
>>
>> Thanks!
>>
>>
>
>

Re: How to apply RDBMS table updates and deletes into Hadoop

Posted by Yongqiang He <he...@gmail.com>.
Hi,

I think hive's join + transform could be helpful here.

Thanks
Yongqiang
On 6/8/10 3:58 PM, "Aaron Kimball" <aa...@cloudera.com> wrote:

> I think that this might be the way to go. In general, folding updates and
> deletes into datasets is a difficult problem due to the append-only nature of
> datasets.
> 
> Something that might help you here is to partition your tables in Hive based
> on some well-distributed key. Then if you have a relatively small number of
> partitions affected by an incremental import (perhaps more recently-imported
> records are more likely to be updated? in this case, partition the tables by
> the month/week you imported them?) you can only perform the fold-in of the new
> deltas on the affected partitions. This should be much faster than a full
> table scan.
> 
> Have you seen the Sqoop tool? It handles imports and exports between HDFS (and
> Hive) and RDBMS systems --  but currently can only import new records (and
> subsequent INSERTs); it can't handle updates/deletes. Sqoop is available at
> http://github.com/cloudera/sqoop -- it doesn't run on Apache 0.20.3, but works
> on CDH (Cloudera's Distribution for Hadoop) and Hadoop 0.21/trunk.
> 
> This sort of capability is something I'm really interested in adding to Sqoop.
> If you've got a well-run process for doing this, I'd really appreciate your
> help adding this feature :) Send me an email off-list if you're interested. At
> the very least, I'd urge you to try out the tool.
> 
> Cheers,
> - Aaron Kimball
> 
> On Tue, Jun 8, 2010 at 8:54 PM, atreju <n....@gmail.com> wrote:
>> To generate smart output from base data we need to copy some base tables from
>> relational database into Hadoop. Some of them are big. To dump the entire
>> table into Hadoop everyday is not an option since there are like 30+ tables
>> and each would take several hours.
>>  
>> The methodology that we approached is to get the entire table dump first.
>> Then each day or every 4-6 hours get only insert/update/delete since the last
>> copy from RDBMS (based on a date field in the table). Using Hive do outer
>> join + union the new data with existing data and write into a new file. For
>> example, if there are a 100 rows in Hadoop, and in RDBMS 3 records inserted,
>> 2 records updated and 1 deleted since the last Hadoop copy, then the Hive
>> query will get 97 of the not changed data + 3 inserts + 2 updates and write
>> into a new file. The other applications like Pig or Hive will pick the most
>> recent file to use when selecting/loading data from those base table data
>> files.
>>  
>> This logic is working fine in lower environments for small size tables. With
>> production data, for about 30GB size table, the incremental re-generation of
>> the file in Hadoop is still taking several hours. I tried using zipped
>> version and it took even longer time. I am not convinced that this is the
>> best we can do to handle updates and deletes since we had to re-write 29GB
>> unchanged data of the 30GB file again into a new file. ...and this is not the
>> biggest table.
>>  
>> I am thinking that this should be problem for many companies. What are the
>> other approaches to apply updates and deletes on base tables to the
>> Hadoop data files?
>>  
>> We have 4 data nodes and using version 20.3.
>>  
>> Thanks!
>>  
> 
> 


Re: How to apply RDBMS table updates and deletes into Hadoop

Posted by Aaron Kimball <aa...@cloudera.com>.
I think that this might be the way to go. In general, folding updates and
deletes into datasets is a difficult problem due to the append-only nature
of datasets.

Something that might help you here is to partition your tables in Hive based
on some well-distributed key. Then if you have a relatively small number of
partitions affected by an incremental import (perhaps more recently-imported
records are more likely to be updated? in this case, partition the tables by
the month/week you imported them?) you can only perform the fold-in of the
new deltas on the affected partitions. This should be much faster than a
full table scan.
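As a rough illustration of that partition-level fold-in (all names below are
made up; assume the snapshot table is partitioned by an import_month string
and the current delta only touches one month):

-- Rewrite only the affected partition rather than the whole table.
INSERT OVERWRITE TABLE base_snapshot PARTITION (import_month='2010-06')
SELECT u.id, u.col1, u.col2
FROM (
  SELECT b.id, b.col1, b.col2
  FROM base_snapshot b
  LEFT OUTER JOIN delta d ON (b.id = d.id)
  WHERE b.import_month = '2010-06' AND d.id IS NULL
  UNION ALL
  SELECT d.id, d.col1, d.col2 FROM delta d WHERE d.is_deleted = 0
) u;

(Whether your Hive version lets a query read the partition it is overwriting
is worth verifying; if not, stage the result into a twin table first.)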

Have you seen the Sqoop tool? It handles imports and exports between HDFS
(and Hive) and RDBMS systems --  but currently can only import new records
(and subsequent INSERTs); it can't handle updates/deletes. Sqoop is
available at http://github.com/cloudera/sqoop -- it doesn't run on Apache
0.20.3, but works on CDH (Cloudera's Distribution for Hadoop) and Hadoop
0.21/trunk.

This sort of capability is something I'm really interested in adding to
Sqoop. If you've got a well-run process for doing this, I'd really
appreciate your help adding this feature :) Send me an email off-list if
you're interested. At the very least, I'd urge you to try out the tool.

Cheers,
- Aaron Kimball

On Tue, Jun 8, 2010 at 8:54 PM, atreju <n....@gmail.com> wrote:

> To generate smart output from base data we need to copy some base tables
> from relational database into Hadoop. Some of them are big. To dump the
> entire table into Hadoop everyday is not an option since there are like 30+
> tables and each would take several hours.
>
> The methodology that we approached is to get the entire table dump first.
> Then each day or every 4-6 hours get only insert/update/delete since the
> last copy from RDBMS (based on a date field in the table). Using Hive do
> outer join + union the new data with existing data and write into a new
> file. For example, if there are a 100 rows in Hadoop, and in RDBMS 3 records
> inserted, 2 records updated and 1 deleted since the last Hadoop copy, then
> the Hive query will get 97 of the not changed data + 3 inserts + 2 updates
> and write into a new file. The other applications like Pig or Hive will pick
> the most recent file to use when selecting/loading data from those base
> table data files.
>
> This logic is working fine in lower environments for small size tables.
> With production data, for about 30GB size table, the incremental
> re-generation of the file in Hadoop is still taking several hours. I tried
> using zipped version and it took even longer time. I am not convinced that
> this is the best we can do to handle updates and deletes since we had to
> re-write 29GB unchanged data of the 30GB file again into a new file. ...and
> this is not the biggest table.
>
> I am thinking that this should be problem for many companies. What are the
> other approaches to apply updates and deletes on base tables to the
> Hadoop data files?
>
> We have 4 data nodes and using version 20.3.
>
> Thanks!
>
>

Re: How to apply RDBMS table updates and deletes into Hadoop

Posted by Maxim Veksler <ma...@vekslers.org>.
Hi,

On Wed, Jun 9, 2010 at 9:26 PM, Edward Capriolo <ed...@gmail.com> wrote:
>
> On Tue, Jun 8, 2010 at 2:54 PM, atreju <n....@gmail.com> wrote:
>
>> To generate smart output from base data we need to copy some base tables
>> from relational database into Hadoop. Some of them are big. To dump the
>> entire table into Hadoop everyday is not an option since there are like 30+
>> tables and each would take several hours.
>>
>> The methodology that we approached is to get the entire table dump first.
>> Then each day or every 4-6 hours get only insert/update/delete since the
>> last copy from RDBMS (based on a date field in the table). Using Hive do
>> outer join + union the new data with existing data and write into a new
>> file. For example, if there are a 100 rows in Hadoop, and in RDBMS 3 records
>> inserted, 2 records updated and 1 deleted since the last Hadoop copy, then
>> the Hive query will get 97 of the not changed data + 3 inserts + 2 updates
>> and write into a new file. The other applications like Pig or Hive will pick
>> the most recent file to use when selecting/loading data from those base
>> table data files.
>>
>
This solution is very interesting.

Could you please describe further the logic for filtering out the deleted
records, and how you handle UPDATEs for existing records in Hive (Hadoop
files)?

Thank you,
Maxim.

Re: How to apply RDBMS table updates and deletes into Hadoop

Posted by Edward Capriolo <ed...@gmail.com>.
On Tue, Jun 8, 2010 at 2:54 PM, atreju <n....@gmail.com> wrote:

> To generate smart output from base data we need to copy some base tables
> from relational database into Hadoop. Some of them are big. To dump the
> entire table into Hadoop everyday is not an option since there are like 30+
> tables and each would take several hours.
>
> The methodology that we approached is to get the entire table dump first.
> Then each day or every 4-6 hours get only insert/update/delete since the
> last copy from RDBMS (based on a date field in the table). Using Hive do
> outer join + union the new data with existing data and write into a new
> file. For example, if there are a 100 rows in Hadoop, and in RDBMS 3 records
> inserted, 2 records updated and 1 deleted since the last Hadoop copy, then
> the Hive query will get 97 of the not changed data + 3 inserts + 2 updates
> and write into a new file. The other applications like Pig or Hive will pick
> the most recent file to use when selecting/loading data from those base
> table data files.
>
> This logic is working fine in lower environments for small size tables.
> With production data, for about 30GB size table, the incremental
> re-generation of the file in Hadoop is still taking several hours. I tried
> using zipped version and it took even longer time. I am not convinced that
> this is the best we can do to handle updates and deletes since we had to
> re-write 29GB unchanged data of the 30GB file again into a new file. ...and
> this is not the biggest table.
>
> I am thinking that this should be problem for many companies. What are the
> other approaches to apply updates and deletes on base tables to the
> Hadoop data files?
>
> We have 4 data nodes and using version 20.3.
>
> Thanks!
>
>

Very interesting. An important note about compression is that you have to
carefully choose the codec (gzip, lzo), block size (1-N), compression type
(block, record), etc. Choosing the wrong compression variables results in
anti-compression: large files and slow performance.
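For what it's worth, a sketch of the kind of session settings involved
(property names are the Hadoop 0.20-era ones as I recall them; double-check
them against your build), e.g. to get block-compressed SequenceFile output
instead of gzipping one big text file:

SET hive.exec.compress.output=true;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
SET mapred.output.compression.type=BLOCK;  -- BLOCK vs RECORD matters for SequenceFiles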

You said:
for about 30GB size table, the incremental re-generation of the file in
Hadoop is still taking several hours.

It sounds like you have worked up a solution that will scale. With the
exception of your backend database (MySQL or whatever), you should be able
to increase the number of DataNodes/TaskTrackers and get (roughly linear)
performance gains. Add some nodes and see if your run time shrinks.

I am interested to know. Great work. Very cool.