Posted to user@flink.apache.org by Juan Rodríguez Hortalá <ju...@gmail.com> on 2019/07/22 06:29:36 UTC

Best way to compute the difference between 2 datasets

Hi,

I've been trying to write a function that computes the difference between two
datasets, that is, a dataset with all the elements of one dataset that are
not present in another. I first tried using coGroup, but it was very slow in
a local execution environment and often crashed with an OOM error. Then I
tried leftOuterJoin and got similar results. I then tried the following:

private[this] def minusWithSortPartition(other: DataSet[T]): DataSet[T] = {
  val selfMarked: DataSet[(T, Boolean)] = self.map((_, true))
  val otherMarked: DataSet[(T, Boolean)] = other.map((_, false))

  val all = selfMarked.union(otherMarked)
    .partitionByHash(0) // so occurrences of the same value in both datasets go to the same partition
    // sorting the pairs puts (t, false) before (t, true), so markers from
    // `other` are always seen before the matching elements of `self`
    .sortPartition[(T, Boolean)](identity, Order.ASCENDING)
  all.mapPartition[T] { (partitionIter: Iterator[(T, Boolean)], collector: Collector[T]) =>
    var latestOtherOpt: Option[T] = None
    partitionIter.foreach {
      case (otherElem, false) => latestOtherOpt = Some(otherElem)
      case (selfElem, true) =>
        if (latestOtherOpt != Some(selfElem)) collector.collect(selfElem)
    }
  }
}


This is basically the idea of removing duplicates in a collection by first
sorting it and then traversing it from beginning to end, dropping elements
that repeat the element we just saw. It is extended here by marking whether
an element comes from `self` or from `other`, and keeping only the elements
from `self` that do not follow an occurrence of the same element in `other`.
That code is also really slow in a local execution environment, and crashes
a lot. But when I replace `sortPartition` with sorting each partition in
memory inside a mapPartition, it works fine in the local execution
environment.

private[this] def minusWithInMemoryPartition(other: DataSet[T]): DataSet[T] = {
  val selfMarked: DataSet[(T, Boolean)] = self.map((_, true))
  val otherMarked: DataSet[(T, Boolean)] = other.map((_, false))
  val all = selfMarked.union(otherMarked)
    .partitionByHash(0) // so occurrences of the same value in both datasets go to the same partition
  all.mapPartition[T] { (partitionIter: Iterator[(T, Boolean)], collector: Collector[T]) =>
    val sortedPartition = {
      val partition = partitionIter.toArray
      util.Sorting.quickSort(partition)
      partition
    }
    var latestOtherOpt: Option[T] = None
    sortedPartition.foreach {
      case (otherElem, false) => latestOtherOpt = Some(otherElem)
      case (selfElem, true) =>
        if (latestOtherOpt != Some(selfElem)) collector.collect(selfElem)
    }
  }
}


I'm surprised by such a big difference. This is my code
<https://github.com/demiourgoi/flink-check/blob/master/flink-check/src/main/scala/es/ucm/fdi/sscheck/matcher/specs2/flink/package.scala#L16>,
and a test
<https://github.com/demiourgoi/flink-check/blob/master/flink-check/src/test/scala/es/ucm/fdi/sscheck/matcher/specs2/flink/FlinkMatchersSpec.scala#L69>
I use for running this. I'm very surprised to hit these performance issues
with such small DataSets, fewer than 20 elements. Is this because I'm
running the program in a local execution environment? Are operations like
coGroup, leftOuterJoin or sortPartition implemented inefficiently in the
local environment? If so, is there an alternative environment recommended
for development on a single machine, where I wouldn't run into these
issues? Should I expect the function `minusWithSortPartition` above to run
efficiently on a cluster? Or is there something wrong with my code? Are
there any plans to provide a built-in minus operator in future versions of
Flink?

Thanks,

Juan

Re: Best way to compute the difference between 2 datasets

Posted by Fabian Hueske <fh...@gmail.com>.
Btw. there is a set difference or minus operator in the Table API [1] that
might be helpful.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/tableApi.html#set-operations
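
For example, a minimal sketch (assuming Flink 1.9 with the old-planner
flink-table dependency on the classpath; the example values are made up):

import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._

object TableMinusExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = BatchTableEnvironment.create(env)

    val left: DataSet[Int] = env.fromElements(1, 2, 2, 3, 4)
    val right: DataSet[Int] = env.fromElements(2, 4)

    // minus returns the records of the left table that do not exist in
    // the right table; duplicates in the left table are returned only once
    val diff = tEnv.fromDataSet(left, 'value)
      .minus(tEnv.fromDataSet(right, 'value))

    tEnv.toDataSet[Int](diff).print() // expected: 1, 3
  }
}

Note that minus also deduplicates the left input; minusAll keeps
multiplicities instead.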


Re: Best way to compute the difference between 2 datasets

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Juan,

Both the local execution environment and the remote execution environment
run the same code to execute the program.
The implementation of the sortPartition operator was designed to scale to
data sizes that exceed the available memory.
Internally, it serializes all records into byte arrays and sorts the
serialized data. This is of course more expensive than keeping all objects
on the heap and sorting them there.
Hence, a certain performance difference is to be expected. However, the
program should not fail.
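
If it helps to picture it, here is a toy sketch of the idea (a made-up
example, not Flink's actual implementation): records are turned into byte
arrays, and the sort compares those bytes instead of the objects.

import java.nio.charset.StandardCharsets.UTF_8

object SerializedSortSketch {
  def main(args: Array[String]): Unit = {
    val records = Seq("pear", "apple", "banana")
    val serialized = records.map(_.getBytes(UTF_8)) // serialization cost

    val sorted = serialized.sortWith { (a, b) =>
      // unsigned lexicographic comparison of the serialized bytes
      val n = math.min(a.length, b.length)
      var i = 0
      while (i < n && a(i) == b(i)) i += 1
      if (i < n) (a(i) & 0xff) < (b(i) & 0xff) else a.length < b.length
    }

    sorted.map(new String(_, UTF_8)).foreach(println) // apple, banana, pear
  }
}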

What's the magnitude of the performance difference?
Can you post a stack trace of the error?

Thanks,
Fabian


Re: Best way to compute the difference between 2 datasets

Posted by Juan Rodríguez Hortalá <ju...@gmail.com>.
Hi Ken,

Thanks for the suggestion; that idea should also work for implementing a
data set difference operation, which is what concerns me here. However, I
was also curious why there is such a big performance difference between
using sortPartition and sorting each partition in memory, for datasets as
small as 20 elements running in local mode. For data set sizes like that I
would expect no relevant performance difference, but with sortPartition the
program crashes, so I must be doing something wrong.

Thanks in any case for the idea.

Regards,

Juan


Re: Best way to compute the difference between 2 datasets

Posted by Ken Krugler <kk...@transpac.com>.
Hi Juan,

If you want to deduplicate, then you could group by the record, and use a (very simple) reduce function to only emit a record if the group contains one element.

There will be performance issues, though - Flink will have to generate all groups first, which typically means spilling to disk if the data set has any significant size.

— Ken

PS - I assume that you’ve implemented a valid hashCode()/equals() for the record.
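
A rough sketch of what I mean (made-up example values; grouping on the full
record is what needs those hashCode()/equals() implementations):

import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

object UniqueRecords {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val records: DataSet[String] = env.fromElements("a", "b", "b", "c")

    val uniques = records
      .groupBy(x => x) // group identical records together
      .reduceGroup { (group: Iterator[String], out: Collector[String]) =>
        val first = group.next()
        if (!group.hasNext) out.collect(first) // keep only singleton groups
      }

    uniques.print() // expected: a, c
  }
}

To get the set difference instead, I think you could apply the same pattern
to the marked union from your snippets: group by the element and emit it
only when the group contains no (_, false) marker.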



--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra