Posted to user@flink.apache.org by Frank Dekervel <ke...@gmail.com> on 2016/09/08 16:30:50 UTC

scala version of flink mongodb example

Hello,

I'm new to Flink, and I'm trying to get a MongoDB Hadoop input format
working in Scala. However, I get lost in the Scala generics system ...
could somebody help me?

Code is below. Neither version works (compile error at the "map" call),
either because the method is not applicable or because of an ambiguous
reference to the overloaded method map (Flink 1.0.3).

Thanks in advance!
Greetings,
Frank


import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.scala.DataSet;
import org.apache.flink.api.scala.ExecutionEnvironment;
import org.apache.flink.api.scala.hadoop.mapred.HadoopInputFormat;

import org.apache.hadoop.mapred.JobConf;
import org.bson.BSONObject;

import com.mongodb.BasicDBObject;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.mapred.MongoInputFormat;

val hdIf = new HadoopInputFormat(new MongoInputFormat(),
    classOf[BSONWritable], classOf[BSONWritable], new JobConf())

hdIf.getJobConf().set("mongo.input.uri",
    "mongodb://localhost:27017/handling.event");

val env = ExecutionEnvironment.getExecutionEnvironment
val input = env.createInput(hdIf);

def mapfunc(t1: BSONWritable, t2: BSONWritable): String  = {
    return t1.toString()
}

// does not work
//input.map mapfunc

// does not work either
input.map( (t1: BSONWritable, t2: BSONWritable) => t1 )
// does not work either
//input.map ( (t1, t2) => t1 )

Re: scala version of flink mongodb example

Posted by "alex.decastro" <al...@lab49.com>.
Hi there, jumping in on this thread,

do you know how to add authentication options to Mongo here?

I'm trying to do 

hdIf.getJobConf.set("user", s"$USER")
hdIf.getJobConf.set("password", s"$PWD")

but I can't find any documentation to support it. 

Any pointers? 
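
One sketch that might be worth trying (I could not find dedicated user/password
keys for mongo-hadoop either, so treat this purely as an assumption): embed the
credentials in the mongo.input.uri that is already being set, using MongoDB's
standard connection-string syntax. USER, PWD and the authSource value are
placeholders here.

// credentials containing special characters must be percent-encoded in the URI
hdIf.getJobConf().set(
  "mongo.input.uri",
  s"mongodb://$USER:$PWD@localhost:27017/handling.event?authSource=admin")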

Many thanks,
Alex

Re: scala version of flink mongodb example

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Frank,

I didn't try to run the code, but this does not show a compiler error in
IntelliJ:

> input.map( mapfunc2 _ )

Decomposing the Tuple2 into two separate arguments only works with
Scala's pattern matching technique (this is the second approach I posted).
The Java API is not capable of splitting the fields of a Tuple2 argument
into separate arguments.
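
To make the distinction concrete, a small illustrative sketch (mine, not from the
original mail; it assumes the input DataSet[(BSONWritable, BSONWritable)] from the
original post): a two-argument Scala function is a Function2 and never matches
map's expected shape, while an explicitly typed Function1 over the tuple should
resolve to the Scala map overload without ambiguity.

import org.apache.flink.api.scala._   // brings the implicit TypeInformation into scope
import com.mongodb.hadoop.io.BSONWritable

// Function2: two separate parameters -- never matches ((BSONWritable, BSONWritable)) => R
val twoArgs: (BSONWritable, BSONWritable) => String = (key, value) => key.toString

// Function1 over the tuple: this is what map expects; the case clause
// destructures the pair via pattern matching
val overTuple: ((BSONWritable, BSONWritable)) => String = { case (key, _) => key.toString }

// with an explicitly typed Function1 only the Scala overload applies
val keys = input.map(overTuple)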

Best, Fabian

Re: scala version of flink mongodb example

Posted by Frank Dekervel <ke...@gmail.com>.
Hello Fabian,

Thanks, your solution does indeed work. However, I don't understand why:
when I replace the lambda with an explicit function

def mapfunc2(pair: Tuple2[BSONWritable, BSONWritable]) : String = {
    return pair._1.toString
}
input.map mapfunc2


I get the error below, which seemingly indicates that my method call matches
both the Scala version (the first overloaded method in the error message) and
the Java version (which takes a MapFunction, the second one).

This was also the error I got when doing the following (which looks the
most logical to me):

def mapfunc(t1: BSONWritable, t2: BSONWritable): String  = {
    return t1.toString()
}
input.map mapfunc

It would seem logical to me to decompose the pair into 2 separate arguments
(which is what the Java version of the example also does at
https://github.com/okkam-it/flink-mongodb-test).

and this is the error message:

both method map in class DataSet of type [R](fun:
((com.mongodb.hadoop.io.BSONWritable, com.mongodb.hadoop.io.BSONWritable))
=> R)(implicit evidence$4:
org.apache.flink.api.common.typeinfo.TypeInformation[R], implicit
evidence$5: scala.reflect.ClassTag[R])org.apache.flink.api.scala.DataSet[R]
and method map in class DataSet of type [R](mapper:
org.apache.flink.api.common.functions.MapFunction[(com.mongodb.hadoop.io.BSONWritable,
com.mongodb.hadoop.io.BSONWritable),R])(implicit evidence$2:
org.apache.flink.api.common.typeinfo.TypeInformation[R], implicit
evidence$3: scala.reflect.ClassTag[R])org.apache.flink.api.scala.DataSet[R]
match expected type ?

Thanks!
Frank

Re: scala version of flink mongodb example

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Frank,

input should be of type DataSet[(BSONWritable, BSONWritable)], so each record
is a Tuple2[BSONWritable, BSONWritable], right?

Something like this should work:

input.map( pair => pair._1.toString )

Here pair is a Tuple2[BSONWritable, BSONWritable], and pair._1 accesses the key
of the pair.

Alternatively you can also add an import
org.apache.flink.api.scala.extensions._

and then you can do

input.mapWith { case (x, y) => x }
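
Putting this together with the setup from the original post, a minimal end-to-end
sketch could look like the following (untested here; it assumes the mongo-hadoop
and flink-hadoop-compatibility dependencies are on the classpath, and the object
name is just a placeholder):

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.hadoop.mapred.HadoopInputFormat

import org.apache.hadoop.mapred.JobConf

import com.mongodb.hadoop.io.BSONWritable
import com.mongodb.hadoop.mapred.MongoInputFormat

object MongoExampleSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // wrap the mongo-hadoop (mapred) input format; keys and values are both BSONWritable
    val hdIf = new HadoopInputFormat(new MongoInputFormat(),
      classOf[BSONWritable], classOf[BSONWritable], new JobConf())
    hdIf.getJobConf().set("mongo.input.uri",
      "mongodb://localhost:27017/handling.event")

    // DataSet[(BSONWritable, BSONWritable)]
    val input = env.createInput(hdIf)

    // single tuple parameter, key accessed via _1 -- this is the form that compiles
    val keys = input.map(pair => pair._1.toString)

    // alternative with import org.apache.flink.api.scala.extensions._ :
    // val keys = input.mapWith { case (key, _) => key.toString }

    keys.print()
  }
}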

Best, Fabian