You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Flavio Pompermaier <po...@okkam.it> on 2014/11/04 09:27:25 UTC

Flink Mongodb

Hi to all,

I saw this post
https://flink.incubator.apache.org/news/2014/01/28/querying_mongodb.html
but it use the old APIs (HadoopDataSource instead of DataSource).
How can I use Mongodb with the new Flink APIs?

Best,
Flavio

Re: Flink Mongodb

Posted by Stephan Ewen <se...@apache.org>.
InputSplits are assigned lazily at runtime, which gives you many of the
benefits of re-assigning without the nastyness.

Can you write the logic that creates the splits such that it creates
multiple splits per region? Then the lazy assignment will make sure that
workers that get a larger split will get get additional splits than workers
that get smaller splits...
Hmm, that's good question indeed. I am not familiar with HBase's mode of
operation.
I would assume, that HBase uses range partitioning to partition a table
into regions. That way it is rather easy to balance the size of regions, as
long as there is no single key that occurs very often. I am not sure if it
is possible to overcome data skew cause by frequent keys.
However as I said, these are just assumption. I will have a look at HBase's
internals for verification.

In any case, Flink does currently not support reassigning or splitting
of InputSplits at runtime.
Also initially generating balanced InputSplits willl be tricky. That would
be possible if we can efficiently determine the "density" of a key range
when creating the InputSplits. However, I'm a bit skeptical that this can
be done...

2014-11-04 17:33 GMT+01:00 Flavio Pompermaier <po...@okkam.it>:

> From what I know HBase manages the regions but the fact that they are
> evenly distributed depends on a well-designed key..
> if it is not the case you could encounter very unbalanced regions (i.e.
> hot spotting).
>
> Could it be a good idea to create a split policy that compares the size of
> all the splits and generate equally-sized split that can be reassigned to
> free worker if the original assigned one is still busy?
>
> On Tue, Nov 4, 2014 at 5:18 PM, Fabian Hueske <fh...@apache.org> wrote:
>
>> ad 1) HBase manages the regions and should also take care of their
>> uniform size.
>> as 2) Dynamically changing InputSplits is not possible at the moment.
>> However, the input split generation of the IF should also be able to handle
>> such issues upfront. In fact, the IF could also generate multiple splits
>> per region (this would be necessary to make sure that the minimum number of
>> splits is generated if there are less regions than required splits).
>>
>> 2014-11-04 17:04 GMT+01:00 Flavio Pompermaier <po...@okkam.it>:
>>
>>> Ok, thanks for the explanation!
>>> That was more or less like I thought it should be but there are still
>>> points I'd like to clarify:
>>>
>>> 1 - What if a region is very big and there are other regions very
>>> small..? There will be one slot that takes a very long time while the
>>> others will stay inactive..
>>> 2 - Do you think it is possible to implement this in an adaptive way
>>> (stop processing of huge region if it worth it and assign remaining data to
>>> inactive task managers)?
>>>
>>>
>>> On Tue, Nov 4, 2014 at 4:37 PM, Fabian Hueske <fh...@apache.org>
>>> wrote:
>>>
>>>> Local split assignment preferably assigns input split to workers that
>>>> can locally read the data of an input split.
>>>> For example, HDFS stores file chunks (blocks) distributed over the
>>>> cluster and gives access to these chunks to every worker via network
>>>> transfer. However, if a chunk is read from a process that runs on the same
>>>> node as the chunk is stored, the read operation directly accesses the local
>>>> file system without going over the network. Hence, it is essential to
>>>> assign input splits based on the locality of their data if you want to have
>>>> reasonably performance. We call this local split assignment. This is a
>>>> general concept of all data parallel systems including Hadoop, Spark, and
>>>> Flink.
>>>>
>>>> This issue is not related to serializability of input formats.
>>>> I assume that the wrapped MongoIF is also not capable of local split
>>>> assignment.
>>>>
>>>> Am Dienstag, 4. November 2014 schrieb Flavio Pompermaier :
>>>>
>>>>> What do you mean for  "might lack support for local split
>>>>> assignment"? You mean that InputFormat is not serializable? This
>>>>> instead is not true for Mongodb?
>>>>>
>>>>> On Tue, Nov 4, 2014 at 10:00 AM, Fabian Hueske <fh...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> There's a page about Hadoop Compatibility that shows how to use the
>>>>>> wrapper.
>>>>>>
>>>>>> The HBase format should work as well, but might lack support for
>>>>>> local split assignment. In that case performance would suffer a lot.
>>>>>>
>>>>>> Am Dienstag, 4. November 2014 schrieb Flavio Pompermaier :
>>>>>>
>>>>>>> Should I start from
>>>>>>> http://flink.incubator.apache.org/docs/0.7-incubating/example_connectors.html
>>>>>>> ? Is it ok?
>>>>>>> Thus, in principle, also the TableInputFormat of HBase could be used
>>>>>>> in a similar way..isn't it?
>>>>>>>
>>>>>>> On Tue, Nov 4, 2014 at 9:42 AM, Fabian Hueske <fh...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> the blog post uses Flinks wrapper for Hadoop InputFormats.
>>>>>>>> This has been ported to the new API and is described in the
>>>>>>>> documentation.
>>>>>>>>
>>>>>>>> So you just need to take Mongos Hadoop IF and plug it into the new
>>>>>>>> IF wrapper. :-)
>>>>>>>>
>>>>>>>> Fabian
>>>>>>>>
>>>>>>>> Am Dienstag, 4. November 2014 schrieb Flavio Pompermaier :
>>>>>>>>
>>>>>>>> Hi to all,
>>>>>>>>>
>>>>>>>>> I saw this post
>>>>>>>>> https://flink.incubator.apache.org/news/2014/01/28/querying_mongodb.html
>>>>>>>>> but it use the old APIs (HadoopDataSource instead of DataSource).
>>>>>>>>> How can I use Mongodb with the new Flink APIs?
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Flavio
>>>>>>>>>
>>>>>>>>
>>>>>>>  .
>
>

Re: Flink Mongodb

Posted by Fabian Hueske <fh...@apache.org>.
Hmm, that's good question indeed. I am not familiar with HBase's mode of
operation.
I would assume, that HBase uses range partitioning to partition a table
into regions. That way it is rather easy to balance the size of regions, as
long as there is no single key that occurs very often. I am not sure if it
is possible to overcome data skew cause by frequent keys.
However as I said, these are just assumption. I will have a look at HBase's
internals for verification.

In any case, Flink does currently not support reassigning or splitting
of InputSplits at runtime.
Also initially generating balanced InputSplits willl be tricky. That would
be possible if we can efficiently determine the "density" of a key range
when creating the InputSplits. However, I'm a bit skeptical that this can
be done...

2014-11-04 17:33 GMT+01:00 Flavio Pompermaier <po...@okkam.it>:

> From what I know HBase manages the regions but the fact that they are
> evenly distributed depends on a well-designed key..
> if it is not the case you could encounter very unbalanced regions (i.e.
> hot spotting).
>
> Could it be a good idea to create a split policy that compares the size of
> all the splits and generate equally-sized split that can be reassigned to
> free worker if the original assigned one is still busy?
>
> On Tue, Nov 4, 2014 at 5:18 PM, Fabian Hueske <fh...@apache.org> wrote:
>
>> ad 1) HBase manages the regions and should also take care of their
>> uniform size.
>> as 2) Dynamically changing InputSplits is not possible at the moment.
>> However, the input split generation of the IF should also be able to handle
>> such issues upfront. In fact, the IF could also generate multiple splits
>> per region (this would be necessary to make sure that the minimum number of
>> splits is generated if there are less regions than required splits).
>>
>> 2014-11-04 17:04 GMT+01:00 Flavio Pompermaier <po...@okkam.it>:
>>
>>> Ok, thanks for the explanation!
>>> That was more or less like I thought it should be but there are still
>>> points I'd like to clarify:
>>>
>>> 1 - What if a region is very big and there are other regions very
>>> small..? There will be one slot that takes a very long time while the
>>> others will stay inactive..
>>> 2 - Do you think it is possible to implement this in an adaptive way
>>> (stop processing of huge region if it worth it and assign remaining data to
>>> inactive task managers)?
>>>
>>>
>>> On Tue, Nov 4, 2014 at 4:37 PM, Fabian Hueske <fh...@apache.org>
>>> wrote:
>>>
>>>> Local split assignment preferably assigns input split to workers that
>>>> can locally read the data of an input split.
>>>> For example, HDFS stores file chunks (blocks) distributed over the
>>>> cluster and gives access to these chunks to every worker via network
>>>> transfer. However, if a chunk is read from a process that runs on the same
>>>> node as the chunk is stored, the read operation directly accesses the local
>>>> file system without going over the network. Hence, it is essential to
>>>> assign input splits based on the locality of their data if you want to have
>>>> reasonably performance. We call this local split assignment. This is a
>>>> general concept of all data parallel systems including Hadoop, Spark, and
>>>> Flink.
>>>>
>>>> This issue is not related to serializability of input formats.
>>>> I assume that the wrapped MongoIF is also not capable of local split
>>>> assignment.
>>>>
>>>> Am Dienstag, 4. November 2014 schrieb Flavio Pompermaier :
>>>>
>>>>> What do you mean for  "might lack support for local split
>>>>> assignment"? You mean that InputFormat is not serializable? This
>>>>> instead is not true for Mongodb?
>>>>>
>>>>> On Tue, Nov 4, 2014 at 10:00 AM, Fabian Hueske <fh...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> There's a page about Hadoop Compatibility that shows how to use the
>>>>>> wrapper.
>>>>>>
>>>>>> The HBase format should work as well, but might lack support for
>>>>>> local split assignment. In that case performance would suffer a lot.
>>>>>>
>>>>>> Am Dienstag, 4. November 2014 schrieb Flavio Pompermaier :
>>>>>>
>>>>>>> Should I start from
>>>>>>> http://flink.incubator.apache.org/docs/0.7-incubating/example_connectors.html
>>>>>>> ? Is it ok?
>>>>>>> Thus, in principle, also the TableInputFormat of HBase could be used
>>>>>>> in a similar way..isn't it?
>>>>>>>
>>>>>>> On Tue, Nov 4, 2014 at 9:42 AM, Fabian Hueske <fh...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> the blog post uses Flinks wrapper for Hadoop InputFormats.
>>>>>>>> This has been ported to the new API and is described in the
>>>>>>>> documentation.
>>>>>>>>
>>>>>>>> So you just need to take Mongos Hadoop IF and plug it into the new
>>>>>>>> IF wrapper. :-)
>>>>>>>>
>>>>>>>> Fabian
>>>>>>>>
>>>>>>>> Am Dienstag, 4. November 2014 schrieb Flavio Pompermaier :
>>>>>>>>
>>>>>>>> Hi to all,
>>>>>>>>>
>>>>>>>>> I saw this post
>>>>>>>>> https://flink.incubator.apache.org/news/2014/01/28/querying_mongodb.html
>>>>>>>>> but it use the old APIs (HadoopDataSource instead of DataSource).
>>>>>>>>> How can I use Mongodb with the new Flink APIs?
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Flavio
>>>>>>>>>
>>>>>>>>
>>>>>>>  .
>
>

Re: Flink Mongodb

Posted by Flavio Pompermaier <po...@okkam.it>.
>From what I know HBase manages the regions but the fact that they are
evenly distributed depends on a well-designed key..
if it is not the case you could encounter very unbalanced regions (i.e. hot
spotting).

Could it be a good idea to create a split policy that compares the size of
all the splits and generate equally-sized split that can be reassigned to
free worker if the original assigned one is still busy?

On Tue, Nov 4, 2014 at 5:18 PM, Fabian Hueske <fh...@apache.org> wrote:

> ad 1) HBase manages the regions and should also take care of their
> uniform size.
> as 2) Dynamically changing InputSplits is not possible at the moment.
> However, the input split generation of the IF should also be able to handle
> such issues upfront. In fact, the IF could also generate multiple splits
> per region (this would be necessary to make sure that the minimum number of
> splits is generated if there are less regions than required splits).
>
> 2014-11-04 17:04 GMT+01:00 Flavio Pompermaier <po...@okkam.it>:
>
>> Ok, thanks for the explanation!
>> That was more or less like I thought it should be but there are still
>> points I'd like to clarify:
>>
>> 1 - What if a region is very big and there are other regions very
>> small..? There will be one slot that takes a very long time while the
>> others will stay inactive..
>> 2 - Do you think it is possible to implement this in an adaptive way
>> (stop processing of huge region if it worth it and assign remaining data to
>> inactive task managers)?
>>
>>
>> On Tue, Nov 4, 2014 at 4:37 PM, Fabian Hueske <fh...@apache.org> wrote:
>>
>>> Local split assignment preferably assigns input split to workers that
>>> can locally read the data of an input split.
>>> For example, HDFS stores file chunks (blocks) distributed over the
>>> cluster and gives access to these chunks to every worker via network
>>> transfer. However, if a chunk is read from a process that runs on the same
>>> node as the chunk is stored, the read operation directly accesses the local
>>> file system without going over the network. Hence, it is essential to
>>> assign input splits based on the locality of their data if you want to have
>>> reasonably performance. We call this local split assignment. This is a
>>> general concept of all data parallel systems including Hadoop, Spark, and
>>> Flink.
>>>
>>> This issue is not related to serializability of input formats.
>>> I assume that the wrapped MongoIF is also not capable of local split
>>> assignment.
>>>
>>> Am Dienstag, 4. November 2014 schrieb Flavio Pompermaier :
>>>
>>>> What do you mean for  "might lack support for local split assignment"? You
>>>> mean that InputFormat is not serializable? This instead is not true for
>>>> Mongodb?
>>>>
>>>> On Tue, Nov 4, 2014 at 10:00 AM, Fabian Hueske <fh...@apache.org>
>>>> wrote:
>>>>
>>>>> There's a page about Hadoop Compatibility that shows how to use the
>>>>> wrapper.
>>>>>
>>>>> The HBase format should work as well, but might lack support for local
>>>>> split assignment. In that case performance would suffer a lot.
>>>>>
>>>>> Am Dienstag, 4. November 2014 schrieb Flavio Pompermaier :
>>>>>
>>>>>> Should I start from
>>>>>> http://flink.incubator.apache.org/docs/0.7-incubating/example_connectors.html
>>>>>> ? Is it ok?
>>>>>> Thus, in principle, also the TableInputFormat of HBase could be used
>>>>>> in a similar way..isn't it?
>>>>>>
>>>>>> On Tue, Nov 4, 2014 at 9:42 AM, Fabian Hueske <fh...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> the blog post uses Flinks wrapper for Hadoop InputFormats.
>>>>>>> This has been ported to the new API and is described in the
>>>>>>> documentation.
>>>>>>>
>>>>>>> So you just need to take Mongos Hadoop IF and plug it into the new
>>>>>>> IF wrapper. :-)
>>>>>>>
>>>>>>> Fabian
>>>>>>>
>>>>>>> Am Dienstag, 4. November 2014 schrieb Flavio Pompermaier :
>>>>>>>
>>>>>>> Hi to all,
>>>>>>>>
>>>>>>>> I saw this post
>>>>>>>> https://flink.incubator.apache.org/news/2014/01/28/querying_mongodb.html
>>>>>>>> but it use the old APIs (HadoopDataSource instead of DataSource).
>>>>>>>> How can I use Mongodb with the new Flink APIs?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Flavio
>>>>>>>>
>>>>>>>
>>>>>>  .

Re: Flink Mongodb

Posted by Fabian Hueske <fh...@apache.org>.
ad 1) HBase manages the regions and should also take care of their
uniform size.
as 2) Dynamically changing InputSplits is not possible at the moment.
However, the input split generation of the IF should also be able to handle
such issues upfront. In fact, the IF could also generate multiple splits
per region (this would be necessary to make sure that the minimum number of
splits is generated if there are less regions than required splits).

2014-11-04 17:04 GMT+01:00 Flavio Pompermaier <po...@okkam.it>:

> Ok, thanks for the explanation!
> That was more or less like I thought it should be but there are still
> points I'd like to clarify:
>
> 1 - What if a region is very big and there are other regions very small..?
> There will be one slot that takes a very long time while the others will
> stay inactive..
> 2 - Do you think it is possible to implement this in an adaptive way (stop
> processing of huge region if it worth it and assign remaining data to
> inactive task managers)?
>
>
> On Tue, Nov 4, 2014 at 4:37 PM, Fabian Hueske <fh...@apache.org> wrote:
>
>> Local split assignment preferably assigns input split to workers that can
>> locally read the data of an input split.
>> For example, HDFS stores file chunks (blocks) distributed over the
>> cluster and gives access to these chunks to every worker via network
>> transfer. However, if a chunk is read from a process that runs on the same
>> node as the chunk is stored, the read operation directly accesses the local
>> file system without going over the network. Hence, it is essential to
>> assign input splits based on the locality of their data if you want to have
>> reasonably performance. We call this local split assignment. This is a
>> general concept of all data parallel systems including Hadoop, Spark, and
>> Flink.
>>
>> This issue is not related to serializability of input formats.
>> I assume that the wrapped MongoIF is also not capable of local split
>> assignment.
>>
>> Am Dienstag, 4. November 2014 schrieb Flavio Pompermaier :
>>
>>> What do you mean for  "might lack support for local split assignment"? You
>>> mean that InputFormat is not serializable? This instead is not true for
>>> Mongodb?
>>>
>>> On Tue, Nov 4, 2014 at 10:00 AM, Fabian Hueske <fh...@apache.org>
>>> wrote:
>>>
>>>> There's a page about Hadoop Compatibility that shows how to use the
>>>> wrapper.
>>>>
>>>> The HBase format should work as well, but might lack support for local
>>>> split assignment. In that case performance would suffer a lot.
>>>>
>>>> Am Dienstag, 4. November 2014 schrieb Flavio Pompermaier :
>>>>
>>>>> Should I start from
>>>>> http://flink.incubator.apache.org/docs/0.7-incubating/example_connectors.html
>>>>> ? Is it ok?
>>>>> Thus, in principle, also the TableInputFormat of HBase could be used
>>>>> in a similar way..isn't it?
>>>>>
>>>>> On Tue, Nov 4, 2014 at 9:42 AM, Fabian Hueske <fh...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> the blog post uses Flinks wrapper for Hadoop InputFormats.
>>>>>> This has been ported to the new API and is described in the
>>>>>> documentation.
>>>>>>
>>>>>> So you just need to take Mongos Hadoop IF and plug it into the new IF
>>>>>> wrapper. :-)
>>>>>>
>>>>>> Fabian
>>>>>>
>>>>>> Am Dienstag, 4. November 2014 schrieb Flavio Pompermaier :
>>>>>>
>>>>>> Hi to all,
>>>>>>>
>>>>>>> I saw this post
>>>>>>> https://flink.incubator.apache.org/news/2014/01/28/querying_mongodb.html
>>>>>>> but it use the old APIs (HadoopDataSource instead of DataSource).
>>>>>>> How can I use Mongodb with the new Flink APIs?
>>>>>>>
>>>>>>> Best,
>>>>>>> Flavio
>>>>>>>
>>>>>>
>>>>>

Re: Flink Mongodb

Posted by Flavio Pompermaier <po...@okkam.it>.
Ok, thanks for the explanation!
That was more or less like I thought it should be but there are still
points I'd like to clarify:

1 - What if a region is very big and there are other regions very small..?
There will be one slot that takes a very long time while the others will
stay inactive..
2 - Do you think it is possible to implement this in an adaptive way (stop
processing of huge region if it worth it and assign remaining data to
inactive task managers)?

On Tue, Nov 4, 2014 at 4:37 PM, Fabian Hueske <fh...@apache.org> wrote:

> Local split assignment preferably assigns input split to workers that can
> locally read the data of an input split.
> For example, HDFS stores file chunks (blocks) distributed over the cluster
> and gives access to these chunks to every worker via network transfer.
> However, if a chunk is read from a process that runs on the same node as
> the chunk is stored, the read operation directly accesses the local file
> system without going over the network. Hence, it is essential to assign
> input splits based on the locality of their data if you want to have
> reasonably performance. We call this local split assignment. This is a
> general concept of all data parallel systems including Hadoop, Spark, and
> Flink.
>
> This issue is not related to serializability of input formats.
> I assume that the wrapped MongoIF is also not capable of local split
> assignment.
>
> Am Dienstag, 4. November 2014 schrieb Flavio Pompermaier :
>
>> What do you mean for  "might lack support for local split assignment"? You
>> mean that InputFormat is not serializable? This instead is not true for
>> Mongodb?
>>
>> On Tue, Nov 4, 2014 at 10:00 AM, Fabian Hueske <fh...@apache.org>
>> wrote:
>>
>>> There's a page about Hadoop Compatibility that shows how to use the
>>> wrapper.
>>>
>>> The HBase format should work as well, but might lack support for local
>>> split assignment. In that case performance would suffer a lot.
>>>
>>> Am Dienstag, 4. November 2014 schrieb Flavio Pompermaier :
>>>
>>>> Should I start from
>>>> http://flink.incubator.apache.org/docs/0.7-incubating/example_connectors.html
>>>> ? Is it ok?
>>>> Thus, in principle, also the TableInputFormat of HBase could be used in
>>>> a similar way..isn't it?
>>>>
>>>> On Tue, Nov 4, 2014 at 9:42 AM, Fabian Hueske <fh...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> the blog post uses Flinks wrapper for Hadoop InputFormats.
>>>>> This has been ported to the new API and is described in the
>>>>> documentation.
>>>>>
>>>>> So you just need to take Mongos Hadoop IF and plug it into the new IF
>>>>> wrapper. :-)
>>>>>
>>>>> Fabian
>>>>>
>>>>> Am Dienstag, 4. November 2014 schrieb Flavio Pompermaier :
>>>>>
>>>>> Hi to all,
>>>>>>
>>>>>> I saw this post
>>>>>> https://flink.incubator.apache.org/news/2014/01/28/querying_mongodb.html
>>>>>> but it use the old APIs (HadoopDataSource instead of DataSource).
>>>>>> How can I use Mongodb with the new Flink APIs?
>>>>>>
>>>>>> Best,
>>>>>> Flavio
>>>>>>
>>>>>
>>>>

Re: Flink Mongodb

Posted by Fabian Hueske <fh...@apache.org>.
Local split assignment preferably assigns input split to workers that can
locally read the data of an input split.
For example, HDFS stores file chunks (blocks) distributed over the cluster
and gives access to these chunks to every worker via network transfer.
However, if a chunk is read from a process that runs on the same node as
the chunk is stored, the read operation directly accesses the local file
system without going over the network. Hence, it is essential to assign
input splits based on the locality of their data if you want to have
reasonably performance. We call this local split assignment. This is a
general concept of all data parallel systems including Hadoop, Spark, and
Flink.

This issue is not related to serializability of input formats.
I assume that the wrapped MongoIF is also not capable of local split
assignment.

Am Dienstag, 4. November 2014 schrieb Flavio Pompermaier :

> What do you mean for  "might lack support for local split assignment"? You
> mean that InputFormat is not serializable? This instead is not true for
> Mongodb?
>
> On Tue, Nov 4, 2014 at 10:00 AM, Fabian Hueske <fh...@apache.org> wrote:
>
>> There's a page about Hadoop Compatibility that shows how to use the
>> wrapper.
>>
>> The HBase format should work as well, but might lack support for local
>> split assignment. In that case performance would suffer a lot.
>>
>> Am Dienstag, 4. November 2014 schrieb Flavio Pompermaier :
>>
>>> Should I start from
>>> http://flink.incubator.apache.org/docs/0.7-incubating/example_connectors.html
>>> ? Is it ok?
>>> Thus, in principle, also the TableInputFormat of HBase could be used in
>>> a similar way..isn't it?
>>>
>>> On Tue, Nov 4, 2014 at 9:42 AM, Fabian Hueske <fh...@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> the blog post uses Flinks wrapper for Hadoop InputFormats.
>>>> This has been ported to the new API and is described in the
>>>> documentation.
>>>>
>>>> So you just need to take Mongos Hadoop IF and plug it into the new IF
>>>> wrapper. :-)
>>>>
>>>> Fabian
>>>>
>>>> Am Dienstag, 4. November 2014 schrieb Flavio Pompermaier :
>>>>
>>>> Hi to all,
>>>>>
>>>>> I saw this post
>>>>> https://flink.incubator.apache.org/news/2014/01/28/querying_mongodb.html
>>>>> but it use the old APIs (HadoopDataSource instead of DataSource).
>>>>> How can I use Mongodb with the new Flink APIs?
>>>>>
>>>>> Best,
>>>>> Flavio
>>>>>
>>>>
>>>

Re: Flink Mongodb

Posted by Fabian Hueske <fh...@apache.org>.
That would be great! :-)

2014-11-04 15:26 GMT+01:00 Flavio Pompermaier <po...@okkam.it>:

> Ok! I hope to write some blog within tomorrow evening!
>
> On Tue, Nov 4, 2014 at 3:16 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> Absolutely, please share the example!
>>
>> On Tue, Nov 4, 2014 at 3:02 PM, Flavio Pompermaier <po...@okkam.it>
>> wrote:
>>
>>> Sorry I was looking at the wrong MongoInputFormat..the correct one is
>>> this:
>>>
>>>
>>> https://github.com/mongodb/mongo-hadoop/blob/master/core/src/main/java/com/mongodb/hadoop/mapred/MongoInputFormat.java
>>>
>>> So now I have my working example. Could you be interested in sharing it?
>>> It works both with Avro and Kryo as default serializer (see
>>> GenericTypeInfo.createSerializer()).
>>>
>>> On Tue, Nov 4, 2014 at 10:30 AM, Flavio Pompermaier <
>>> pompermaier@okkam.it> wrote:
>>>
>>>> I don't know if that possible anymore..
>>>>
>>>> AzureTableInputFormat extends InputFormat<Text, WritableEntity>
>>>> while MongoInputFormat extends InputFormat<Object, BSONObject>
>>>>
>>>> and thus I cannot do the following..
>>>>
>>>> HadoopInputFormat<Object, BSONObject> hdIf = new
>>>> HadoopInputFormat<Object, BSONObject>(
>>>>      new MongoInputFormat(), Object.class, BSONObject.class, new
>>>> Job());
>>>>
>>>> Am I'm doing something wrong or is this a problem of Flink ?
>>>>
>>>>
>>>> On Tue, Nov 4, 2014 at 10:03 AM, Flavio Pompermaier <
>>>> pompermaier@okkam.it> wrote:
>>>>
>>>>> What do you mean for  "might lack support for local split
>>>>> assignment"?
>>>>> You mean that InputFormat is not serializable? This instead is not
>>>>> true for Mongodb?
>>>>>
>>>>>
>>>>> On Tue, Nov 4, 2014 at 10:00 AM, Fabian Hueske <fh...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> There's a page about Hadoop Compatibility that shows how to use the
>>>>>> wrapper.
>>>>>>
>>>>>> The HBase format should work as well, but might lack support for
>>>>>> local split assignment. In that case performance would suffer a lot.
>>>>>>
>>>>>> Am Dienstag, 4. November 2014 schrieb Flavio Pompermaier :
>>>>>>
>>>>>>> Should I start from
>>>>>>> http://flink.incubator.apache.org/docs/0.7-incubating/example_connectors.html
>>>>>>> ? Is it ok?
>>>>>>> Thus, in principle, also the TableInputFormat of HBase could be used
>>>>>>> in a similar way..isn't it?
>>>>>>>
>>>>>>> On Tue, Nov 4, 2014 at 9:42 AM, Fabian Hueske <fh...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> the blog post uses Flinks wrapper for Hadoop InputFormats.
>>>>>>>> This has been ported to the new API and is described in the
>>>>>>>> documentation.
>>>>>>>>
>>>>>>>> So you just need to take Mongos Hadoop IF and plug it into the new
>>>>>>>> IF wrapper. :-)
>>>>>>>>
>>>>>>>> Fabian
>>>>>>>>
>>>>>>>> Am Dienstag, 4. November 2014 schrieb Flavio Pompermaier :
>>>>>>>>
>>>>>>>> Hi to all,
>>>>>>>>>
>>>>>>>>> I saw this post
>>>>>>>>> https://flink.incubator.apache.org/news/2014/01/28/querying_mongodb.html
>>>>>>>>> but it use the old APIs (HadoopDataSource instead of DataSource).
>>>>>>>>> How can I use Mongodb with the new Flink APIs?
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Flavio
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>
>

Re: Flink Mongodb

Posted by Flavio Pompermaier <po...@okkam.it>.
Ok! I hope to write some blog within tomorrow evening!

On Tue, Nov 4, 2014 at 3:16 PM, Stephan Ewen <se...@apache.org> wrote:

> Absolutely, please share the example!
>
> On Tue, Nov 4, 2014 at 3:02 PM, Flavio Pompermaier <po...@okkam.it>
> wrote:
>
>> Sorry I was looking at the wrong MongoInputFormat..the correct one is
>> this:
>>
>>
>> https://github.com/mongodb/mongo-hadoop/blob/master/core/src/main/java/com/mongodb/hadoop/mapred/MongoInputFormat.java
>>
>> So now I have my working example. Could you be interested in sharing it?
>> It works both with Avro and Kryo as default serializer (see
>> GenericTypeInfo.createSerializer()).
>>
>> On Tue, Nov 4, 2014 at 10:30 AM, Flavio Pompermaier <pompermaier@okkam.it
>> > wrote:
>>
>>> I don't know if that possible anymore..
>>>
>>> AzureTableInputFormat extends InputFormat<Text, WritableEntity>
>>> while MongoInputFormat extends InputFormat<Object, BSONObject>
>>>
>>> and thus I cannot do the following..
>>>
>>> HadoopInputFormat<Object, BSONObject> hdIf = new
>>> HadoopInputFormat<Object, BSONObject>(
>>>      new MongoInputFormat(), Object.class, BSONObject.class, new Job());
>>>
>>> Am I'm doing something wrong or is this a problem of Flink ?
>>>
>>>
>>> On Tue, Nov 4, 2014 at 10:03 AM, Flavio Pompermaier <
>>> pompermaier@okkam.it> wrote:
>>>
>>>> What do you mean for  "might lack support for local split assignment"?
>>>> You mean that InputFormat is not serializable? This instead is not true
>>>> for Mongodb?
>>>>
>>>>
>>>> On Tue, Nov 4, 2014 at 10:00 AM, Fabian Hueske <fh...@apache.org>
>>>> wrote:
>>>>
>>>>> There's a page about Hadoop Compatibility that shows how to use the
>>>>> wrapper.
>>>>>
>>>>> The HBase format should work as well, but might lack support for local
>>>>> split assignment. In that case performance would suffer a lot.
>>>>>
>>>>> Am Dienstag, 4. November 2014 schrieb Flavio Pompermaier :
>>>>>
>>>>>> Should I start from
>>>>>> http://flink.incubator.apache.org/docs/0.7-incubating/example_connectors.html
>>>>>> ? Is it ok?
>>>>>> Thus, in principle, also the TableInputFormat of HBase could be used
>>>>>> in a similar way..isn't it?
>>>>>>
>>>>>> On Tue, Nov 4, 2014 at 9:42 AM, Fabian Hueske <fh...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> the blog post uses Flinks wrapper for Hadoop InputFormats.
>>>>>>> This has been ported to the new API and is described in the
>>>>>>> documentation.
>>>>>>>
>>>>>>> So you just need to take Mongos Hadoop IF and plug it into the new
>>>>>>> IF wrapper. :-)
>>>>>>>
>>>>>>> Fabian
>>>>>>>
>>>>>>> Am Dienstag, 4. November 2014 schrieb Flavio Pompermaier :
>>>>>>>
>>>>>>> Hi to all,
>>>>>>>>
>>>>>>>> I saw this post
>>>>>>>> https://flink.incubator.apache.org/news/2014/01/28/querying_mongodb.html
>>>>>>>> but it use the old APIs (HadoopDataSource instead of DataSource).
>>>>>>>> How can I use Mongodb with the new Flink APIs?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Flavio
>>>>>>>>
>>>>>>>
>>>>>>
>

Re: Flink Mongodb

Posted by Stephan Ewen <se...@apache.org>.
Absolutely, please share the example!

On Tue, Nov 4, 2014 at 3:02 PM, Flavio Pompermaier <po...@okkam.it>
wrote:

> Sorry I was looking at the wrong MongoInputFormat..the correct one is this:
>
>
> https://github.com/mongodb/mongo-hadoop/blob/master/core/src/main/java/com/mongodb/hadoop/mapred/MongoInputFormat.java
>
> So now I have my working example. Could you be interested in sharing it?
> It works both with Avro and Kryo as default serializer (see
> GenericTypeInfo.createSerializer()).
>
> On Tue, Nov 4, 2014 at 10:30 AM, Flavio Pompermaier <po...@okkam.it>
> wrote:
>
>> I don't know if that possible anymore..
>>
>> AzureTableInputFormat extends InputFormat<Text, WritableEntity>
>> while MongoInputFormat extends InputFormat<Object, BSONObject>
>>
>> and thus I cannot do the following..
>>
>> HadoopInputFormat<Object, BSONObject> hdIf = new
>> HadoopInputFormat<Object, BSONObject>(
>>      new MongoInputFormat(), Object.class, BSONObject.class, new Job());
>>
>> Am I'm doing something wrong or is this a problem of Flink ?
>>
>>
>> On Tue, Nov 4, 2014 at 10:03 AM, Flavio Pompermaier <pompermaier@okkam.it
>> > wrote:
>>
>>> What do you mean for  "might lack support for local split assignment"?
>>> You mean that InputFormat is not serializable? This instead is not true
>>> for Mongodb?
>>>
>>>
>>> On Tue, Nov 4, 2014 at 10:00 AM, Fabian Hueske <fh...@apache.org>
>>> wrote:
>>>
>>>> There's a page about Hadoop Compatibility that shows how to use the
>>>> wrapper.
>>>>
>>>> The HBase format should work as well, but might lack support for local
>>>> split assignment. In that case performance would suffer a lot.
>>>>
>>>> Am Dienstag, 4. November 2014 schrieb Flavio Pompermaier :
>>>>
>>>>> Should I start from
>>>>> http://flink.incubator.apache.org/docs/0.7-incubating/example_connectors.html
>>>>> ? Is it ok?
>>>>> Thus, in principle, also the TableInputFormat of HBase could be used
>>>>> in a similar way..isn't it?
>>>>>
>>>>> On Tue, Nov 4, 2014 at 9:42 AM, Fabian Hueske <fh...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> the blog post uses Flinks wrapper for Hadoop InputFormats.
>>>>>> This has been ported to the new API and is described in the
>>>>>> documentation.
>>>>>>
>>>>>> So you just need to take Mongos Hadoop IF and plug it into the new IF
>>>>>> wrapper. :-)
>>>>>>
>>>>>> Fabian
>>>>>>
>>>>>> Am Dienstag, 4. November 2014 schrieb Flavio Pompermaier :
>>>>>>
>>>>>> Hi to all,
>>>>>>>
>>>>>>> I saw this post
>>>>>>> https://flink.incubator.apache.org/news/2014/01/28/querying_mongodb.html
>>>>>>> but it use the old APIs (HadoopDataSource instead of DataSource).
>>>>>>> How can I use Mongodb with the new Flink APIs?
>>>>>>>
>>>>>>> Best,
>>>>>>> Flavio
>>>>>>>
>>>>>>
>>>>>

Re: Flink Mongodb

Posted by Flavio Pompermaier <po...@okkam.it>.
Sorry I was looking at the wrong MongoInputFormat..the correct one is this:

https://github.com/mongodb/mongo-hadoop/blob/master/core/src/main/java/com/mongodb/hadoop/mapred/MongoInputFormat.java

So now I have my working example. Could you be interested in sharing it?
It works both with Avro and Kryo as default serializer (see
GenericTypeInfo.createSerializer()).

On Tue, Nov 4, 2014 at 10:30 AM, Flavio Pompermaier <po...@okkam.it>
wrote:

> I don't know if that possible anymore..
>
> AzureTableInputFormat extends InputFormat<Text, WritableEntity>
> while MongoInputFormat extends InputFormat<Object, BSONObject>
>
> and thus I cannot do the following..
>
> HadoopInputFormat<Object, BSONObject> hdIf = new HadoopInputFormat<Object,
> BSONObject>(
>      new MongoInputFormat(), Object.class, BSONObject.class, new Job());
>
> Am I'm doing something wrong or is this a problem of Flink ?
>
>
> On Tue, Nov 4, 2014 at 10:03 AM, Flavio Pompermaier <po...@okkam.it>
> wrote:
>
>> What do you mean for  "might lack support for local split assignment"?
>> You mean that InputFormat is not serializable? This instead is not true
>> for Mongodb?
>>
>>
>> On Tue, Nov 4, 2014 at 10:00 AM, Fabian Hueske <fh...@apache.org>
>> wrote:
>>
>>> There's a page about Hadoop Compatibility that shows how to use the
>>> wrapper.
>>>
>>> The HBase format should work as well, but might lack support for local
>>> split assignment. In that case performance would suffer a lot.
>>>
>>> Am Dienstag, 4. November 2014 schrieb Flavio Pompermaier :
>>>
>>>> Should I start from
>>>> http://flink.incubator.apache.org/docs/0.7-incubating/example_connectors.html
>>>> ? Is it ok?
>>>> Thus, in principle, also the TableInputFormat of HBase could be used in
>>>> a similar way..isn't it?
>>>>
>>>> On Tue, Nov 4, 2014 at 9:42 AM, Fabian Hueske <fh...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> the blog post uses Flinks wrapper for Hadoop InputFormats.
>>>>> This has been ported to the new API and is described in the
>>>>> documentation.
>>>>>
>>>>> So you just need to take Mongos Hadoop IF and plug it into the new IF
>>>>> wrapper. :-)
>>>>>
>>>>> Fabian
>>>>>
>>>>> Am Dienstag, 4. November 2014 schrieb Flavio Pompermaier :
>>>>>
>>>>> Hi to all,
>>>>>>
>>>>>> I saw this post
>>>>>> https://flink.incubator.apache.org/news/2014/01/28/querying_mongodb.html
>>>>>> but it use the old APIs (HadoopDataSource instead of DataSource).
>>>>>> How can I use Mongodb with the new Flink APIs?
>>>>>>
>>>>>> Best,
>>>>>> Flavio
>>>>>>
>>>>>
>>>>

Re: Flink Mongodb

Posted by Flavio Pompermaier <po...@okkam.it>.
I don't know if that possible anymore..

AzureTableInputFormat extends InputFormat<Text, WritableEntity>
while MongoInputFormat extends InputFormat<Object, BSONObject>

and thus I cannot do the following..

HadoopInputFormat<Object, BSONObject> hdIf = new HadoopInputFormat<Object,
BSONObject>(
     new MongoInputFormat(), Object.class, BSONObject.class, new Job());

Am I'm doing something wrong or is this a problem of Flink ?


On Tue, Nov 4, 2014 at 10:03 AM, Flavio Pompermaier <po...@okkam.it>
wrote:

> What do you mean for  "might lack support for local split assignment"?
> You mean that InputFormat is not serializable? This instead is not true
> for Mongodb?
>
>
> On Tue, Nov 4, 2014 at 10:00 AM, Fabian Hueske <fh...@apache.org> wrote:
>
>> There's a page about Hadoop Compatibility that shows how to use the
>> wrapper.
>>
>> The HBase format should work as well, but might lack support for local
>> split assignment. In that case performance would suffer a lot.
>>
>> Am Dienstag, 4. November 2014 schrieb Flavio Pompermaier :
>>
>>> Should I start from
>>> http://flink.incubator.apache.org/docs/0.7-incubating/example_connectors.html
>>> ? Is it ok?
>>> Thus, in principle, also the TableInputFormat of HBase could be used in
>>> a similar way..isn't it?
>>>
>>> On Tue, Nov 4, 2014 at 9:42 AM, Fabian Hueske <fh...@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> the blog post uses Flinks wrapper for Hadoop InputFormats.
>>>> This has been ported to the new API and is described in the
>>>> documentation.
>>>>
>>>> So you just need to take Mongos Hadoop IF and plug it into the new IF
>>>> wrapper. :-)
>>>>
>>>> Fabian
>>>>
>>>> Am Dienstag, 4. November 2014 schrieb Flavio Pompermaier :
>>>>
>>>> Hi to all,
>>>>>
>>>>> I saw this post
>>>>> https://flink.incubator.apache.org/news/2014/01/28/querying_mongodb.html
>>>>> but it use the old APIs (HadoopDataSource instead of DataSource).
>>>>> How can I use Mongodb with the new Flink APIs?
>>>>>
>>>>> Best,
>>>>> Flavio
>>>>>
>>>>
>>>

Re: Flink Mongodb

Posted by Flavio Pompermaier <po...@okkam.it>.
What do you mean for  "might lack support for local split assignment"?
You mean that InputFormat is not serializable? This instead is not true for
Mongodb?

On Tue, Nov 4, 2014 at 10:00 AM, Fabian Hueske <fh...@apache.org> wrote:

> There's a page about Hadoop Compatibility that shows how to use the
> wrapper.
>
> The HBase format should work as well, but might lack support for local
> split assignment. In that case performance would suffer a lot.
>
> Am Dienstag, 4. November 2014 schrieb Flavio Pompermaier :
>
>> Should I start from
>> http://flink.incubator.apache.org/docs/0.7-incubating/example_connectors.html
>> ? Is it ok?
>> Thus, in principle, also the TableInputFormat of HBase could be used in a
>> similar way..isn't it?
>>
>> On Tue, Nov 4, 2014 at 9:42 AM, Fabian Hueske <fh...@apache.org> wrote:
>>
>>> Hi,
>>>
>>> the blog post uses Flinks wrapper for Hadoop InputFormats.
>>> This has been ported to the new API and is described in the
>>> documentation.
>>>
>>> So you just need to take Mongos Hadoop IF and plug it into the new IF
>>> wrapper. :-)
>>>
>>> Fabian
>>>
>>> Am Dienstag, 4. November 2014 schrieb Flavio Pompermaier :
>>>
>>> Hi to all,
>>>>
>>>> I saw this post
>>>> https://flink.incubator.apache.org/news/2014/01/28/querying_mongodb.html
>>>> but it use the old APIs (HadoopDataSource instead of DataSource).
>>>> How can I use Mongodb with the new Flink APIs?
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>
>>

Re: Flink Mongodb

Posted by Fabian Hueske <fh...@apache.org>.
There's a page about Hadoop Compatibility that shows how to use the
wrapper.

The HBase format should work as well, but might lack support for local
split assignment. In that case performance would suffer a lot.

Am Dienstag, 4. November 2014 schrieb Flavio Pompermaier :

> Should I start from
> http://flink.incubator.apache.org/docs/0.7-incubating/example_connectors.html
> ? Is it ok?
> Thus, in principle, also the TableInputFormat of HBase could be used in a
> similar way..isn't it?
>
> On Tue, Nov 4, 2014 at 9:42 AM, Fabian Hueske <fhueske@apache.org
> <javascript:_e(%7B%7D,'cvml','fhueske@apache.org');>> wrote:
>
>> Hi,
>>
>> the blog post uses Flinks wrapper for Hadoop InputFormats.
>> This has been ported to the new API and is described in the documentation.
>>
>> So you just need to take Mongos Hadoop IF and plug it into the new IF
>> wrapper. :-)
>>
>> Fabian
>>
>> Am Dienstag, 4. November 2014 schrieb Flavio Pompermaier :
>>
>> Hi to all,
>>>
>>> I saw this post
>>> https://flink.incubator.apache.org/news/2014/01/28/querying_mongodb.html
>>> but it use the old APIs (HadoopDataSource instead of DataSource).
>>> How can I use Mongodb with the new Flink APIs?
>>>
>>> Best,
>>> Flavio
>>>
>>
>

Re: Flink Mongodb

Posted by Flavio Pompermaier <po...@okkam.it>.
Should I start from
http://flink.incubator.apache.org/docs/0.7-incubating/example_connectors.html
? Is it ok?
Thus, in principle, also the TableInputFormat of HBase could be used in a
similar way..isn't it?

On Tue, Nov 4, 2014 at 9:42 AM, Fabian Hueske <fh...@apache.org> wrote:

> Hi,
>
> the blog post uses Flinks wrapper for Hadoop InputFormats.
> This has been ported to the new API and is described in the documentation.
>
> So you just need to take Mongos Hadoop IF and plug it into the new IF
> wrapper. :-)
>
> Fabian
>
> Am Dienstag, 4. November 2014 schrieb Flavio Pompermaier :
>
> Hi to all,
>>
>> I saw this post
>> https://flink.incubator.apache.org/news/2014/01/28/querying_mongodb.html
>> but it use the old APIs (HadoopDataSource instead of DataSource).
>> How can I use Mongodb with the new Flink APIs?
>>
>> Best,
>> Flavio
>>
>

Re: Flink Mongodb

Posted by Fabian Hueske <fh...@apache.org>.
Hi,

the blog post uses Flinks wrapper for Hadoop InputFormats.
This has been ported to the new API and is described in the documentation.

So you just need to take Mongos Hadoop IF and plug it into the new IF
wrapper. :-)

Fabian

Am Dienstag, 4. November 2014 schrieb Flavio Pompermaier :

> Hi to all,
>
> I saw this post
> https://flink.incubator.apache.org/news/2014/01/28/querying_mongodb.html
> but it use the old APIs (HadoopDataSource instead of DataSource).
> How can I use Mongodb with the new Flink APIs?
>
> Best,
> Flavio
>