You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Robert Schmidtke <ro...@gmail.com> on 2016/05/04 12:00:52 UTC

Map from Tuple to Case Class

Hi everyone,

first up, I'm new to Scala, so please bear with me, but I could not find
any solution on the web or the Flink documentation. I'm having trouble
converting a DataSet[(LongWritable, Text)] to a DataSet of a custom case
class. I got it to work, however in a way that I feel is too verbose for
Scala:


import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.scala._

import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text

case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)
val rankingsInput: DataSet[Ranking] =
  env.readHadoopFile(inputFormat, classOf[LongWritable], classOf[Text],
rankingsInputPath, job).map[Ranking](new MapFunction[(LongWritable, Text),
Ranking] {
  override def map(value: (LongWritable, Text)) = {
      val splits = value._2.toString.split(",")
      new Ranking(splits(0), splits(1).toInt, splits(2).toInt)
    }
  })


Is there a simpler way of doing this? All other variants I've tried yield
some type information errors.

Thanks in advance!
Robert

-- 
My GPG Key ID: 336E2680

Re: Map from Tuple to Case Class

Posted by Stefano Baghino <st...@radicalbit.io>.
I just noticed my snippets contains a whole lot of errors, but I'm glad
it's been helpful. :)

On Wed, May 4, 2016 at 3:59 PM, Robert Schmidtke <ro...@gmail.com>
wrote:

> Thanks Stefano! I guess you're right, it's probably not too bad except the
> MapFunction, which I have swapped with your suggestion now. I was just a
> bit confused by the fact that I had to state so many types, where I thought
> they could be inferred automatically. I tried variations of the
> "non-explicit" MapFunction, but I must have messed up something. The Array
> matching is pretty handy as well. I'm good to go now, all works well and
> looks a bit more Scala-y now :)
>
> Robert
>
> On Wed, May 4, 2016 at 3:42 PM, Stefano Baghino <
> stefano.baghino@radicalbit.io> wrote:
>
>> The only real noise I see is the usage of a MapFunction, which can be
>> rewritten like this in Scala:
>>
>> case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)
>> val rankingsInput: DataSet[Ranking] =
>>   env.readHadoopFile(inputFormat, classOf[LongWritable], classOf[Text],
>> rankingsInputPath, job).map[Ranking] {
>>     (value: (LongWritable, Text)) = {
>>       val Array(name, n, m) = value._2.toString.split(",")
>>       Ranking(name, n.toInt, m.toInt) // no new needed for case classes
>>     }
>>   })
>>
>> As you may have noticed, I've also destructured the tuple in the first
>> line. Another way to do this destructuring in a more concise way is to use
>> an API extension [1] (which won't be available before 1.1, I suppose).
>>
>> Since you're parsing textual date, it could also possibly make sense to
>> handle error conditions for malformed inputs; here is an example that uses
>> flatMap to do so:
>>
>> import scala.util.{Try, Success, Failure} // needed to work with the
>> "functional" Try
>> case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)
>> val rankingsInput: DataSet[Ranking] =
>>   env.readHadoopFile(inputFormat, classOf[LongWritable], classOf[Text],
>> rankingsInputPath, job).flatMap[Ranking] {
>>     (value: (LongWritable, Text), out: Collector[Ranking]) = {
>>       Try {
>>         val Array(name, n, m) = value._2.toString.split(",") // exception
>> thrown if array size != 3
>>         Ranking(name, n.toInt, m.toInt) // exception thrown if n or m are
>> not numbers
>>       } match {
>>         case Success(ranking) => ranking
>>         case Failure(exception) => // deal with malformed input, perhaps
>> log
>>       }
>>     }
>>   })
>>
>> Feel free to ask me for any kind of clarifications on the snippets [2] I
>> posted, I'll gladly help you further if you need it.
>>
>> Last note: I'm not a user but I believe Shapeless has some very handy
>> constructs to move back and forth between tuples and case classes (but
>> please take this with a grain of salt).
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-master/apis/scala_api_extensions.html#accept-partial-functions
>> [2]: I didn't test them, so caution is advisable ;)
>>
>> On Wed, May 4, 2016 at 2:00 PM, Robert Schmidtke <ro...@gmail.com>
>> wrote:
>>
>>> Hi everyone,
>>>
>>> first up, I'm new to Scala, so please bear with me, but I could not find
>>> any solution on the web or the Flink documentation. I'm having trouble
>>> converting a DataSet[(LongWritable, Text)] to a DataSet of a custom case
>>> class. I got it to work, however in a way that I feel is too verbose for
>>> Scala:
>>>
>>>
>>> import org.apache.flink.api.common.functions.MapFunction
>>> import org.apache.flink.api.scala._
>>>
>>> import org.apache.hadoop.io.LongWritable
>>> import org.apache.hadoop.io.Text
>>>
>>> case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)
>>> val rankingsInput: DataSet[Ranking] =
>>>   env.readHadoopFile(inputFormat, classOf[LongWritable], classOf[Text],
>>> rankingsInputPath, job).map[Ranking](new MapFunction[(LongWritable, Text),
>>> Ranking] {
>>>   override def map(value: (LongWritable, Text)) = {
>>>       val splits = value._2.toString.split(",")
>>>       new Ranking(splits(0), splits(1).toInt, splits(2).toInt)
>>>     }
>>>   })
>>>
>>>
>>> Is there a simpler way of doing this? All other variants I've tried
>>> yield some type information errors.
>>>
>>> Thanks in advance!
>>> Robert
>>>
>>> --
>>> My GPG Key ID: 336E2680
>>>
>>
>>
>>
>> --
>> BR,
>> Stefano Baghino
>>
>> Software Engineer @ Radicalbit
>>
>
>
>
> --
> My GPG Key ID: 336E2680
>



-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit

Re: Map from Tuple to Case Class

Posted by Robert Schmidtke <ro...@gmail.com>.
Thanks Stefano! I guess you're right, it's probably not too bad except the
MapFunction, which I have swapped with your suggestion now. I was just a
bit confused by the fact that I had to state so many types, where I thought
they could be inferred automatically. I tried variations of the
"non-explicit" MapFunction, but I must have messed up something. The Array
matching is pretty handy as well. I'm good to go now, all works well and
looks a bit more Scala-y now :)

Robert

On Wed, May 4, 2016 at 3:42 PM, Stefano Baghino <
stefano.baghino@radicalbit.io> wrote:

> The only real noise I see is the usage of a MapFunction, which can be
> rewritten like this in Scala:
>
> case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)
> val rankingsInput: DataSet[Ranking] =
>   env.readHadoopFile(inputFormat, classOf[LongWritable], classOf[Text],
> rankingsInputPath, job).map[Ranking] {
>     (value: (LongWritable, Text)) = {
>       val Array(name, n, m) = value._2.toString.split(",")
>       Ranking(name, n.toInt, m.toInt) // no new needed for case classes
>     }
>   })
>
> As you may have noticed, I've also destructured the tuple in the first
> line. Another way to do this destructuring in a more concise way is to use
> an API extension [1] (which won't be available before 1.1, I suppose).
>
> Since you're parsing textual date, it could also possibly make sense to
> handle error conditions for malformed inputs; here is an example that uses
> flatMap to do so:
>
> import scala.util.{Try, Success, Failure} // needed to work with the
> "functional" Try
> case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)
> val rankingsInput: DataSet[Ranking] =
>   env.readHadoopFile(inputFormat, classOf[LongWritable], classOf[Text],
> rankingsInputPath, job).flatMap[Ranking] {
>     (value: (LongWritable, Text), out: Collector[Ranking]) = {
>       Try {
>         val Array(name, n, m) = value._2.toString.split(",") // exception
> thrown if array size != 3
>         Ranking(name, n.toInt, m.toInt) // exception thrown if n or m are
> not numbers
>       } match {
>         case Success(ranking) => ranking
>         case Failure(exception) => // deal with malformed input, perhaps
> log
>       }
>     }
>   })
>
> Feel free to ask me for any kind of clarifications on the snippets [2] I
> posted, I'll gladly help you further if you need it.
>
> Last note: I'm not a user but I believe Shapeless has some very handy
> constructs to move back and forth between tuples and case classes (but
> please take this with a grain of salt).
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-master/apis/scala_api_extensions.html#accept-partial-functions
> [2]: I didn't test them, so caution is advisable ;)
>
> On Wed, May 4, 2016 at 2:00 PM, Robert Schmidtke <ro...@gmail.com>
> wrote:
>
>> Hi everyone,
>>
>> first up, I'm new to Scala, so please bear with me, but I could not find
>> any solution on the web or the Flink documentation. I'm having trouble
>> converting a DataSet[(LongWritable, Text)] to a DataSet of a custom case
>> class. I got it to work, however in a way that I feel is too verbose for
>> Scala:
>>
>>
>> import org.apache.flink.api.common.functions.MapFunction
>> import org.apache.flink.api.scala._
>>
>> import org.apache.hadoop.io.LongWritable
>> import org.apache.hadoop.io.Text
>>
>> case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)
>> val rankingsInput: DataSet[Ranking] =
>>   env.readHadoopFile(inputFormat, classOf[LongWritable], classOf[Text],
>> rankingsInputPath, job).map[Ranking](new MapFunction[(LongWritable, Text),
>> Ranking] {
>>   override def map(value: (LongWritable, Text)) = {
>>       val splits = value._2.toString.split(",")
>>       new Ranking(splits(0), splits(1).toInt, splits(2).toInt)
>>     }
>>   })
>>
>>
>> Is there a simpler way of doing this? All other variants I've tried yield
>> some type information errors.
>>
>> Thanks in advance!
>> Robert
>>
>> --
>> My GPG Key ID: 336E2680
>>
>
>
>
> --
> BR,
> Stefano Baghino
>
> Software Engineer @ Radicalbit
>



-- 
My GPG Key ID: 336E2680

Re: Map from Tuple to Case Class

Posted by Stefano Baghino <st...@radicalbit.io>.
The only real noise I see is the usage of a MapFunction, which can be
rewritten like this in Scala:

case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)
val rankingsInput: DataSet[Ranking] =
  env.readHadoopFile(inputFormat, classOf[LongWritable], classOf[Text],
rankingsInputPath, job).map[Ranking] {
    (value: (LongWritable, Text)) = {
      val Array(name, n, m) = value._2.toString.split(",")
      Ranking(name, n.toInt, m.toInt) // no new needed for case classes
    }
  })

As you may have noticed, I've also destructured the tuple in the first
line. Another way to do this destructuring in a more concise way is to use
an API extension [1] (which won't be available before 1.1, I suppose).

Since you're parsing textual date, it could also possibly make sense to
handle error conditions for malformed inputs; here is an example that uses
flatMap to do so:

import scala.util.{Try, Success, Failure} // needed to work with the
"functional" Try
case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)
val rankingsInput: DataSet[Ranking] =
  env.readHadoopFile(inputFormat, classOf[LongWritable], classOf[Text],
rankingsInputPath, job).flatMap[Ranking] {
    (value: (LongWritable, Text), out: Collector[Ranking]) = {
      Try {
        val Array(name, n, m) = value._2.toString.split(",") // exception
thrown if array size != 3
        Ranking(name, n.toInt, m.toInt) // exception thrown if n or m are
not numbers
      } match {
        case Success(ranking) => ranking
        case Failure(exception) => // deal with malformed input, perhaps log
      }
    }
  })

Feel free to ask me for any kind of clarifications on the snippets [2] I
posted, I'll gladly help you further if you need it.

Last note: I'm not a user but I believe Shapeless has some very handy
constructs to move back and forth between tuples and case classes (but
please take this with a grain of salt).

[1]:
https://ci.apache.org/projects/flink/flink-docs-master/apis/scala_api_extensions.html#accept-partial-functions
[2]: I didn't test them, so caution is advisable ;)

On Wed, May 4, 2016 at 2:00 PM, Robert Schmidtke <ro...@gmail.com>
wrote:

> Hi everyone,
>
> first up, I'm new to Scala, so please bear with me, but I could not find
> any solution on the web or the Flink documentation. I'm having trouble
> converting a DataSet[(LongWritable, Text)] to a DataSet of a custom case
> class. I got it to work, however in a way that I feel is too verbose for
> Scala:
>
>
> import org.apache.flink.api.common.functions.MapFunction
> import org.apache.flink.api.scala._
>
> import org.apache.hadoop.io.LongWritable
> import org.apache.hadoop.io.Text
>
> case class Ranking(pageUrl: String, pageRank: Int, avgDuration: Int)
> val rankingsInput: DataSet[Ranking] =
>   env.readHadoopFile(inputFormat, classOf[LongWritable], classOf[Text],
> rankingsInputPath, job).map[Ranking](new MapFunction[(LongWritable, Text),
> Ranking] {
>   override def map(value: (LongWritable, Text)) = {
>       val splits = value._2.toString.split(",")
>       new Ranking(splits(0), splits(1).toInt, splits(2).toInt)
>     }
>   })
>
>
> Is there a simpler way of doing this? All other variants I've tried yield
> some type information errors.
>
> Thanks in advance!
> Robert
>
> --
> My GPG Key ID: 336E2680
>



-- 
BR,
Stefano Baghino

Software Engineer @ Radicalbit