Posted to user@spark.apache.org by Ameet Kini <am...@gmail.com> on 2013/10/18 23:20:02 UTC

examples of map-side join of two hadoop sequence files

I've seen discussions where the suggestion is to do a map-side join, but
haven't seen an example yet, and can certainly use one. I have two sequence
files where the key is unique within each file, so the join is a one-to-one
join, and can hence benefit from a map-side join. However both sequence
files can be large, so reading one of them completely in the driver and
broadcasting it out would be expensive.

I don't think there is a map-side join implementation in Spark, but earlier
suggestions have been to write one using mapPartitions on one of the
operands as the outer loop. If that is the case, how would I fetch the
split corresponding to the keys in the outer operand's partition? I'd
prefer to do a fetch-per-partition rather than a fetch-per-tuple.

In any case, some feedback, and preferably, an example of a map-side join
without broadcasting would help.

Thanks,
Ameet

Re: examples of map-side join of two hadoop sequence files

Posted by Ameet Kini <am...@gmail.com>.
For now, I solved the map-side join by partitioning my sequence files
(actually, map files) so that they are range partitioned on the key. For
each Spark partition (inside the function passed to mapPartitions), I look
at the very first key and, based on that key, open a MapFile.Reader on the
right map file. The successive keys are guaranteed to be in that same map
file, and hence can be served by the already opened reader.
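
To make the idea concrete, here is a minimal plain-Scala sketch of the
lookup, not the actual Spark/Hadoop code. FakeReader is a hypothetical
in-memory stand-in for MapFile.Reader, and the file names, key ranges and
the opens counter are all made up for illustration. It only shows the
"pick the file from the first key, then reuse the reader" pattern.

```scala
// Hypothetical stand-in for a MapFile.Reader: keyed lookup over one file.
final class FakeReader(val name: String, data: Map[Int, String]) {
  def get(key: Int): Option[String] = data.get(key)
}

// Hypothetical catalog of range-partitioned files, sorted by the lowest
// key each file may contain.
val files: Vector[(Int, FakeReader)] = Vector(
  0   -> new FakeReader("part-0", Map(1 -> "a", 5 -> "e")),
  100 -> new FakeReader("part-1", Map(100 -> "x", 150 -> "y"))
)

var opens = 0 // counts how many times a "reader" is opened

// Join one outer partition: choose the reader from the first key only,
// then serve every later key in the partition from that same reader.
def joinPartition(iter: Iterator[(Int, String)]): Iterator[(Int, (String, String))] = {
  val it = iter.buffered
  if (!it.hasNext) Iterator.empty
  else {
    // Last file whose lower bound is <= the first key of this partition.
    val idx = files.lastIndexWhere(_._1 <= it.head._1)
    val reader = files(idx)._2
    opens += 1
    it.flatMap { case (k, v) => reader.get(k).map(w => (k, (v, w))) }
  }
}

val out = joinPartition(Iterator(100 -> "L100", 150 -> "L150")).toList
```

In a real job the body of joinPartition would be the function passed to
mapPartitions, and the reader would need to be closed once the iterator is
exhausted. The sketch also assumes the first key is never below the lowest
bound in the catalog.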

While this works, I would like to explore overriding HadoopRDD as per
Reynold's suggestion, as it would make for a cleaner implementation.

Thanks for your suggestions.

Ameet




On Mon, Oct 21, 2013 at 2:12 PM, Reynold Xin <rx...@apache.org> wrote:

> Maybe you can override HadoopRDD's compute method to do that?

Re: examples of map-side join of two hadoop sequence files

Posted by Reynold Xin <rx...@apache.org>.
Maybe you can override HadoopRDD's compute method to do that?


On Mon, Oct 21, 2013 at 8:16 AM, Ameet Kini <am...@gmail.com> wrote:

> Right, except both my sequence files are large and so doing a "collect()"
> and then broadcasting one of them would be costly. Since I have two large
> sorted sequence files with a one-to-one relationship among the keys, I need
> to perform the "merge" portion of a good old "sort-merge" join. And it is
> actually a very simple merge, since each key is unique within the file.
>
> I was looking at the mapPartitions API:
> def mapPartitions[U](f: (Iterator[T]) => Iterator[U], preservesPartitioning:
> Boolean)(implicit arg0: ClassManifest[U]): RDD[U]
> (RDD: http://spark.incubator.apache.org/docs/latest/api/core/org/apache/spark/rdd/RDD.html)
>
> If somehow the function f has access to the underlying partition
> information (e.g., HadoopPartition.inputSplit), then it could open a reader
> on the actual HDFS file corresponding to that inputSplit, and manually do
> the join. But it looks like HadoopPartition is declared private. Is there
> some other way to figure out which underlying HDFS file corresponds to the
> partition being iterated upon in mapPartitions?
>
> Ameet

Re: examples of map-side join of two hadoop sequence files

Posted by Ameet Kini <am...@gmail.com>.
Right, except both my sequence files are large and so doing a "collect()"
and then broadcasting one of them would be costly. Since I have two large
sorted sequence files with a one-to-one relationship among the keys, I need
to perform the "merge" portion of a good old "sort-merge" join. And it is
actually a very simple merge, since each key is unique within the file.
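
For illustration, the merge step described above can be sketched in plain
Scala. This is a minimal sketch, not Spark code: mergeJoin is a
hypothetical helper, and it assumes both inputs are already sorted by key
and that each key occurs at most once per input.

```scala
// Merge step of a sort-merge join for two inputs sorted by key, where each
// key occurs at most once per input (one-to-one join).
// Hypothetical helper; not part of Spark.
def mergeJoin[K, A, B](left: Iterator[(K, A)], right: Iterator[(K, B)])
                      (implicit ord: Ordering[K]): Iterator[(K, (A, B))] = {
  val l = left.buffered
  val r = right.buffered
  new Iterator[(K, (A, B))] {
    // Advance whichever side has the smaller head key until the heads
    // match or one side is exhausted.
    private def align(): Boolean = {
      while (l.hasNext && r.hasNext) {
        val c = ord.compare(l.head._1, r.head._1)
        if (c == 0) return true
        else if (c < 0) l.next()
        else r.next()
      }
      false
    }
    def hasNext: Boolean = align()
    def next(): (K, (A, B)) = {
      if (!align()) throw new NoSuchElementException("no more matches")
      val (k, a) = l.next()
      val (_, b) = r.next()
      (k, (a, b))
    }
  }
}

val left   = Iterator(1 -> "a", 3 -> "c", 4 -> "d", 7 -> "g")
val right  = Iterator(2 -> 20, 3 -> 30, 7 -> 70, 9 -> 90)
val joined = mergeJoin(left, right).toList
```

Both inputs are consumed once, front to back, so neither side has to fit in
memory. The open question in this thread is only how to line up the right
split of the second file with each partition of the first.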

I was looking at the mapPartitions API:
def mapPartitions[U](f: (Iterator[T]) => Iterator[U], preservesPartitioning:
Boolean)(implicit arg0: ClassManifest[U]): RDD[U]
(RDD: http://spark.incubator.apache.org/docs/latest/api/core/org/apache/spark/rdd/RDD.html)

If somehow the function f has access to the underlying partition
information (e.g., HadoopPartition.inputSplit), then it could open a reader
on the actual HDFS file corresponding to that inputSplit, and manually do
the join. But it looks like HadoopPartition is declared private. Is there
some other way to figure out which underlying HDFS file corresponds to the
partition being iterated upon in mapPartitions?

Ameet




On Mon, Oct 21, 2013 at 12:54 AM, Reynold Xin <rx...@apache.org> wrote:

> How about the following:
>
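> val smallFile = sc.sequenceFile(....).collect()
> val largeFile = sc.sequenceFile(...)
>
> // Ship the small side to every worker once.
> val small = sc.broadcast(smallFile)
> largeFile.mapPartitions { iter =>
>   // Build a hash table from the broadcast (key, value) pairs,
>   // once per partition. Call it smallTable.
>   val smallTable = small.value.toMap
>   // Keep rows whose key is in smallTable and pair up the two values.
>   iter.filter { case (k, _) => smallTable.contains(k) }.map { case (k, v) =>
>     (k, (v, smallTable(k)))
>   }
> }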

Re: examples of map-side join of two hadoop sequence files

Posted by Reynold Xin <rx...@apache.org>.
How about the following:

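val smallFile = sc.sequenceFile(....).collect()
val largeFile = sc.sequenceFile(...)

// Ship the small side to every worker once.
val small = sc.broadcast(smallFile)
largeFile.mapPartitions { iter =>
  // Build a hash table from the broadcast (key, value) pairs,
  // once per partition. Call it smallTable.
  val smallTable = small.value.toMap
  // Keep rows whose key is in smallTable and pair up the two values.
  iter.filter { case (k, _) => smallTable.contains(k) }.map { case (k, v) =>
    (k, (v, smallTable(k)))
  }
}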




On Fri, Oct 18, 2013 at 2:22 PM, Ameet Kini <am...@gmail.com> wrote:

> Forgot to add an important point. My sequence files are sorted (they're
> actually Hadoop map files). Since they're sorted, it makes sense to do a
> fetch at the partition-level of the inner sequence file.
>
> Thanks,
> Ameet

Re: examples of map-side join of two hadoop sequence files

Posted by Ameet Kini <am...@gmail.com>.
Forgot to add an important point. My sequence files are sorted (they're
actually Hadoop map files). Since they're sorted, it makes sense to do a
fetch at the partition-level of the inner sequence file.

Thanks,
Ameet


On Fri, Oct 18, 2013 at 5:20 PM, Ameet Kini <am...@gmail.com> wrote:

>
> I've seen discussions where the suggestion is to do a map-side join, but
> haven't seen an example yet, and can certainly use one. I have two sequence
> files where the key is unique within each file, so the join is a one-to-one
> join, and can hence benefit from a map-side join. However both sequence
> files can be large, so reading one of them completely in the driver and
> broadcasting it out would be expensive.
>
> I don't think there is a map-side join implementation in Spark, but earlier
> suggestions have been to write one using mapPartitions on one of the
> operands as the outer loop. If that is the case, how would I fetch the
> split corresponding to the keys in the outer operand's partition? I'd
> prefer to do a fetch-per-partition rather than a fetch-per-tuple.
>
> In any case, some feedback, and preferably, an example of a map-side join
> without broadcasting would help.
>
> Thanks,
> Ameet
>