Posted to user@flink.apache.org by Felipe Gutierrez <fe...@gmail.com> on 2018/03/03 13:33:31 UTC

Flink join operator after sorting seems to group fields (Scala)

Hi all,

I have implemented a simple Scala object using Flink to play with the join
operator. Once the join was producing results, I decided to sort the output
by the first field (.sortPartition(0, Order.ASCENDING)). The output seems to
be sorted only within groups: it shows two separate groups of
"Fyodor Dostoyevsky". Why is this happening? How do I sort the complete
DataSet?

Kind Regards,
Felipe

import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object JoinBooksAndAuthors {
  val AUTHOR_ID_FIELD: Int = 0
  val AUTHOR_NAME_FIELD: Int = 1

  val BOOK_AUTHORID_FIELD: Int = 0
  val BOOK_YEAR_FIELD: Int = 1
  val BOOK_NAME_FIELD: Int = 2

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment

    val authors = env.readCsvFile[(Int, String)](
      "downloads/authors.tsv",
      fieldDelimiter = "\t",
      lineDelimiter = "\n",
      includedFields = Array(0, 1)
    )

    val books = env.readCsvFile[(Int, Short, String)](
      "downloads/books.tsv",
      fieldDelimiter = "\t",
      lineDelimiter = "\n",
      includedFields = Array(0, 1, 2)
    )

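    // Join authors to books on the author id, keep (author name, book title),
    // sort by author name and print the result.
    authors
      .join(books)
      .where(AUTHOR_ID_FIELD)
      .equalTo(BOOK_AUTHORID_FIELD)
      .map(tuple => (tuple._1._2, tuple._2._3))
      .sortPartition(0, Order.ASCENDING)
      .print()
  }
}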

output

(Charles Bukowski,Women)
(Charles Bukowski,The Most Beautiful Woman in Town)
(Charles Bukowski,Hot Water Music)
(Charles Bukowski,Barfly)
(Charles Bukowski,Notes of a Dirty Old Man)
(Charles Bukowski,Ham on Rye)
(Fyodor Dostoyevsky,The Brothers Karamazov)
(Fyodor Dostoyevsky,The Double: A Petersburg Poem)
(Fyodor Dostoyevsky,Poor Folk)
(George Orwell,Coming Up for Air)
(George Orwell,Burmese Days)
(George Orwell,A Clergyman's Daughter)
(George Orwell,Down and Out in Paris and London)
(Albert Camus,The Plague)
(Fyodor Dostoyevsky,The Eternal Husband)
(Fyodor Dostoyevsky,The Gambler)
(Fyodor Dostoyevsky,The House of the Dead)
(Fyodor Dostoyevsky,Crime and Punishment)
(Fyodor Dostoyevsky,Netochka Nezvanova)
.....

-- 
-- Felipe Oliveira Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

Re: Flink join operator after sorting seems to group fields (Scala)

Posted by Felipe Gutierrez <fe...@gmail.com>.
Thanks, Xingcan!

Now my output is correct and I understand a little more about the Flink
architecture. So I have to keep in mind that the processing is done in
different partitions even though I am running it locally.

Kind Regards,
Felipe
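
To illustrate the point about partitions, here is a minimal plain-Scala
sketch (no Flink involved) of why sorting each partition separately can
produce the same author in two places. The object name, the sample rows and
the toy `rangePartition` helper with its split point "G" are assumptions made
for illustration only; Flink chooses its own range boundaries and its
internals are not shown here.

```scala
object LocalVsGlobalSort {
  type Row = (String, String) // (author name, book title)

  // Hypothetical sample: the joined rows as they might be spread over two partitions.
  val samplePartitions: Seq[Seq[Row]] = Seq(
    Seq(("George Orwell", "Burmese Days"),
        ("Fyodor Dostoyevsky", "Poor Folk"),
        ("Charles Bukowski", "Women")),
    Seq(("Fyodor Dostoyevsky", "The Gambler"),
        ("Albert Camus", "The Plague"))
  )

  // Sort every partition on its own, then concatenate the partitions in order.
  def sortEachPartition(partitions: Seq[Seq[Row]]): Seq[Row] =
    partitions.flatMap(_.sortBy(_._1))

  // Toy range partitioning: a row goes to partition i, where i is the number
  // of split points that are <= its key. Keys in partition i are therefore
  // never greater than keys in partition i + 1.
  def rangePartition(rows: Seq[Row], splits: Seq[String]): Seq[Seq[Row]] = {
    val grouped = rows.groupBy(row => splits.count(_ <= row._1))
    (0 to splits.length).map(i => grouped.getOrElse(i, Seq.empty))
  }

  def main(args: Array[String]): Unit = {
    // Local sort only: "Fyodor Dostoyevsky" shows up in two separate groups.
    println(sortEachPartition(samplePartitions).map(_._1).mkString(", "))
    // prints: Charles Bukowski, Fyodor Dostoyevsky, George Orwell, Albert Camus, Fyodor Dostoyevsky

    // Range-partition first, then sort locally: the concatenation is globally sorted.
    val ranged = rangePartition(samplePartitions.flatten, Seq("G"))
    println(sortEachPartition(ranged).map(_._1).mkString(", "))
    // prints: Albert Camus, Charles Bukowski, Fyodor Dostoyevsky, Fyodor Dostoyevsky, George Orwell
  }
}
```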

On Sat, Mar 3, 2018 at 11:25 AM, Xingcan Cui <xi...@gmail.com> wrote:

> Hi Felipe,
>
> The `sortPartition()` method only sorts each partition of a DataSet
> LOCALLY. To achieve a global sort, call it after `partitionByRange()`
> (e.g., `result.partitionByRange(0).sortPartition(0, Order.ASCENDING)`).
>
> Hope that helps,
> Xingcan

Re: Flink join operator after sorting seems to group fields (Scala)

Posted by Xingcan Cui <xi...@gmail.com>.
Hi Felipe,

The `sortPartition()` method only sorts each partition of a DataSet LOCALLY. To achieve a global sort, call it after `partitionByRange()` (e.g., `result.partitionByRange(0).sortPartition(0, Order.ASCENDING)`).

Hope that helps,
Xingcan
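
For reference, a minimal sketch of the suggestion above against the Scala
DataSet API. It assumes the flink-scala dependency is on the classpath; the
object name, the helper method names and the in-memory sample tuples are
hypothetical stand-ins for the joined result in the question. The second
helper is an alternative not mentioned in the thread: running the sort with a
parallelism of 1, which gives a single sorted partition at the cost of doing
the sort in one task.

```scala
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala._

object GlobalSortSketch {
  type Row = (String, String) // (author name, book title)

  // Suggestion from the thread: range-partition on field 0 so that each
  // partition holds a contiguous key range, then sort inside each partition.
  def rangeThenSort(ds: DataSet[Row]): DataSet[Row] =
    ds.partitionByRange(0).sortPartition(0, Order.ASCENDING)

  // Alternative (an assumption, not from the thread): run the sort in one task.
  def singleTaskSort(ds: DataSet[Row]): DataSet[Row] =
    ds.sortPartition(0, Order.ASCENDING).setParallelism(1)

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Hypothetical stand-in for the (author, title) tuples produced by the join.
    val result: DataSet[Row] = env.fromElements(
      ("George Orwell", "Burmese Days"),
      ("Fyodor Dostoyevsky", "The Gambler"),
      ("Albert Camus", "The Plague"),
      ("Fyodor Dostoyevsky", "Poor Folk"),
      ("Charles Bukowski", "Women")
    )

    rangeThenSort(result).print()
    singleTaskSort(result).print()
  }
}
```

On a small data set the single-task variant is the simpler choice; for large
inputs the range-partitioned variant keeps the sort parallel.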
