You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by weijie tong <to...@gmail.com> on 2017/05/14 03:23:39 UTC

questions about Flink's HashJoin performance

I has a test case to use Flink's MutableHashTable class to do a hash join
on a local machine with 64g memory, 64cores. The test case is one build
table with 14w rows ,one probe table with 320w rows ,the matched result
rows is 12 w.

It takes 2.2 seconds to complete the join.The performance seems bad. I
ensure there's no overflow, the smaller table is the build side. The
MutableObjectIterator is a sequence of Rows. The Row is composed of several
fields which are byte[]. Through my log,I find the open() method takes
1.560 seconds. The probe iterates phase takes 680ms.  And my Jprofiler's
profile shows the MutableObjectIterator's next() method call is the
hotspot.


I want to know how to tune this scenario. I find Drill's HashJoin is batch
model. Its build side's input is a RecordBatch which holds batch of rows
and memory size is approach to L2 cache. Through this strategy it will gain
less method calls (that means call to next() ) and much efficient to cpu
calculation.  I also find SQL server's paper noticed the batch model's
performance gains (
https://www.microsoft.com/en-us/research/wp-content/uploads/2013/06/Apollo3-Sigmod-2013-final.pdf)
 .   I guess the performance's down is due to the single row iterate model.


Hope someone to correct my opinion. Also maybe I have a wrong use  of the
MutableHashTable. wait for someone to give an advice.

Re: questions about Flink's HashJoin performance

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

I'm not aware of a performance report for this feature. I don't think it is
well known or used a lot.
The classes to check out for prepartitioned / presorted data are
SplitDataProperties [1], DataSource [2], and as an example
PropertyDataSourceTest [3].

[1]
https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java
[2]
https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
[3]
https://github.com/apache/flink/blob/master/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java

Best, Fabian
<https://github.com/apache/flink/commit/f0a28bf5345084a0a43df16021e60078e322e087#diff-c7c697fdb164023d6a737f3dcd23f2c0>

2017-05-18 13:54 GMT+02:00 weijie tong <to...@gmail.com>:

> thanks for tip @Stephan.
>
> To [1] , there's a description about  "I’ve got sooo much data to join,
> do I really need to ship it?" . How to configure Flink to touch that
> target? Is there a performance report ?
>
> [1] : https://flink.apache.org/news/2015/03/13/peeking-into-
> Apache-Flinks-Engine-Room.html
>
> On Wed, May 17, 2017 at 1:32 AM, Stephan Ewen <se...@apache.org> wrote:
>
>> Hi!
>>
>> Be aware that the "Row" and "Record" types are not very high performance
>> data types. You might be measuring the data type overhead, rather than the
>> hash table performance. Also, the build measurements include the data
>> generation, which influences the results.
>>
>> If you want to purely benchmark the HashTable performance, try using
>> something like "Tuple2<Long, Long>" or so (or write your own custom
>> TypeSerializer / TypeComparator).
>>
>> Stephan
>>
>>
>> On Tue, May 16, 2017 at 11:23 AM, weijie tong <to...@gmail.com>
>> wrote:
>>
>>> Thanks for all your enthusiastic response. Yes, My target was to try to
>>> find the best performance in memory. I got that.
>>>
>>> On Tue, 16 May 2017 at 4:10 PM Fabian Hueske <fh...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Flink's HashJoin implementation was designed to gracefully handle
>>>> inputs that exceed the main memory.
>>>> It is not explicitly optimized for in-memory processing and does not
>>>> play fancy tricks like optimizing cache accesses or batching.
>>>> I assume your benchmark is about in-memory joins only. This was not the
>>>> main design goal when the join was implemented but robustness.
>>>> Since most of the development of Flink focuses on streaming
>>>> applications at the moment, the join implementation has barely been touched
>>>> in recent years (except for minor extensions and bugfixes).
>>>>
>>>> Regarding your tests, Tuple should give better performance than Row
>>>> because Row is null-sensitive and serialized a null-mask.
>>>> There is also a blog post about Flink's join performance [1] which is
>>>> already a bit dusty but as I said, the algorithm hasn't change much since
>>>> then.
>>>>
>>>> Best, Fabian
>>>>
>>>> [1] https://flink.apache.org/news/2015/03/13/peeking-into-Apache
>>>> -Flinks-Engine-Room.html
>>>>
>>>>
>>>> 2017-05-15 16:26 GMT+02:00 weijie tong <to...@gmail.com>:
>>>>
>>>>> The Flink version is 1.2.0
>>>>>
>>>>> On Mon, May 15, 2017 at 10:24 PM, weijie tong <tongweijie178@gmail.com
>>>>> > wrote:
>>>>>
>>>>>> @Till thanks for your reply.
>>>>>>
>>>>>> My code is similar to   HashTableITCase.testInMemory
>>>>>> MutableHashTable()   . It just use the MutableHashTable class ,
>>>>>> there's  no other Flink's configuration.  The main code body is:
>>>>>>
>>>>>> this.recordBuildSideAccessor = RecordSerializer.get();
>>>>>>> this.recordProbeSideAccessor = RecordSerializer.get();
>>>>>>> final int[] buildKeyPos = new int[]{buildSideJoinIndex};
>>>>>>> final int[] probeKeyPos = new int[]{probeSideJoinIndex};
>>>>>>> final Class<? extends Value>[] keyType = (Class<? extends Value>[]) new Class[]{BytesValue.class};
>>>>>>> this.recordBuildSideComparator = new RecordComparator(buildKeyPos, keyType);
>>>>>>> this.recordProbeSideComparator = new RecordComparator(probeKeyPos, keyType);
>>>>>>> this.pactRecordComparator = new HashJoinVectorJointGroupIterator.RecordPairComparator(buildSideJoinIndex);
>>>>>>> Sequence<Record> buildSideRecordsSeq = makeSequenceRecordOfSameSideSegments(buildSideSegs, localJoinQuery);
>>>>>>> Sequence<Record> probeSideRecordsSeq = makeSequenceRecordOfSameSideSegments(probeSideSegs, localJoinQuery);
>>>>>>> List<MemorySegment> memorySegments;
>>>>>>> int pageSize = hashTableMemoryManager.getTotalNumPages();
>>>>>>> try {
>>>>>>>   memorySegments = this.hashTableMemoryManager.allocatePages(MEM_OWNER, pageSize);
>>>>>>> }
>>>>>>> catch (MemoryAllocationException e) {
>>>>>>>   LOGGER.error("could not allocate " + pageSize + " pages memory for HashJoin", e);
>>>>>>>   Throwables.propagate(e);
>>>>>>>   return;
>>>>>>> }
>>>>>>> try {
>>>>>>>   Stopwatch stopwatch = Stopwatch.createStarted();
>>>>>>>   UniformRecordGenerator buildInput = new UniformRecordGenerator(buildSideRecordsSeq);
>>>>>>>   UniformRecordGenerator probeInput = new UniformRecordGenerator(probeSideRecordsSeq);
>>>>>>>   join = new MutableHashTable<Record, Record>(
>>>>>>>       recordBuildSideAccessor,
>>>>>>>       recordProbeSideAccessor,
>>>>>>>       recordBuildSideComparator,
>>>>>>>       recordProbeSideComparator,
>>>>>>>       pactRecordComparator,
>>>>>>>       memorySegments,
>>>>>>>       ioManager
>>>>>>>   );
>>>>>>>   join.open(buildInput,probeInput);
>>>>>>>
>>>>>>>   LOGGER.info("construct hash table elapsed:" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + "ms");
>>>>>>>
>>>>>>>
>>>>>> The BytesValue type is self defined one which holds byte[] , but just
>>>>>> like the original StringValue, also has the same serDe performance.
>>>>>>
>>>>>>
>>>>>> while (join.nextRecord()) {
>>>>>>   Record currentProbeRecord = join.getCurrentProbeRecord();
>>>>>>   MutableObjectIterator<Record> buildSideIterator = join.getBuildSideIterator();
>>>>>>   while (buildSideIterator.next(reusedBuildSideRow) != null) {
>>>>>>     materializeRecord2OutVector(reusedBuildSideRow, buildSideIndex2Value, buildSideIndex2Vector, rowNum);
>>>>>>     materializeRecord2OutVector(currentProbeRecord, probeSideIndex2Value, probeSideIndex2Vector, rowNum);
>>>>>>     rowNum++;
>>>>>>   }}
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> I have tried both the Record ,Row class as the type of records
>>>>>> without any better improved performance . I also tried batched the input
>>>>>> records. That means the  buildInput or probeInput variables of the
>>>>>> first code block which iterate one Record a time from another batched
>>>>>> Records . Batched records's content stay in memory in Drill's ValueVector
>>>>>> format. Once a record is need to participate in the build or probe phase
>>>>>> from a iterate.next() call,
>>>>>> it will be fetched from the batched in memory ValueVector content.
>>>>>> But no performance gains.
>>>>>>
>>>>>>
>>>>>> The top hotspot profile from Jprofiler is below:
>>>>>> >
>>>>>> Hot spot,"Self time (microseconds)","Average Time","Invocations"
>>>>>> org.apache.flink.types.Record.serialize,1014127,"n/a","n/a"
>>>>>> org.apache.flink.types.Record.deserialize,60684,"n/a","n/a"
>>>>>> org.apache.flink.types.Record.copyTo,83007,"n/a","n/a"
>>>>>> org.apache.flink.runtime.operators.hash.MutableHashTable.ope
>>>>>> n,55238,"n/a","n/a"
>>>>>> org.apache.flink.runtime.operators.hash.MutableHashTable.nex
>>>>>> tRecord,10955,"n/a","n/a"
>>>>>> org.apache.flink.runtime.memory.MemoryManager.release,33484,
>>>>>> "n/a","n/a"
>>>>>> org.apache.flink.runtime.memory.MemoryManager.allocatePages,
>>>>>> 104259,"n/a","n/a"
>>>>>>
>>>>>>
>>>>>> My log show that hashjoin.open()  method costs too much time.
>>>>>> >
>>>>>> construct hash table elapsed:1885ms
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, May 15, 2017 at 6:20 PM, Till Rohrmann <tr...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Weijie,
>>>>>>>
>>>>>>> it might be the case that batching the processing of multiple rows
>>>>>>> can give you an improved performance compared to single row processing.
>>>>>>>
>>>>>>> Maybe you could share the exact benchmark base line results and the
>>>>>>> code you use to test Flink's MutableHashTable with us. Also the Flink
>>>>>>> configuration and how you run it would be of interest. That way we might be
>>>>>>> able to see if we can tune Flink a bit more.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Sun, May 14, 2017 at 5:23 AM, weijie tong <
>>>>>>> tongweijie178@gmail.com> wrote:
>>>>>>>
>>>>>>>> I has a test case to use Flink's MutableHashTable class to do a
>>>>>>>> hash join on a local machine with 64g memory, 64cores. The test case is one
>>>>>>>> build table with 14w rows ,one probe table with 320w rows ,the matched
>>>>>>>> result rows is 12 w.
>>>>>>>>
>>>>>>>> It takes 2.2 seconds to complete the join.The performance seems
>>>>>>>> bad. I ensure there's no overflow, the smaller table is the build side. The
>>>>>>>> MutableObjectIterator is a sequence of Rows. The Row is composed of several
>>>>>>>> fields which are byte[]. Through my log,I find the open() method takes
>>>>>>>> 1.560 seconds. The probe iterates phase takes 680ms.  And my Jprofiler's
>>>>>>>> profile shows the MutableObjectIterator's next() method call is the
>>>>>>>> hotspot.
>>>>>>>>
>>>>>>>>
>>>>>>>> I want to know how to tune this scenario. I find Drill's HashJoin
>>>>>>>> is batch model. Its build side's input is a RecordBatch which holds batch
>>>>>>>> of rows and memory size is approach to L2 cache. Through this strategy it
>>>>>>>> will gain less method calls (that means call to next() ) and much efficient
>>>>>>>> to cpu calculation.  I also find SQL server's paper noticed the batch
>>>>>>>> model's performance gains (https://www.microsoft.com/en-
>>>>>>>> us/research/wp-content/uploads/2013/06/Apollo3-Sigmod-2013-f
>>>>>>>> inal.pdf)  .   I guess the performance's down is due to the single
>>>>>>>> row iterate model.
>>>>>>>>
>>>>>>>>
>>>>>>>> Hope someone to correct my opinion. Also maybe I have a wrong use
>>>>>>>>  of the MutableHashTable. wait for someone to give an advice.
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>
>

Re: questions about Flink's HashJoin performance

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

I'm not aware of a performance report for this feature. I don't think it is
well known or used a lot.
The classes to check out for prepartitioned / presorted data are
SplitDataProperties [1], DataSource [2], and as an example
PropertyDataSourceTest [3].

[1]
https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java
[2]
https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
[3]
https://github.com/apache/flink/blob/master/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java

Best, Fabian
<https://github.com/apache/flink/commit/f0a28bf5345084a0a43df16021e60078e322e087#diff-c7c697fdb164023d6a737f3dcd23f2c0>

2017-05-18 13:54 GMT+02:00 weijie tong <to...@gmail.com>:

> thanks for tip @Stephan.
>
> To [1] , there's a description about  "I’ve got sooo much data to join,
> do I really need to ship it?" . How to configure Flink to touch that
> target? Is there a performance report ?
>
> [1] : https://flink.apache.org/news/2015/03/13/peeking-into-
> Apache-Flinks-Engine-Room.html
>
> On Wed, May 17, 2017 at 1:32 AM, Stephan Ewen <se...@apache.org> wrote:
>
>> Hi!
>>
>> Be aware that the "Row" and "Record" types are not very high performance
>> data types. You might be measuring the data type overhead, rather than the
>> hash table performance. Also, the build measurements include the data
>> generation, which influences the results.
>>
>> If you want to purely benchmark the HashTable performance, try using
>> something like "Tuple2<Long, Long>" or so (or write your own custom
>> TypeSerializer / TypeComparator).
>>
>> Stephan
>>
>>
>> On Tue, May 16, 2017 at 11:23 AM, weijie tong <to...@gmail.com>
>> wrote:
>>
>>> Thanks for all your enthusiastic response. Yes, My target was to try to
>>> find the best performance in memory. I got that.
>>>
>>> On Tue, 16 May 2017 at 4:10 PM Fabian Hueske <fh...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Flink's HashJoin implementation was designed to gracefully handle
>>>> inputs that exceed the main memory.
>>>> It is not explicitly optimized for in-memory processing and does not
>>>> play fancy tricks like optimizing cache accesses or batching.
>>>> I assume your benchmark is about in-memory joins only. This was not the
>>>> main design goal when the join was implemented but robustness.
>>>> Since most of the development of Flink focuses on streaming
>>>> applications at the moment, the join implementation has barely been touched
>>>> in recent years (except for minor extensions and bugfixes).
>>>>
>>>> Regarding your tests, Tuple should give better performance than Row
>>>> because Row is null-sensitive and serialized a null-mask.
>>>> There is also a blog post about Flink's join performance [1] which is
>>>> already a bit dusty but as I said, the algorithm hasn't change much since
>>>> then.
>>>>
>>>> Best, Fabian
>>>>
>>>> [1] https://flink.apache.org/news/2015/03/13/peeking-into-Apache
>>>> -Flinks-Engine-Room.html
>>>>
>>>>
>>>> 2017-05-15 16:26 GMT+02:00 weijie tong <to...@gmail.com>:
>>>>
>>>>> The Flink version is 1.2.0
>>>>>
>>>>> On Mon, May 15, 2017 at 10:24 PM, weijie tong <tongweijie178@gmail.com
>>>>> > wrote:
>>>>>
>>>>>> @Till thanks for your reply.
>>>>>>
>>>>>> My code is similar to   HashTableITCase.testInMemory
>>>>>> MutableHashTable()   . It just use the MutableHashTable class ,
>>>>>> there's  no other Flink's configuration.  The main code body is:
>>>>>>
>>>>>> this.recordBuildSideAccessor = RecordSerializer.get();
>>>>>>> this.recordProbeSideAccessor = RecordSerializer.get();
>>>>>>> final int[] buildKeyPos = new int[]{buildSideJoinIndex};
>>>>>>> final int[] probeKeyPos = new int[]{probeSideJoinIndex};
>>>>>>> final Class<? extends Value>[] keyType = (Class<? extends Value>[]) new Class[]{BytesValue.class};
>>>>>>> this.recordBuildSideComparator = new RecordComparator(buildKeyPos, keyType);
>>>>>>> this.recordProbeSideComparator = new RecordComparator(probeKeyPos, keyType);
>>>>>>> this.pactRecordComparator = new HashJoinVectorJointGroupIterator.RecordPairComparator(buildSideJoinIndex);
>>>>>>> Sequence<Record> buildSideRecordsSeq = makeSequenceRecordOfSameSideSegments(buildSideSegs, localJoinQuery);
>>>>>>> Sequence<Record> probeSideRecordsSeq = makeSequenceRecordOfSameSideSegments(probeSideSegs, localJoinQuery);
>>>>>>> List<MemorySegment> memorySegments;
>>>>>>> int pageSize = hashTableMemoryManager.getTotalNumPages();
>>>>>>> try {
>>>>>>>   memorySegments = this.hashTableMemoryManager.allocatePages(MEM_OWNER, pageSize);
>>>>>>> }
>>>>>>> catch (MemoryAllocationException e) {
>>>>>>>   LOGGER.error("could not allocate " + pageSize + " pages memory for HashJoin", e);
>>>>>>>   Throwables.propagate(e);
>>>>>>>   return;
>>>>>>> }
>>>>>>> try {
>>>>>>>   Stopwatch stopwatch = Stopwatch.createStarted();
>>>>>>>   UniformRecordGenerator buildInput = new UniformRecordGenerator(buildSideRecordsSeq);
>>>>>>>   UniformRecordGenerator probeInput = new UniformRecordGenerator(probeSideRecordsSeq);
>>>>>>>   join = new MutableHashTable<Record, Record>(
>>>>>>>       recordBuildSideAccessor,
>>>>>>>       recordProbeSideAccessor,
>>>>>>>       recordBuildSideComparator,
>>>>>>>       recordProbeSideComparator,
>>>>>>>       pactRecordComparator,
>>>>>>>       memorySegments,
>>>>>>>       ioManager
>>>>>>>   );
>>>>>>>   join.open(buildInput,probeInput);
>>>>>>>
>>>>>>>   LOGGER.info("construct hash table elapsed:" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + "ms");
>>>>>>>
>>>>>>>
>>>>>> The BytesValue type is self defined one which holds byte[] , but just
>>>>>> like the original StringValue, also has the same serDe performance.
>>>>>>
>>>>>>
>>>>>> while (join.nextRecord()) {
>>>>>>   Record currentProbeRecord = join.getCurrentProbeRecord();
>>>>>>   MutableObjectIterator<Record> buildSideIterator = join.getBuildSideIterator();
>>>>>>   while (buildSideIterator.next(reusedBuildSideRow) != null) {
>>>>>>     materializeRecord2OutVector(reusedBuildSideRow, buildSideIndex2Value, buildSideIndex2Vector, rowNum);
>>>>>>     materializeRecord2OutVector(currentProbeRecord, probeSideIndex2Value, probeSideIndex2Vector, rowNum);
>>>>>>     rowNum++;
>>>>>>   }}
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> I have tried both the Record ,Row class as the type of records
>>>>>> without any better improved performance . I also tried batched the input
>>>>>> records. That means the  buildInput or probeInput variables of the
>>>>>> first code block which iterate one Record a time from another batched
>>>>>> Records . Batched records's content stay in memory in Drill's ValueVector
>>>>>> format. Once a record is need to participate in the build or probe phase
>>>>>> from a iterate.next() call,
>>>>>> it will be fetched from the batched in memory ValueVector content.
>>>>>> But no performance gains.
>>>>>>
>>>>>>
>>>>>> The top hotspot profile from Jprofiler is below:
>>>>>> >
>>>>>> Hot spot,"Self time (microseconds)","Average Time","Invocations"
>>>>>> org.apache.flink.types.Record.serialize,1014127,"n/a","n/a"
>>>>>> org.apache.flink.types.Record.deserialize,60684,"n/a","n/a"
>>>>>> org.apache.flink.types.Record.copyTo,83007,"n/a","n/a"
>>>>>> org.apache.flink.runtime.operators.hash.MutableHashTable.ope
>>>>>> n,55238,"n/a","n/a"
>>>>>> org.apache.flink.runtime.operators.hash.MutableHashTable.nex
>>>>>> tRecord,10955,"n/a","n/a"
>>>>>> org.apache.flink.runtime.memory.MemoryManager.release,33484,
>>>>>> "n/a","n/a"
>>>>>> org.apache.flink.runtime.memory.MemoryManager.allocatePages,
>>>>>> 104259,"n/a","n/a"
>>>>>>
>>>>>>
>>>>>> My log show that hashjoin.open()  method costs too much time.
>>>>>> >
>>>>>> construct hash table elapsed:1885ms
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, May 15, 2017 at 6:20 PM, Till Rohrmann <tr...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Weijie,
>>>>>>>
>>>>>>> it might be the case that batching the processing of multiple rows
>>>>>>> can give you an improved performance compared to single row processing.
>>>>>>>
>>>>>>> Maybe you could share the exact benchmark base line results and the
>>>>>>> code you use to test Flink's MutableHashTable with us. Also the Flink
>>>>>>> configuration and how you run it would be of interest. That way we might be
>>>>>>> able to see if we can tune Flink a bit more.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Sun, May 14, 2017 at 5:23 AM, weijie tong <
>>>>>>> tongweijie178@gmail.com> wrote:
>>>>>>>
>>>>>>>> I has a test case to use Flink's MutableHashTable class to do a
>>>>>>>> hash join on a local machine with 64g memory, 64cores. The test case is one
>>>>>>>> build table with 14w rows ,one probe table with 320w rows ,the matched
>>>>>>>> result rows is 12 w.
>>>>>>>>
>>>>>>>> It takes 2.2 seconds to complete the join.The performance seems
>>>>>>>> bad. I ensure there's no overflow, the smaller table is the build side. The
>>>>>>>> MutableObjectIterator is a sequence of Rows. The Row is composed of several
>>>>>>>> fields which are byte[]. Through my log,I find the open() method takes
>>>>>>>> 1.560 seconds. The probe iterates phase takes 680ms.  And my Jprofiler's
>>>>>>>> profile shows the MutableObjectIterator's next() method call is the
>>>>>>>> hotspot.
>>>>>>>>
>>>>>>>>
>>>>>>>> I want to know how to tune this scenario. I find Drill's HashJoin
>>>>>>>> is batch model. Its build side's input is a RecordBatch which holds batch
>>>>>>>> of rows and memory size is approach to L2 cache. Through this strategy it
>>>>>>>> will gain less method calls (that means call to next() ) and much efficient
>>>>>>>> to cpu calculation.  I also find SQL server's paper noticed the batch
>>>>>>>> model's performance gains (https://www.microsoft.com/en-
>>>>>>>> us/research/wp-content/uploads/2013/06/Apollo3-Sigmod-2013-f
>>>>>>>> inal.pdf)  .   I guess the performance's down is due to the single
>>>>>>>> row iterate model.
>>>>>>>>
>>>>>>>>
>>>>>>>> Hope someone to correct my opinion. Also maybe I have a wrong use
>>>>>>>>  of the MutableHashTable. wait for someone to give an advice.
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>
>

Re: questions about Flink's HashJoin performance

Posted by weijie tong <to...@gmail.com>.
thanks for tip @Stephan.

To [1] , there's a description about  "I’ve got sooo much data to join, do
I really need to ship it?" . How to configure Flink to touch that target?
Is there a performance report ?

[1] :
https://flink.apache.org/news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html

On Wed, May 17, 2017 at 1:32 AM, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> Be aware that the "Row" and "Record" types are not very high performance
> data types. You might be measuring the data type overhead, rather than the
> hash table performance. Also, the build measurements include the data
> generation, which influences the results.
>
> If you want to purely benchmark the HashTable performance, try using
> something like "Tuple2<Long, Long>" or so (or write your own custom
> TypeSerializer / TypeComparator).
>
> Stephan
>
>
> On Tue, May 16, 2017 at 11:23 AM, weijie tong <to...@gmail.com>
> wrote:
>
>> Thanks for all your enthusiastic response. Yes, My target was to try to
>> find the best performance in memory. I got that.
>>
>> On Tue, 16 May 2017 at 4:10 PM Fabian Hueske <fh...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Flink's HashJoin implementation was designed to gracefully handle inputs
>>> that exceed the main memory.
>>> It is not explicitly optimized for in-memory processing and does not
>>> play fancy tricks like optimizing cache accesses or batching.
>>> I assume your benchmark is about in-memory joins only. This was not the
>>> main design goal when the join was implemented but robustness.
>>> Since most of the development of Flink focuses on streaming applications
>>> at the moment, the join implementation has barely been touched in recent
>>> years (except for minor extensions and bugfixes).
>>>
>>> Regarding your tests, Tuple should give better performance than Row
>>> because Row is null-sensitive and serialized a null-mask.
>>> There is also a blog post about Flink's join performance [1] which is
>>> already a bit dusty but as I said, the algorithm hasn't change much since
>>> then.
>>>
>>> Best, Fabian
>>>
>>> [1] https://flink.apache.org/news/2015/03/13/peeking-into-Apache
>>> -Flinks-Engine-Room.html
>>>
>>>
>>> 2017-05-15 16:26 GMT+02:00 weijie tong <to...@gmail.com>:
>>>
>>>> The Flink version is 1.2.0
>>>>
>>>> On Mon, May 15, 2017 at 10:24 PM, weijie tong <to...@gmail.com>
>>>> wrote:
>>>>
>>>>> @Till thanks for your reply.
>>>>>
>>>>> My code is similar to   HashTableITCase.testInMemory
>>>>> MutableHashTable()   . It just use the MutableHashTable class ,
>>>>> there's  no other Flink's configuration.  The main code body is:
>>>>>
>>>>> this.recordBuildSideAccessor = RecordSerializer.get();
>>>>>> this.recordProbeSideAccessor = RecordSerializer.get();
>>>>>> final int[] buildKeyPos = new int[]{buildSideJoinIndex};
>>>>>> final int[] probeKeyPos = new int[]{probeSideJoinIndex};
>>>>>> final Class<? extends Value>[] keyType = (Class<? extends Value>[]) new Class[]{BytesValue.class};
>>>>>> this.recordBuildSideComparator = new RecordComparator(buildKeyPos, keyType);
>>>>>> this.recordProbeSideComparator = new RecordComparator(probeKeyPos, keyType);
>>>>>> this.pactRecordComparator = new HashJoinVectorJointGroupIterator.RecordPairComparator(buildSideJoinIndex);
>>>>>> Sequence<Record> buildSideRecordsSeq = makeSequenceRecordOfSameSideSegments(buildSideSegs, localJoinQuery);
>>>>>> Sequence<Record> probeSideRecordsSeq = makeSequenceRecordOfSameSideSegments(probeSideSegs, localJoinQuery);
>>>>>> List<MemorySegment> memorySegments;
>>>>>> int pageSize = hashTableMemoryManager.getTotalNumPages();
>>>>>> try {
>>>>>>   memorySegments = this.hashTableMemoryManager.allocatePages(MEM_OWNER, pageSize);
>>>>>> }
>>>>>> catch (MemoryAllocationException e) {
>>>>>>   LOGGER.error("could not allocate " + pageSize + " pages memory for HashJoin", e);
>>>>>>   Throwables.propagate(e);
>>>>>>   return;
>>>>>> }
>>>>>> try {
>>>>>>   Stopwatch stopwatch = Stopwatch.createStarted();
>>>>>>   UniformRecordGenerator buildInput = new UniformRecordGenerator(buildSideRecordsSeq);
>>>>>>   UniformRecordGenerator probeInput = new UniformRecordGenerator(probeSideRecordsSeq);
>>>>>>   join = new MutableHashTable<Record, Record>(
>>>>>>       recordBuildSideAccessor,
>>>>>>       recordProbeSideAccessor,
>>>>>>       recordBuildSideComparator,
>>>>>>       recordProbeSideComparator,
>>>>>>       pactRecordComparator,
>>>>>>       memorySegments,
>>>>>>       ioManager
>>>>>>   );
>>>>>>   join.open(buildInput,probeInput);
>>>>>>
>>>>>>   LOGGER.info("construct hash table elapsed:" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + "ms");
>>>>>>
>>>>>>
>>>>> The BytesValue type is self defined one which holds byte[] , but just
>>>>> like the original StringValue, also has the same serDe performance.
>>>>>
>>>>>
>>>>> while (join.nextRecord()) {
>>>>>   Record currentProbeRecord = join.getCurrentProbeRecord();
>>>>>   MutableObjectIterator<Record> buildSideIterator = join.getBuildSideIterator();
>>>>>   while (buildSideIterator.next(reusedBuildSideRow) != null) {
>>>>>     materializeRecord2OutVector(reusedBuildSideRow, buildSideIndex2Value, buildSideIndex2Vector, rowNum);
>>>>>     materializeRecord2OutVector(currentProbeRecord, probeSideIndex2Value, probeSideIndex2Vector, rowNum);
>>>>>     rowNum++;
>>>>>   }}
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> I have tried both the Record ,Row class as the type of records without
>>>>> any better improved performance . I also tried batched the input records.
>>>>> That means the  buildInput or probeInput variables of the first code
>>>>> block which iterate one Record a time from another batched Records .
>>>>> Batched records's content stay in memory in Drill's ValueVector format.
>>>>> Once a record is need to participate in the build or probe phase from a
>>>>> iterate.next() call,
>>>>> it will be fetched from the batched in memory ValueVector content. But
>>>>> no performance gains.
>>>>>
>>>>>
>>>>> The top hotspot profile from Jprofiler is below:
>>>>> >
>>>>> Hot spot,"Self time (microseconds)","Average Time","Invocations"
>>>>> org.apache.flink.types.Record.serialize,1014127,"n/a","n/a"
>>>>> org.apache.flink.types.Record.deserialize,60684,"n/a","n/a"
>>>>> org.apache.flink.types.Record.copyTo,83007,"n/a","n/a"
>>>>> org.apache.flink.runtime.operators.hash.MutableHashTable.
>>>>> open,55238,"n/a","n/a"
>>>>> org.apache.flink.runtime.operators.hash.MutableHashTable.
>>>>> nextRecord,10955,"n/a","n/a"
>>>>> org.apache.flink.runtime.memory.MemoryManager.release,33484,
>>>>> "n/a","n/a"
>>>>> org.apache.flink.runtime.memory.MemoryManager.allocatePages,
>>>>> 104259,"n/a","n/a"
>>>>>
>>>>>
>>>>> My log show that hashjoin.open()  method costs too much time.
>>>>> >
>>>>> construct hash table elapsed:1885ms
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, May 15, 2017 at 6:20 PM, Till Rohrmann <tr...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Weijie,
>>>>>>
>>>>>> it might be the case that batching the processing of multiple rows
>>>>>> can give you an improved performance compared to single row processing.
>>>>>>
>>>>>> Maybe you could share the exact benchmark base line results and the
>>>>>> code you use to test Flink's MutableHashTable with us. Also the Flink
>>>>>> configuration and how you run it would be of interest. That way we might be
>>>>>> able to see if we can tune Flink a bit more.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Sun, May 14, 2017 at 5:23 AM, weijie tong <tongweijie178@gmail.com
>>>>>> > wrote:
>>>>>>
>>>>>>> I has a test case to use Flink's MutableHashTable class to do a hash
>>>>>>> join on a local machine with 64g memory, 64cores. The test case is one
>>>>>>> build table with 14w rows ,one probe table with 320w rows ,the matched
>>>>>>> result rows is 12 w.
>>>>>>>
>>>>>>> It takes 2.2 seconds to complete the join.The performance seems bad.
>>>>>>> I ensure there's no overflow, the smaller table is the build side. The
>>>>>>> MutableObjectIterator is a sequence of Rows. The Row is composed of several
>>>>>>> fields which are byte[]. Through my log,I find the open() method takes
>>>>>>> 1.560 seconds. The probe iterates phase takes 680ms.  And my Jprofiler's
>>>>>>> profile shows the MutableObjectIterator's next() method call is the
>>>>>>> hotspot.
>>>>>>>
>>>>>>>
>>>>>>> I want to know how to tune this scenario. I find Drill's HashJoin is
>>>>>>> batch model. Its build side's input is a RecordBatch which holds batch of
>>>>>>> rows and memory size is approach to L2 cache. Through this strategy it will
>>>>>>> gain less method calls (that means call to next() ) and much efficient to
>>>>>>> cpu calculation.  I also find SQL server's paper noticed the batch model's
>>>>>>> performance gains (https://www.microsoft.com/en-
>>>>>>> us/research/wp-content/uploads/2013/06/Apollo3-Sigmod-2013-final.pdf)
>>>>>>>  .   I guess the performance's down is due to the single row iterate model.
>>>>>>>
>>>>>>>
>>>>>>> Hope someone to correct my opinion. Also maybe I have a wrong use
>>>>>>>  of the MutableHashTable. wait for someone to give an advice.
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>

Re: questions about Flink's HashJoin performance

Posted by weijie tong <to...@gmail.com>.
thanks for tip @Stephan.

To [1] , there's a description about  "I’ve got sooo much data to join, do
I really need to ship it?" . How to configure Flink to touch that target?
Is there a performance report ?

[1] :
https://flink.apache.org/news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html

On Wed, May 17, 2017 at 1:32 AM, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> Be aware that the "Row" and "Record" types are not very high performance
> data types. You might be measuring the data type overhead, rather than the
> hash table performance. Also, the build measurements include the data
> generation, which influences the results.
>
> If you want to purely benchmark the HashTable performance, try using
> something like "Tuple2<Long, Long>" or so (or write your own custom
> TypeSerializer / TypeComparator).
>
> Stephan
>
>
> On Tue, May 16, 2017 at 11:23 AM, weijie tong <to...@gmail.com>
> wrote:
>
>> Thanks for all your enthusiastic response. Yes, My target was to try to
>> find the best performance in memory. I got that.
>>
>> On Tue, 16 May 2017 at 4:10 PM Fabian Hueske <fh...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Flink's HashJoin implementation was designed to gracefully handle inputs
>>> that exceed the main memory.
>>> It is not explicitly optimized for in-memory processing and does not
>>> play fancy tricks like optimizing cache accesses or batching.
>>> I assume your benchmark is about in-memory joins only. This was not the
>>> main design goal when the join was implemented but robustness.
>>> Since most of the development of Flink focuses on streaming applications
>>> at the moment, the join implementation has barely been touched in recent
>>> years (except for minor extensions and bugfixes).
>>>
>>> Regarding your tests, Tuple should give better performance than Row
>>> because Row is null-sensitive and serialized a null-mask.
>>> There is also a blog post about Flink's join performance [1] which is
>>> already a bit dusty but as I said, the algorithm hasn't change much since
>>> then.
>>>
>>> Best, Fabian
>>>
>>> [1] https://flink.apache.org/news/2015/03/13/peeking-into-Apache
>>> -Flinks-Engine-Room.html
>>>
>>>
>>> 2017-05-15 16:26 GMT+02:00 weijie tong <to...@gmail.com>:
>>>
>>>> The Flink version is 1.2.0
>>>>
>>>> On Mon, May 15, 2017 at 10:24 PM, weijie tong <to...@gmail.com>
>>>> wrote:
>>>>
>>>>> @Till thanks for your reply.
>>>>>
>>>>> My code is similar to   HashTableITCase.testInMemory
>>>>> MutableHashTable()   . It just use the MutableHashTable class ,
>>>>> there's  no other Flink's configuration.  The main code body is:
>>>>>
>>>>> this.recordBuildSideAccessor = RecordSerializer.get();
>>>>>> this.recordProbeSideAccessor = RecordSerializer.get();
>>>>>> final int[] buildKeyPos = new int[]{buildSideJoinIndex};
>>>>>> final int[] probeKeyPos = new int[]{probeSideJoinIndex};
>>>>>> final Class<? extends Value>[] keyType = (Class<? extends Value>[]) new Class[]{BytesValue.class};
>>>>>> this.recordBuildSideComparator = new RecordComparator(buildKeyPos, keyType);
>>>>>> this.recordProbeSideComparator = new RecordComparator(probeKeyPos, keyType);
>>>>>> this.pactRecordComparator = new HashJoinVectorJointGroupIterator.RecordPairComparator(buildSideJoinIndex);
>>>>>> Sequence<Record> buildSideRecordsSeq = makeSequenceRecordOfSameSideSegments(buildSideSegs, localJoinQuery);
>>>>>> Sequence<Record> probeSideRecordsSeq = makeSequenceRecordOfSameSideSegments(probeSideSegs, localJoinQuery);
>>>>>> List<MemorySegment> memorySegments;
>>>>>> int pageSize = hashTableMemoryManager.getTotalNumPages();
>>>>>> try {
>>>>>>   memorySegments = this.hashTableMemoryManager.allocatePages(MEM_OWNER, pageSize);
>>>>>> }
>>>>>> catch (MemoryAllocationException e) {
>>>>>>   LOGGER.error("could not allocate " + pageSize + " pages memory for HashJoin", e);
>>>>>>   Throwables.propagate(e);
>>>>>>   return;
>>>>>> }
>>>>>> try {
>>>>>>   Stopwatch stopwatch = Stopwatch.createStarted();
>>>>>>   UniformRecordGenerator buildInput = new UniformRecordGenerator(buildSideRecordsSeq);
>>>>>>   UniformRecordGenerator probeInput = new UniformRecordGenerator(probeSideRecordsSeq);
>>>>>>   join = new MutableHashTable<Record, Record>(
>>>>>>       recordBuildSideAccessor,
>>>>>>       recordProbeSideAccessor,
>>>>>>       recordBuildSideComparator,
>>>>>>       recordProbeSideComparator,
>>>>>>       pactRecordComparator,
>>>>>>       memorySegments,
>>>>>>       ioManager
>>>>>>   );
>>>>>>   join.open(buildInput,probeInput);
>>>>>>
>>>>>>   LOGGER.info("construct hash table elapsed:" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + "ms");
>>>>>>
>>>>>>
>>>>> The BytesValue type is self defined one which holds byte[] , but just
>>>>> like the original StringValue, also has the same serDe performance.
>>>>>
>>>>>
>>>>> while (join.nextRecord()) {
>>>>>   Record currentProbeRecord = join.getCurrentProbeRecord();
>>>>>   MutableObjectIterator<Record> buildSideIterator = join.getBuildSideIterator();
>>>>>   while (buildSideIterator.next(reusedBuildSideRow) != null) {
>>>>>     materializeRecord2OutVector(reusedBuildSideRow, buildSideIndex2Value, buildSideIndex2Vector, rowNum);
>>>>>     materializeRecord2OutVector(currentProbeRecord, probeSideIndex2Value, probeSideIndex2Vector, rowNum);
>>>>>     rowNum++;
>>>>>   }}
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> I have tried both the Record ,Row class as the type of records without
>>>>> any better improved performance . I also tried batched the input records.
>>>>> That means the  buildInput or probeInput variables of the first code
>>>>> block which iterate one Record a time from another batched Records .
>>>>> Batched records's content stay in memory in Drill's ValueVector format.
>>>>> Once a record is need to participate in the build or probe phase from a
>>>>> iterate.next() call,
>>>>> it will be fetched from the batched in memory ValueVector content. But
>>>>> no performance gains.
>>>>>
>>>>>
>>>>> The top hotspot profile from Jprofiler is below:
>>>>> >
>>>>> Hot spot,"Self time (microseconds)","Average Time","Invocations"
>>>>> org.apache.flink.types.Record.serialize,1014127,"n/a","n/a"
>>>>> org.apache.flink.types.Record.deserialize,60684,"n/a","n/a"
>>>>> org.apache.flink.types.Record.copyTo,83007,"n/a","n/a"
>>>>> org.apache.flink.runtime.operators.hash.MutableHashTable.
>>>>> open,55238,"n/a","n/a"
>>>>> org.apache.flink.runtime.operators.hash.MutableHashTable.
>>>>> nextRecord,10955,"n/a","n/a"
>>>>> org.apache.flink.runtime.memory.MemoryManager.release,33484,
>>>>> "n/a","n/a"
>>>>> org.apache.flink.runtime.memory.MemoryManager.allocatePages,
>>>>> 104259,"n/a","n/a"
>>>>>
>>>>>
>>>>> My log show that hashjoin.open()  method costs too much time.
>>>>> >
>>>>> construct hash table elapsed:1885ms
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, May 15, 2017 at 6:20 PM, Till Rohrmann <tr...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Weijie,
>>>>>>
>>>>>> it might be the case that batching the processing of multiple rows
>>>>>> can give you an improved performance compared to single row processing.
>>>>>>
>>>>>> Maybe you could share the exact benchmark base line results and the
>>>>>> code you use to test Flink's MutableHashTable with us. Also the Flink
>>>>>> configuration and how you run it would be of interest. That way we might be
>>>>>> able to see if we can tune Flink a bit more.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Sun, May 14, 2017 at 5:23 AM, weijie tong <tongweijie178@gmail.com
>>>>>> > wrote:
>>>>>>
>>>>>>> I has a test case to use Flink's MutableHashTable class to do a hash
>>>>>>> join on a local machine with 64g memory, 64cores. The test case is one
>>>>>>> build table with 14w rows ,one probe table with 320w rows ,the matched
>>>>>>> result rows is 12 w.
>>>>>>>
>>>>>>> It takes 2.2 seconds to complete the join.The performance seems bad.
>>>>>>> I ensure there's no overflow, the smaller table is the build side. The
>>>>>>> MutableObjectIterator is a sequence of Rows. The Row is composed of several
>>>>>>> fields which are byte[]. Through my log,I find the open() method takes
>>>>>>> 1.560 seconds. The probe iterates phase takes 680ms.  And my Jprofiler's
>>>>>>> profile shows the MutableObjectIterator's next() method call is the
>>>>>>> hotspot.
>>>>>>>
>>>>>>>
>>>>>>> I want to know how to tune this scenario. I find Drill's HashJoin is
>>>>>>> batch model. Its build side's input is a RecordBatch which holds batch of
>>>>>>> rows and memory size is approach to L2 cache. Through this strategy it will
>>>>>>> gain less method calls (that means call to next() ) and much efficient to
>>>>>>> cpu calculation.  I also find SQL server's paper noticed the batch model's
>>>>>>> performance gains (https://www.microsoft.com/en-
>>>>>>> us/research/wp-content/uploads/2013/06/Apollo3-Sigmod-2013-final.pdf)
>>>>>>>  .   I guess the performance's down is due to the single row iterate model.
>>>>>>>
>>>>>>>
>>>>>>> Hope someone to correct my opinion. Also maybe I have a wrong use
>>>>>>>  of the MutableHashTable. wait for someone to give an advice.
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>

Re: questions about Flink's HashJoin performance

Posted by Stephan Ewen <se...@apache.org>.
Hi!

Be aware that the "Row" and "Record" types are not very high performance
data types. You might be measuring the data type overhead, rather than the
hash table performance. Also, the build measurements include the data
generation, which influences the results.

If you want to purely benchmark the HashTable performance, try using
something like "Tuple2<Long, Long>" or so (or write your own custom
TypeSerializer / TypeComparator).

Stephan


On Tue, May 16, 2017 at 11:23 AM, weijie tong <to...@gmail.com>
wrote:

> Thanks for all your enthusiastic response. Yes, My target was to try to
> find the best performance in memory. I got that.
>
> On Tue, 16 May 2017 at 4:10 PM Fabian Hueske <fh...@gmail.com> wrote:
>
>> Hi,
>>
>> Flink's HashJoin implementation was designed to gracefully handle inputs
>> that exceed the main memory.
>> It is not explicitly optimized for in-memory processing and does not play
>> fancy tricks like optimizing cache accesses or batching.
>> I assume your benchmark is about in-memory joins only. This was not the
>> main design goal when the join was implemented but robustness.
>> Since most of the development of Flink focuses on streaming applications
>> at the moment, the join implementation has barely been touched in recent
>> years (except for minor extensions and bugfixes).
>>
>> Regarding your tests, Tuple should give better performance than Row
>> because Row is null-sensitive and serialized a null-mask.
>> There is also a blog post about Flink's join performance [1] which is
>> already a bit dusty but as I said, the algorithm hasn't change much since
>> then.
>>
>> Best, Fabian
>>
>> [1] https://flink.apache.org/news/2015/03/13/peeking-into-
>> Apache-Flinks-Engine-Room.html
>>
>>
>> 2017-05-15 16:26 GMT+02:00 weijie tong <to...@gmail.com>:
>>
>>> The Flink version is 1.2.0
>>>
>>> On Mon, May 15, 2017 at 10:24 PM, weijie tong <to...@gmail.com>
>>> wrote:
>>>
>>>> @Till thanks for your reply.
>>>>
>>>> My code is similar to   HashTableITCase.testInMemoryMutableHashTable()
>>>> . It just use the MutableHashTable class , there's  no other Flink's
>>>> configuration.  The main code body is:
>>>>
>>>> this.recordBuildSideAccessor = RecordSerializer.get();
>>>>> this.recordProbeSideAccessor = RecordSerializer.get();
>>>>> final int[] buildKeyPos = new int[]{buildSideJoinIndex};
>>>>> final int[] probeKeyPos = new int[]{probeSideJoinIndex};
>>>>> final Class<? extends Value>[] keyType = (Class<? extends Value>[]) new Class[]{BytesValue.class};
>>>>> this.recordBuildSideComparator = new RecordComparator(buildKeyPos, keyType);
>>>>> this.recordProbeSideComparator = new RecordComparator(probeKeyPos, keyType);
>>>>> this.pactRecordComparator = new HashJoinVectorJointGroupIterator.RecordPairComparator(buildSideJoinIndex);
>>>>> Sequence<Record> buildSideRecordsSeq = makeSequenceRecordOfSameSideSegments(buildSideSegs, localJoinQuery);
>>>>> Sequence<Record> probeSideRecordsSeq = makeSequenceRecordOfSameSideSegments(probeSideSegs, localJoinQuery);
>>>>> List<MemorySegment> memorySegments;
>>>>> int pageSize = hashTableMemoryManager.getTotalNumPages();
>>>>> try {
>>>>>   memorySegments = this.hashTableMemoryManager.allocatePages(MEM_OWNER, pageSize);
>>>>> }
>>>>> catch (MemoryAllocationException e) {
>>>>>   LOGGER.error("could not allocate " + pageSize + " pages memory for HashJoin", e);
>>>>>   Throwables.propagate(e);
>>>>>   return;
>>>>> }
>>>>> try {
>>>>>   Stopwatch stopwatch = Stopwatch.createStarted();
>>>>>   UniformRecordGenerator buildInput = new UniformRecordGenerator(buildSideRecordsSeq);
>>>>>   UniformRecordGenerator probeInput = new UniformRecordGenerator(probeSideRecordsSeq);
>>>>>   join = new MutableHashTable<Record, Record>(
>>>>>       recordBuildSideAccessor,
>>>>>       recordProbeSideAccessor,
>>>>>       recordBuildSideComparator,
>>>>>       recordProbeSideComparator,
>>>>>       pactRecordComparator,
>>>>>       memorySegments,
>>>>>       ioManager
>>>>>   );
>>>>>   join.open(buildInput,probeInput);
>>>>>
>>>>>   LOGGER.info("construct hash table elapsed:" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + "ms");
>>>>>
>>>>>
>>>> The BytesValue type is self defined one which holds byte[] , but just
>>>> like the original StringValue, also has the same serDe performance.
>>>>
>>>>
>>>> while (join.nextRecord()) {
>>>>   Record currentProbeRecord = join.getCurrentProbeRecord();
>>>>   MutableObjectIterator<Record> buildSideIterator = join.getBuildSideIterator();
>>>>   while (buildSideIterator.next(reusedBuildSideRow) != null) {
>>>>     materializeRecord2OutVector(reusedBuildSideRow, buildSideIndex2Value, buildSideIndex2Vector, rowNum);
>>>>     materializeRecord2OutVector(currentProbeRecord, probeSideIndex2Value, probeSideIndex2Vector, rowNum);
>>>>     rowNum++;
>>>>   }}
>>>>
>>>>
>>>>
>>>>
>>>> I have tried both the Record ,Row class as the type of records without
>>>> any better improved performance . I also tried batched the input records.
>>>> That means the  buildInput or probeInput variables of the first code
>>>> block which iterate one Record a time from another batched Records .
>>>> Batched records's content stay in memory in Drill's ValueVector format.
>>>> Once a record is need to participate in the build or probe phase from a
>>>> iterate.next() call,
>>>> it will be fetched from the batched in memory ValueVector content. But
>>>> no performance gains.
>>>>
>>>>
>>>> The top hotspot profile from Jprofiler is below:
>>>> >
>>>> Hot spot,"Self time (microseconds)","Average Time","Invocations"
>>>> org.apache.flink.types.Record.serialize,1014127,"n/a","n/a"
>>>> org.apache.flink.types.Record.deserialize,60684,"n/a","n/a"
>>>> org.apache.flink.types.Record.copyTo,83007,"n/a","n/a"
>>>> org.apache.flink.runtime.operators.hash.MutableHashTable.open,55238,"
>>>> n/a","n/a"
>>>> org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord,
>>>> 10955,"n/a","n/a"
>>>> org.apache.flink.runtime.memory.MemoryManager.release,33484,"n/a","n/a"
>>>> org.apache.flink.runtime.memory.MemoryManager.
>>>> allocatePages,104259,"n/a","n/a"
>>>>
>>>>
>>>> My log show that hashjoin.open()  method costs too much time.
>>>> >
>>>> construct hash table elapsed:1885ms
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, May 15, 2017 at 6:20 PM, Till Rohrmann <tr...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Weijie,
>>>>>
>>>>> it might be the case that batching the processing of multiple rows can
>>>>> give you an improved performance compared to single row processing.
>>>>>
>>>>> Maybe you could share the exact benchmark base line results and the
>>>>> code you use to test Flink's MutableHashTable with us. Also the Flink
>>>>> configuration and how you run it would be of interest. That way we might be
>>>>> able to see if we can tune Flink a bit more.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Sun, May 14, 2017 at 5:23 AM, weijie tong <to...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I has a test case to use Flink's MutableHashTable class to do a hash
>>>>>> join on a local machine with 64g memory, 64cores. The test case is one
>>>>>> build table with 14w rows ,one probe table with 320w rows ,the matched
>>>>>> result rows is 12 w.
>>>>>>
>>>>>> It takes 2.2 seconds to complete the join.The performance seems bad.
>>>>>> I ensure there's no overflow, the smaller table is the build side. The
>>>>>> MutableObjectIterator is a sequence of Rows. The Row is composed of several
>>>>>> fields which are byte[]. Through my log,I find the open() method takes
>>>>>> 1.560 seconds. The probe iterates phase takes 680ms.  And my Jprofiler's
>>>>>> profile shows the MutableObjectIterator's next() method call is the
>>>>>> hotspot.
>>>>>>
>>>>>>
>>>>>> I want to know how to tune this scenario. I find Drill's HashJoin is
>>>>>> batch model. Its build side's input is a RecordBatch which holds batch of
>>>>>> rows and memory size is approach to L2 cache. Through this strategy it will
>>>>>> gain less method calls (that means call to next() ) and much efficient to
>>>>>> cpu calculation.  I also find SQL server's paper noticed the batch model's
>>>>>> performance gains (https://www.microsoft.com/en-
>>>>>> us/research/wp-content/uploads/2013/06/Apollo3-Sigmod-2013-final.pdf)
>>>>>>  .   I guess the performance's down is due to the single row iterate model.
>>>>>>
>>>>>>
>>>>>> Hope someone to correct my opinion. Also maybe I have a wrong use  of
>>>>>> the MutableHashTable. wait for someone to give an advice.
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>

Re: questions about Flink's HashJoin performance

Posted by Stephan Ewen <se...@apache.org>.
Hi!

Be aware that the "Row" and "Record" types are not very high performance
data types. You might be measuring the data type overhead, rather than the
hash table performance. Also, the build measurements include the data
generation, which influences the results.

If you want to purely benchmark the HashTable performance, try using
something like "Tuple2<Long, Long>" or so (or write your own custom
TypeSerializer / TypeComparator).

Stephan


On Tue, May 16, 2017 at 11:23 AM, weijie tong <to...@gmail.com>
wrote:

> Thanks for all your enthusiastic response. Yes, My target was to try to
> find the best performance in memory. I got that.
>
> On Tue, 16 May 2017 at 4:10 PM Fabian Hueske <fh...@gmail.com> wrote:
>
>> Hi,
>>
>> Flink's HashJoin implementation was designed to gracefully handle inputs
>> that exceed the main memory.
>> It is not explicitly optimized for in-memory processing and does not play
>> fancy tricks like optimizing cache accesses or batching.
>> I assume your benchmark is about in-memory joins only. This was not the
>> main design goal when the join was implemented but robustness.
>> Since most of the development of Flink focuses on streaming applications
>> at the moment, the join implementation has barely been touched in recent
>> years (except for minor extensions and bugfixes).
>>
>> Regarding your tests, Tuple should give better performance than Row
>> because Row is null-sensitive and serialized a null-mask.
>> There is also a blog post about Flink's join performance [1] which is
>> already a bit dusty but as I said, the algorithm hasn't change much since
>> then.
>>
>> Best, Fabian
>>
>> [1] https://flink.apache.org/news/2015/03/13/peeking-into-
>> Apache-Flinks-Engine-Room.html
>>
>>
>> 2017-05-15 16:26 GMT+02:00 weijie tong <to...@gmail.com>:
>>
>>> The Flink version is 1.2.0
>>>
>>> On Mon, May 15, 2017 at 10:24 PM, weijie tong <to...@gmail.com>
>>> wrote:
>>>
>>>> @Till thanks for your reply.
>>>>
>>>> My code is similar to   HashTableITCase.testInMemoryMutableHashTable()
>>>> . It just use the MutableHashTable class , there's  no other Flink's
>>>> configuration.  The main code body is:
>>>>
>>>> this.recordBuildSideAccessor = RecordSerializer.get();
>>>>> this.recordProbeSideAccessor = RecordSerializer.get();
>>>>> final int[] buildKeyPos = new int[]{buildSideJoinIndex};
>>>>> final int[] probeKeyPos = new int[]{probeSideJoinIndex};
>>>>> final Class<? extends Value>[] keyType = (Class<? extends Value>[]) new Class[]{BytesValue.class};
>>>>> this.recordBuildSideComparator = new RecordComparator(buildKeyPos, keyType);
>>>>> this.recordProbeSideComparator = new RecordComparator(probeKeyPos, keyType);
>>>>> this.pactRecordComparator = new HashJoinVectorJointGroupIterator.RecordPairComparator(buildSideJoinIndex);
>>>>> Sequence<Record> buildSideRecordsSeq = makeSequenceRecordOfSameSideSegments(buildSideSegs, localJoinQuery);
>>>>> Sequence<Record> probeSideRecordsSeq = makeSequenceRecordOfSameSideSegments(probeSideSegs, localJoinQuery);
>>>>> List<MemorySegment> memorySegments;
>>>>> int pageSize = hashTableMemoryManager.getTotalNumPages();
>>>>> try {
>>>>>   memorySegments = this.hashTableMemoryManager.allocatePages(MEM_OWNER, pageSize);
>>>>> }
>>>>> catch (MemoryAllocationException e) {
>>>>>   LOGGER.error("could not allocate " + pageSize + " pages memory for HashJoin", e);
>>>>>   Throwables.propagate(e);
>>>>>   return;
>>>>> }
>>>>> try {
>>>>>   Stopwatch stopwatch = Stopwatch.createStarted();
>>>>>   UniformRecordGenerator buildInput = new UniformRecordGenerator(buildSideRecordsSeq);
>>>>>   UniformRecordGenerator probeInput = new UniformRecordGenerator(probeSideRecordsSeq);
>>>>>   join = new MutableHashTable<Record, Record>(
>>>>>       recordBuildSideAccessor,
>>>>>       recordProbeSideAccessor,
>>>>>       recordBuildSideComparator,
>>>>>       recordProbeSideComparator,
>>>>>       pactRecordComparator,
>>>>>       memorySegments,
>>>>>       ioManager
>>>>>   );
>>>>>   join.open(buildInput,probeInput);
>>>>>
>>>>>   LOGGER.info("construct hash table elapsed:" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + "ms");
>>>>>
>>>>>
>>>> The BytesValue type is self defined one which holds byte[] , but just
>>>> like the original StringValue, also has the same serDe performance.
>>>>
>>>>
>>>> while (join.nextRecord()) {
>>>>   Record currentProbeRecord = join.getCurrentProbeRecord();
>>>>   MutableObjectIterator<Record> buildSideIterator = join.getBuildSideIterator();
>>>>   while (buildSideIterator.next(reusedBuildSideRow) != null) {
>>>>     materializeRecord2OutVector(reusedBuildSideRow, buildSideIndex2Value, buildSideIndex2Vector, rowNum);
>>>>     materializeRecord2OutVector(currentProbeRecord, probeSideIndex2Value, probeSideIndex2Vector, rowNum);
>>>>     rowNum++;
>>>>   }}
>>>>
>>>>
>>>>
>>>>
>>>> I have tried both the Record ,Row class as the type of records without
>>>> any better improved performance . I also tried batched the input records.
>>>> That means the  buildInput or probeInput variables of the first code
>>>> block which iterate one Record a time from another batched Records .
>>>> Batched records's content stay in memory in Drill's ValueVector format.
>>>> Once a record is need to participate in the build or probe phase from a
>>>> iterate.next() call,
>>>> it will be fetched from the batched in memory ValueVector content. But
>>>> no performance gains.
>>>>
>>>>
>>>> The top hotspot profile from Jprofiler is below:
>>>> >
>>>> Hot spot,"Self time (microseconds)","Average Time","Invocations"
>>>> org.apache.flink.types.Record.serialize,1014127,"n/a","n/a"
>>>> org.apache.flink.types.Record.deserialize,60684,"n/a","n/a"
>>>> org.apache.flink.types.Record.copyTo,83007,"n/a","n/a"
>>>> org.apache.flink.runtime.operators.hash.MutableHashTable.open,55238,"
>>>> n/a","n/a"
>>>> org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord,
>>>> 10955,"n/a","n/a"
>>>> org.apache.flink.runtime.memory.MemoryManager.release,33484,"n/a","n/a"
>>>> org.apache.flink.runtime.memory.MemoryManager.
>>>> allocatePages,104259,"n/a","n/a"
>>>>
>>>>
>>>> My log show that hashjoin.open()  method costs too much time.
>>>> >
>>>> construct hash table elapsed:1885ms
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, May 15, 2017 at 6:20 PM, Till Rohrmann <tr...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Weijie,
>>>>>
>>>>> it might be the case that batching the processing of multiple rows can
>>>>> give you an improved performance compared to single row processing.
>>>>>
>>>>> Maybe you could share the exact benchmark base line results and the
>>>>> code you use to test Flink's MutableHashTable with us. Also the Flink
>>>>> configuration and how you run it would be of interest. That way we might be
>>>>> able to see if we can tune Flink a bit more.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Sun, May 14, 2017 at 5:23 AM, weijie tong <to...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I has a test case to use Flink's MutableHashTable class to do a hash
>>>>>> join on a local machine with 64g memory, 64cores. The test case is one
>>>>>> build table with 14w rows ,one probe table with 320w rows ,the matched
>>>>>> result rows is 12 w.
>>>>>>
>>>>>> It takes 2.2 seconds to complete the join.The performance seems bad.
>>>>>> I ensure there's no overflow, the smaller table is the build side. The
>>>>>> MutableObjectIterator is a sequence of Rows. The Row is composed of several
>>>>>> fields which are byte[]. Through my log,I find the open() method takes
>>>>>> 1.560 seconds. The probe iterates phase takes 680ms.  And my Jprofiler's
>>>>>> profile shows the MutableObjectIterator's next() method call is the
>>>>>> hotspot.
>>>>>>
>>>>>>
>>>>>> I want to know how to tune this scenario. I find Drill's HashJoin is
>>>>>> batch model. Its build side's input is a RecordBatch which holds batch of
>>>>>> rows and memory size is approach to L2 cache. Through this strategy it will
>>>>>> gain less method calls (that means call to next() ) and much efficient to
>>>>>> cpu calculation.  I also find SQL server's paper noticed the batch model's
>>>>>> performance gains (https://www.microsoft.com/en-
>>>>>> us/research/wp-content/uploads/2013/06/Apollo3-Sigmod-2013-final.pdf)
>>>>>>  .   I guess the performance's down is due to the single row iterate model.
>>>>>>
>>>>>>
>>>>>> Hope someone to correct my opinion. Also maybe I have a wrong use  of
>>>>>> the MutableHashTable. wait for someone to give an advice.
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>

Re: questions about Flink's HashJoin performance

Posted by weijie tong <to...@gmail.com>.
Thanks for all your enthusiastic response. Yes, My target was to try to
find the best performance in memory. I got that.
On Tue, 16 May 2017 at 4:10 PM Fabian Hueske <fh...@gmail.com> wrote:

> Hi,
>
> Flink's HashJoin implementation was designed to gracefully handle inputs
> that exceed the main memory.
> It is not explicitly optimized for in-memory processing and does not play
> fancy tricks like optimizing cache accesses or batching.
> I assume your benchmark is about in-memory joins only. This was not the
> main design goal when the join was implemented but robustness.
> Since most of the development of Flink focuses on streaming applications
> at the moment, the join implementation has barely been touched in recent
> years (except for minor extensions and bugfixes).
>
> Regarding your tests, Tuple should give better performance than Row
> because Row is null-sensitive and serialized a null-mask.
> There is also a blog post about Flink's join performance [1] which is
> already a bit dusty but as I said, the algorithm hasn't change much since
> then.
>
> Best, Fabian
>
> [1]
> https://flink.apache.org/news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html
>
>
> 2017-05-15 16:26 GMT+02:00 weijie tong <to...@gmail.com>:
>
>> The Flink version is 1.2.0
>>
>> On Mon, May 15, 2017 at 10:24 PM, weijie tong <to...@gmail.com>
>> wrote:
>>
>>> @Till thanks for your reply.
>>>
>>> My code is similar to   HashTableITCase.testInMemoryMutableHashTable()
>>> . It just use the MutableHashTable class , there's  no other Flink's
>>> configuration.  The main code body is:
>>>
>>> this.recordBuildSideAccessor = RecordSerializer.get();
>>>> this.recordProbeSideAccessor = RecordSerializer.get();
>>>> final int[] buildKeyPos = new int[]{buildSideJoinIndex};
>>>> final int[] probeKeyPos = new int[]{probeSideJoinIndex};
>>>> final Class<? extends Value>[] keyType = (Class<? extends Value>[]) new Class[]{BytesValue.class};
>>>> this.recordBuildSideComparator = new RecordComparator(buildKeyPos, keyType);
>>>> this.recordProbeSideComparator = new RecordComparator(probeKeyPos, keyType);
>>>> this.pactRecordComparator = new HashJoinVectorJointGroupIterator.RecordPairComparator(buildSideJoinIndex);
>>>> Sequence<Record> buildSideRecordsSeq = makeSequenceRecordOfSameSideSegments(buildSideSegs, localJoinQuery);
>>>> Sequence<Record> probeSideRecordsSeq = makeSequenceRecordOfSameSideSegments(probeSideSegs, localJoinQuery);
>>>> List<MemorySegment> memorySegments;
>>>> int pageSize = hashTableMemoryManager.getTotalNumPages();
>>>> try {
>>>>   memorySegments = this.hashTableMemoryManager.allocatePages(MEM_OWNER, pageSize);
>>>> }
>>>> catch (MemoryAllocationException e) {
>>>>   LOGGER.error("could not allocate " + pageSize + " pages memory for HashJoin", e);
>>>>   Throwables.propagate(e);
>>>>   return;
>>>> }
>>>> try {
>>>>   Stopwatch stopwatch = Stopwatch.createStarted();
>>>>   UniformRecordGenerator buildInput = new UniformRecordGenerator(buildSideRecordsSeq);
>>>>   UniformRecordGenerator probeInput = new UniformRecordGenerator(probeSideRecordsSeq);
>>>>   join = new MutableHashTable<Record, Record>(
>>>>       recordBuildSideAccessor,
>>>>       recordProbeSideAccessor,
>>>>       recordBuildSideComparator,
>>>>       recordProbeSideComparator,
>>>>       pactRecordComparator,
>>>>       memorySegments,
>>>>       ioManager
>>>>   );
>>>>   join.open(buildInput,probeInput);
>>>>
>>>>   LOGGER.info("construct hash table elapsed:" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + "ms");
>>>>
>>>>
>>> The BytesValue type is self defined one which holds byte[] , but just
>>> like the original StringValue, also has the same serDe performance.
>>>
>>>
>>> while (join.nextRecord()) {
>>>   Record currentProbeRecord = join.getCurrentProbeRecord();
>>>   MutableObjectIterator<Record> buildSideIterator = join.getBuildSideIterator();
>>>   while (buildSideIterator.next(reusedBuildSideRow) != null) {
>>>     materializeRecord2OutVector(reusedBuildSideRow, buildSideIndex2Value, buildSideIndex2Vector, rowNum);
>>>     materializeRecord2OutVector(currentProbeRecord, probeSideIndex2Value, probeSideIndex2Vector, rowNum);
>>>     rowNum++;
>>>   }}
>>>
>>>
>>>
>>>
>>> I have tried both the Record ,Row class as the type of records without
>>> any better improved performance . I also tried batched the input records.
>>> That means the  buildInput or probeInput variables of the first code
>>> block which iterate one Record a time from another batched Records .
>>> Batched records's content stay in memory in Drill's ValueVector format.
>>> Once a record is need to participate in the build or probe phase from a
>>> iterate.next() call,
>>> it will be fetched from the batched in memory ValueVector content. But
>>> no performance gains.
>>>
>>>
>>> The top hotspot profile from Jprofiler is below:
>>> >
>>> Hot spot,"Self time (microseconds)","Average Time","Invocations"
>>> org.apache.flink.types.Record.serialize,1014127,"n/a","n/a"
>>> org.apache.flink.types.Record.deserialize,60684,"n/a","n/a"
>>> org.apache.flink.types.Record.copyTo,83007,"n/a","n/a"
>>>
>>> org.apache.flink.runtime.operators.hash.MutableHashTable.open,55238,"n/a","n/a"
>>>
>>> org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord,10955,"n/a","n/a"
>>> org.apache.flink.runtime.memory.MemoryManager.release,33484,"n/a","n/a"
>>>
>>> org.apache.flink.runtime.memory.MemoryManager.allocatePages,104259,"n/a","n/a"
>>>
>>>
>>> My log show that hashjoin.open()  method costs too much time.
>>> >
>>> construct hash table elapsed:1885ms
>>>
>>>
>>>
>>>
>>> On Mon, May 15, 2017 at 6:20 PM, Till Rohrmann <tr...@apache.org>
>>> wrote:
>>>
>>>> Hi Weijie,
>>>>
>>>> it might be the case that batching the processing of multiple rows can
>>>> give you an improved performance compared to single row processing.
>>>>
>>>> Maybe you could share the exact benchmark base line results and the
>>>> code you use to test Flink's MutableHashTable with us. Also the Flink
>>>> configuration and how you run it would be of interest. That way we might be
>>>> able to see if we can tune Flink a bit more.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Sun, May 14, 2017 at 5:23 AM, weijie tong <to...@gmail.com>
>>>> wrote:
>>>>
>>>>> I has a test case to use Flink's MutableHashTable class to do a hash
>>>>> join on a local machine with 64g memory, 64cores. The test case is one
>>>>> build table with 14w rows ,one probe table with 320w rows ,the matched
>>>>> result rows is 12 w.
>>>>>
>>>>> It takes 2.2 seconds to complete the join.The performance seems bad. I
>>>>> ensure there's no overflow, the smaller table is the build side. The
>>>>> MutableObjectIterator is a sequence of Rows. The Row is composed of several
>>>>> fields which are byte[]. Through my log,I find the open() method takes
>>>>> 1.560 seconds. The probe iterates phase takes 680ms.  And my Jprofiler's
>>>>> profile shows the MutableObjectIterator's next() method call is the
>>>>> hotspot.
>>>>>
>>>>>
>>>>> I want to know how to tune this scenario. I find Drill's HashJoin is
>>>>> batch model. Its build side's input is a RecordBatch which holds batch of
>>>>> rows and memory size is approach to L2 cache. Through this strategy it will
>>>>> gain less method calls (that means call to next() ) and much efficient to
>>>>> cpu calculation.  I also find SQL server's paper noticed the batch model's
>>>>> performance gains (
>>>>> https://www.microsoft.com/en-us/research/wp-content/uploads/2013/06/Apollo3-Sigmod-2013-final.pdf)
>>>>>  .   I guess the performance's down is due to the single row iterate model.
>>>>>
>>>>>
>>>>> Hope someone to correct my opinion. Also maybe I have a wrong use  of
>>>>> the MutableHashTable. wait for someone to give an advice.
>>>>>
>>>>
>>>>
>>>
>>
>

Re: questions about Flink's HashJoin performance

Posted by weijie tong <to...@gmail.com>.
Thanks for all your enthusiastic response. Yes, My target was to try to
find the best performance in memory. I got that.
On Tue, 16 May 2017 at 4:10 PM Fabian Hueske <fh...@gmail.com> wrote:

> Hi,
>
> Flink's HashJoin implementation was designed to gracefully handle inputs
> that exceed the main memory.
> It is not explicitly optimized for in-memory processing and does not play
> fancy tricks like optimizing cache accesses or batching.
> I assume your benchmark is about in-memory joins only. This was not the
> main design goal when the join was implemented but robustness.
> Since most of the development of Flink focuses on streaming applications
> at the moment, the join implementation has barely been touched in recent
> years (except for minor extensions and bugfixes).
>
> Regarding your tests, Tuple should give better performance than Row
> because Row is null-sensitive and serialized a null-mask.
> There is also a blog post about Flink's join performance [1] which is
> already a bit dusty but as I said, the algorithm hasn't change much since
> then.
>
> Best, Fabian
>
> [1]
> https://flink.apache.org/news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html
>
>
> 2017-05-15 16:26 GMT+02:00 weijie tong <to...@gmail.com>:
>
>> The Flink version is 1.2.0
>>
>> On Mon, May 15, 2017 at 10:24 PM, weijie tong <to...@gmail.com>
>> wrote:
>>
>>> @Till thanks for your reply.
>>>
>>> My code is similar to   HashTableITCase.testInMemoryMutableHashTable()
>>> . It just use the MutableHashTable class , there's  no other Flink's
>>> configuration.  The main code body is:
>>>
>>> this.recordBuildSideAccessor = RecordSerializer.get();
>>>> this.recordProbeSideAccessor = RecordSerializer.get();
>>>> final int[] buildKeyPos = new int[]{buildSideJoinIndex};
>>>> final int[] probeKeyPos = new int[]{probeSideJoinIndex};
>>>> final Class<? extends Value>[] keyType = (Class<? extends Value>[]) new Class[]{BytesValue.class};
>>>> this.recordBuildSideComparator = new RecordComparator(buildKeyPos, keyType);
>>>> this.recordProbeSideComparator = new RecordComparator(probeKeyPos, keyType);
>>>> this.pactRecordComparator = new HashJoinVectorJointGroupIterator.RecordPairComparator(buildSideJoinIndex);
>>>> Sequence<Record> buildSideRecordsSeq = makeSequenceRecordOfSameSideSegments(buildSideSegs, localJoinQuery);
>>>> Sequence<Record> probeSideRecordsSeq = makeSequenceRecordOfSameSideSegments(probeSideSegs, localJoinQuery);
>>>> List<MemorySegment> memorySegments;
>>>> int pageSize = hashTableMemoryManager.getTotalNumPages();
>>>> try {
>>>>   memorySegments = this.hashTableMemoryManager.allocatePages(MEM_OWNER, pageSize);
>>>> }
>>>> catch (MemoryAllocationException e) {
>>>>   LOGGER.error("could not allocate " + pageSize + " pages memory for HashJoin", e);
>>>>   Throwables.propagate(e);
>>>>   return;
>>>> }
>>>> try {
>>>>   Stopwatch stopwatch = Stopwatch.createStarted();
>>>>   UniformRecordGenerator buildInput = new UniformRecordGenerator(buildSideRecordsSeq);
>>>>   UniformRecordGenerator probeInput = new UniformRecordGenerator(probeSideRecordsSeq);
>>>>   join = new MutableHashTable<Record, Record>(
>>>>       recordBuildSideAccessor,
>>>>       recordProbeSideAccessor,
>>>>       recordBuildSideComparator,
>>>>       recordProbeSideComparator,
>>>>       pactRecordComparator,
>>>>       memorySegments,
>>>>       ioManager
>>>>   );
>>>>   join.open(buildInput,probeInput);
>>>>
>>>>   LOGGER.info("construct hash table elapsed:" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + "ms");
>>>>
>>>>
>>> The BytesValue type is self defined one which holds byte[] , but just
>>> like the original StringValue, also has the same serDe performance.
>>>
>>>
>>> while (join.nextRecord()) {
>>>   Record currentProbeRecord = join.getCurrentProbeRecord();
>>>   MutableObjectIterator<Record> buildSideIterator = join.getBuildSideIterator();
>>>   while (buildSideIterator.next(reusedBuildSideRow) != null) {
>>>     materializeRecord2OutVector(reusedBuildSideRow, buildSideIndex2Value, buildSideIndex2Vector, rowNum);
>>>     materializeRecord2OutVector(currentProbeRecord, probeSideIndex2Value, probeSideIndex2Vector, rowNum);
>>>     rowNum++;
>>>   }}
>>>
>>>
>>>
>>>
>>> I have tried both the Record ,Row class as the type of records without
>>> any better improved performance . I also tried batched the input records.
>>> That means the  buildInput or probeInput variables of the first code
>>> block which iterate one Record a time from another batched Records .
>>> Batched records's content stay in memory in Drill's ValueVector format.
>>> Once a record is need to participate in the build or probe phase from a
>>> iterate.next() call,
>>> it will be fetched from the batched in memory ValueVector content. But
>>> no performance gains.
>>>
>>>
>>> The top hotspot profile from Jprofiler is below:
>>> >
>>> Hot spot,"Self time (microseconds)","Average Time","Invocations"
>>> org.apache.flink.types.Record.serialize,1014127,"n/a","n/a"
>>> org.apache.flink.types.Record.deserialize,60684,"n/a","n/a"
>>> org.apache.flink.types.Record.copyTo,83007,"n/a","n/a"
>>>
>>> org.apache.flink.runtime.operators.hash.MutableHashTable.open,55238,"n/a","n/a"
>>>
>>> org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord,10955,"n/a","n/a"
>>> org.apache.flink.runtime.memory.MemoryManager.release,33484,"n/a","n/a"
>>>
>>> org.apache.flink.runtime.memory.MemoryManager.allocatePages,104259,"n/a","n/a"
>>>
>>>
>>> My log show that hashjoin.open()  method costs too much time.
>>> >
>>> construct hash table elapsed:1885ms
>>>
>>>
>>>
>>>
>>> On Mon, May 15, 2017 at 6:20 PM, Till Rohrmann <tr...@apache.org>
>>> wrote:
>>>
>>>> Hi Weijie,
>>>>
>>>> it might be the case that batching the processing of multiple rows can
>>>> give you an improved performance compared to single row processing.
>>>>
>>>> Maybe you could share the exact benchmark base line results and the
>>>> code you use to test Flink's MutableHashTable with us. Also the Flink
>>>> configuration and how you run it would be of interest. That way we might be
>>>> able to see if we can tune Flink a bit more.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Sun, May 14, 2017 at 5:23 AM, weijie tong <to...@gmail.com>
>>>> wrote:
>>>>
>>>>> I has a test case to use Flink's MutableHashTable class to do a hash
>>>>> join on a local machine with 64g memory, 64cores. The test case is one
>>>>> build table with 14w rows ,one probe table with 320w rows ,the matched
>>>>> result rows is 12 w.
>>>>>
>>>>> It takes 2.2 seconds to complete the join.The performance seems bad. I
>>>>> ensure there's no overflow, the smaller table is the build side. The
>>>>> MutableObjectIterator is a sequence of Rows. The Row is composed of several
>>>>> fields which are byte[]. Through my log,I find the open() method takes
>>>>> 1.560 seconds. The probe iterates phase takes 680ms.  And my Jprofiler's
>>>>> profile shows the MutableObjectIterator's next() method call is the
>>>>> hotspot.
>>>>>
>>>>>
>>>>> I want to know how to tune this scenario. I find Drill's HashJoin is
>>>>> batch model. Its build side's input is a RecordBatch which holds batch of
>>>>> rows and memory size is approach to L2 cache. Through this strategy it will
>>>>> gain less method calls (that means call to next() ) and much efficient to
>>>>> cpu calculation.  I also find SQL server's paper noticed the batch model's
>>>>> performance gains (
>>>>> https://www.microsoft.com/en-us/research/wp-content/uploads/2013/06/Apollo3-Sigmod-2013-final.pdf)
>>>>>  .   I guess the performance's down is due to the single row iterate model.
>>>>>
>>>>>
>>>>> Hope someone to correct my opinion. Also maybe I have a wrong use  of
>>>>> the MutableHashTable. wait for someone to give an advice.
>>>>>
>>>>
>>>>
>>>
>>
>

Re: questions about Flink's HashJoin performance

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

Flink's HashJoin implementation was designed to gracefully handle inputs
that exceed the main memory.
It is not explicitly optimized for in-memory processing and does not play
fancy tricks like optimizing cache accesses or batching.
I assume your benchmark is about in-memory joins only. This was not the
main design goal when the join was implemented but robustness.
Since most of the development of Flink focuses on streaming applications at
the moment, the join implementation has barely been touched in recent years
(except for minor extensions and bugfixes).

Regarding your tests, Tuple should give better performance than Row because
Row is null-sensitive and serialized a null-mask.
There is also a blog post about Flink's join performance [1] which is
already a bit dusty but as I said, the algorithm hasn't change much since
then.

Best, Fabian

[1]
https://flink.apache.org/news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html


2017-05-15 16:26 GMT+02:00 weijie tong <to...@gmail.com>:

> The Flink version is 1.2.0
>
> On Mon, May 15, 2017 at 10:24 PM, weijie tong <to...@gmail.com>
> wrote:
>
>> @Till thanks for your reply.
>>
>> My code is similar to   HashTableITCase.testInMemoryMutableHashTable()
>> . It just use the MutableHashTable class , there's  no other Flink's
>> configuration.  The main code body is:
>>
>> this.recordBuildSideAccessor = RecordSerializer.get();
>>> this.recordProbeSideAccessor = RecordSerializer.get();
>>> final int[] buildKeyPos = new int[]{buildSideJoinIndex};
>>> final int[] probeKeyPos = new int[]{probeSideJoinIndex};
>>> final Class<? extends Value>[] keyType = (Class<? extends Value>[]) new Class[]{BytesValue.class};
>>> this.recordBuildSideComparator = new RecordComparator(buildKeyPos, keyType);
>>> this.recordProbeSideComparator = new RecordComparator(probeKeyPos, keyType);
>>> this.pactRecordComparator = new HashJoinVectorJointGroupIterator.RecordPairComparator(buildSideJoinIndex);
>>> Sequence<Record> buildSideRecordsSeq = makeSequenceRecordOfSameSideSegments(buildSideSegs, localJoinQuery);
>>> Sequence<Record> probeSideRecordsSeq = makeSequenceRecordOfSameSideSegments(probeSideSegs, localJoinQuery);
>>> List<MemorySegment> memorySegments;
>>> int pageSize = hashTableMemoryManager.getTotalNumPages();
>>> try {
>>>   memorySegments = this.hashTableMemoryManager.allocatePages(MEM_OWNER, pageSize);
>>> }
>>> catch (MemoryAllocationException e) {
>>>   LOGGER.error("could not allocate " + pageSize + " pages memory for HashJoin", e);
>>>   Throwables.propagate(e);
>>>   return;
>>> }
>>> try {
>>>   Stopwatch stopwatch = Stopwatch.createStarted();
>>>   UniformRecordGenerator buildInput = new UniformRecordGenerator(buildSideRecordsSeq);
>>>   UniformRecordGenerator probeInput = new UniformRecordGenerator(probeSideRecordsSeq);
>>>   join = new MutableHashTable<Record, Record>(
>>>       recordBuildSideAccessor,
>>>       recordProbeSideAccessor,
>>>       recordBuildSideComparator,
>>>       recordProbeSideComparator,
>>>       pactRecordComparator,
>>>       memorySegments,
>>>       ioManager
>>>   );
>>>   join.open(buildInput,probeInput);
>>>
>>>   LOGGER.info("construct hash table elapsed:" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + "ms");
>>>
>>>
>> The BytesValue type is self defined one which holds byte[] , but just
>> like the original StringValue, also has the same serDe performance.
>>
>>
>> while (join.nextRecord()) {
>>   Record currentProbeRecord = join.getCurrentProbeRecord();
>>   MutableObjectIterator<Record> buildSideIterator = join.getBuildSideIterator();
>>   while (buildSideIterator.next(reusedBuildSideRow) != null) {
>>     materializeRecord2OutVector(reusedBuildSideRow, buildSideIndex2Value, buildSideIndex2Vector, rowNum);
>>     materializeRecord2OutVector(currentProbeRecord, probeSideIndex2Value, probeSideIndex2Vector, rowNum);
>>     rowNum++;
>>   }}
>>
>>
>>
>>
>> I have tried both the Record ,Row class as the type of records without
>> any better improved performance . I also tried batched the input records.
>> That means the  buildInput or probeInput variables of the first code
>> block which iterate one Record a time from another batched Records .
>> Batched records's content stay in memory in Drill's ValueVector format.
>> Once a record is need to participate in the build or probe phase from a
>> iterate.next() call,
>> it will be fetched from the batched in memory ValueVector content. But no
>> performance gains.
>>
>>
>> The top hotspot profile from Jprofiler is below:
>> >
>> Hot spot,"Self time (microseconds)","Average Time","Invocations"
>> org.apache.flink.types.Record.serialize,1014127,"n/a","n/a"
>> org.apache.flink.types.Record.deserialize,60684,"n/a","n/a"
>> org.apache.flink.types.Record.copyTo,83007,"n/a","n/a"
>> org.apache.flink.runtime.operators.hash.MutableHashTable.
>> open,55238,"n/a","n/a"
>> org.apache.flink.runtime.operators.hash.MutableHashTable.
>> nextRecord,10955,"n/a","n/a"
>> org.apache.flink.runtime.memory.MemoryManager.release,33484,"n/a","n/a"
>> org.apache.flink.runtime.memory.MemoryManager.allocatePages,
>> 104259,"n/a","n/a"
>>
>>
>> My log show that hashjoin.open()  method costs too much time.
>> >
>> construct hash table elapsed:1885ms
>>
>>
>>
>>
>> On Mon, May 15, 2017 at 6:20 PM, Till Rohrmann <tr...@apache.org>
>> wrote:
>>
>>> Hi Weijie,
>>>
>>> it might be the case that batching the processing of multiple rows can
>>> give you an improved performance compared to single row processing.
>>>
>>> Maybe you could share the exact benchmark base line results and the code
>>> you use to test Flink's MutableHashTable with us. Also the Flink
>>> configuration and how you run it would be of interest. That way we might be
>>> able to see if we can tune Flink a bit more.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Sun, May 14, 2017 at 5:23 AM, weijie tong <to...@gmail.com>
>>> wrote:
>>>
>>>> I has a test case to use Flink's MutableHashTable class to do a hash
>>>> join on a local machine with 64g memory, 64cores. The test case is one
>>>> build table with 14w rows ,one probe table with 320w rows ,the matched
>>>> result rows is 12 w.
>>>>
>>>> It takes 2.2 seconds to complete the join.The performance seems bad. I
>>>> ensure there's no overflow, the smaller table is the build side. The
>>>> MutableObjectIterator is a sequence of Rows. The Row is composed of several
>>>> fields which are byte[]. Through my log,I find the open() method takes
>>>> 1.560 seconds. The probe iterates phase takes 680ms.  And my Jprofiler's
>>>> profile shows the MutableObjectIterator's next() method call is the
>>>> hotspot.
>>>>
>>>>
>>>> I want to know how to tune this scenario. I find Drill's HashJoin is
>>>> batch model. Its build side's input is a RecordBatch which holds batch of
>>>> rows and memory size is approach to L2 cache. Through this strategy it will
>>>> gain less method calls (that means call to next() ) and much efficient to
>>>> cpu calculation.  I also find SQL server's paper noticed the batch model's
>>>> performance gains (https://www.microsoft.com/en-
>>>> us/research/wp-content/uploads/2013/06/Apollo3-Sigmod-2013-final.pdf)
>>>>  .   I guess the performance's down is due to the single row iterate model.
>>>>
>>>>
>>>> Hope someone to correct my opinion. Also maybe I have a wrong use  of
>>>> the MutableHashTable. wait for someone to give an advice.
>>>>
>>>
>>>
>>
>

Re: questions about Flink's HashJoin performance

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

Flink's HashJoin implementation was designed to gracefully handle inputs
that exceed the main memory.
It is not explicitly optimized for in-memory processing and does not play
fancy tricks like optimizing cache accesses or batching.
I assume your benchmark is about in-memory joins only. This was not the
main design goal when the join was implemented but robustness.
Since most of the development of Flink focuses on streaming applications at
the moment, the join implementation has barely been touched in recent years
(except for minor extensions and bugfixes).

Regarding your tests, Tuple should give better performance than Row because
Row is null-sensitive and serialized a null-mask.
There is also a blog post about Flink's join performance [1] which is
already a bit dusty but as I said, the algorithm hasn't change much since
then.

Best, Fabian

[1]
https://flink.apache.org/news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html


2017-05-15 16:26 GMT+02:00 weijie tong <to...@gmail.com>:

> The Flink version is 1.2.0
>
> On Mon, May 15, 2017 at 10:24 PM, weijie tong <to...@gmail.com>
> wrote:
>
>> @Till thanks for your reply.
>>
>> My code is similar to   HashTableITCase.testInMemoryMutableHashTable()
>> . It just use the MutableHashTable class , there's  no other Flink's
>> configuration.  The main code body is:
>>
>> this.recordBuildSideAccessor = RecordSerializer.get();
>>> this.recordProbeSideAccessor = RecordSerializer.get();
>>> final int[] buildKeyPos = new int[]{buildSideJoinIndex};
>>> final int[] probeKeyPos = new int[]{probeSideJoinIndex};
>>> final Class<? extends Value>[] keyType = (Class<? extends Value>[]) new Class[]{BytesValue.class};
>>> this.recordBuildSideComparator = new RecordComparator(buildKeyPos, keyType);
>>> this.recordProbeSideComparator = new RecordComparator(probeKeyPos, keyType);
>>> this.pactRecordComparator = new HashJoinVectorJointGroupIterator.RecordPairComparator(buildSideJoinIndex);
>>> Sequence<Record> buildSideRecordsSeq = makeSequenceRecordOfSameSideSegments(buildSideSegs, localJoinQuery);
>>> Sequence<Record> probeSideRecordsSeq = makeSequenceRecordOfSameSideSegments(probeSideSegs, localJoinQuery);
>>> List<MemorySegment> memorySegments;
>>> int pageSize = hashTableMemoryManager.getTotalNumPages();
>>> try {
>>>   memorySegments = this.hashTableMemoryManager.allocatePages(MEM_OWNER, pageSize);
>>> }
>>> catch (MemoryAllocationException e) {
>>>   LOGGER.error("could not allocate " + pageSize + " pages memory for HashJoin", e);
>>>   Throwables.propagate(e);
>>>   return;
>>> }
>>> try {
>>>   Stopwatch stopwatch = Stopwatch.createStarted();
>>>   UniformRecordGenerator buildInput = new UniformRecordGenerator(buildSideRecordsSeq);
>>>   UniformRecordGenerator probeInput = new UniformRecordGenerator(probeSideRecordsSeq);
>>>   join = new MutableHashTable<Record, Record>(
>>>       recordBuildSideAccessor,
>>>       recordProbeSideAccessor,
>>>       recordBuildSideComparator,
>>>       recordProbeSideComparator,
>>>       pactRecordComparator,
>>>       memorySegments,
>>>       ioManager
>>>   );
>>>   join.open(buildInput,probeInput);
>>>
>>>   LOGGER.info("construct hash table elapsed:" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + "ms");
>>>
>>>
>> The BytesValue type is self defined one which holds byte[] , but just
>> like the original StringValue, also has the same serDe performance.
>>
>>
>> while (join.nextRecord()) {
>>   Record currentProbeRecord = join.getCurrentProbeRecord();
>>   MutableObjectIterator<Record> buildSideIterator = join.getBuildSideIterator();
>>   while (buildSideIterator.next(reusedBuildSideRow) != null) {
>>     materializeRecord2OutVector(reusedBuildSideRow, buildSideIndex2Value, buildSideIndex2Vector, rowNum);
>>     materializeRecord2OutVector(currentProbeRecord, probeSideIndex2Value, probeSideIndex2Vector, rowNum);
>>     rowNum++;
>>   }}
>>
>>
>>
>>
>> I have tried both the Record ,Row class as the type of records without
>> any better improved performance . I also tried batched the input records.
>> That means the  buildInput or probeInput variables of the first code
>> block which iterate one Record a time from another batched Records .
>> Batched records's content stay in memory in Drill's ValueVector format.
>> Once a record is need to participate in the build or probe phase from a
>> iterate.next() call,
>> it will be fetched from the batched in memory ValueVector content. But no
>> performance gains.
>>
>>
>> The top hotspot profile from Jprofiler is below:
>> >
>> Hot spot,"Self time (microseconds)","Average Time","Invocations"
>> org.apache.flink.types.Record.serialize,1014127,"n/a","n/a"
>> org.apache.flink.types.Record.deserialize,60684,"n/a","n/a"
>> org.apache.flink.types.Record.copyTo,83007,"n/a","n/a"
>> org.apache.flink.runtime.operators.hash.MutableHashTable.
>> open,55238,"n/a","n/a"
>> org.apache.flink.runtime.operators.hash.MutableHashTable.
>> nextRecord,10955,"n/a","n/a"
>> org.apache.flink.runtime.memory.MemoryManager.release,33484,"n/a","n/a"
>> org.apache.flink.runtime.memory.MemoryManager.allocatePages,
>> 104259,"n/a","n/a"
>>
>>
>> My log show that hashjoin.open()  method costs too much time.
>> >
>> construct hash table elapsed:1885ms
>>
>>
>>
>>
>> On Mon, May 15, 2017 at 6:20 PM, Till Rohrmann <tr...@apache.org>
>> wrote:
>>
>>> Hi Weijie,
>>>
>>> it might be the case that batching the processing of multiple rows can
>>> give you an improved performance compared to single row processing.
>>>
>>> Maybe you could share the exact benchmark base line results and the code
>>> you use to test Flink's MutableHashTable with us. Also the Flink
>>> configuration and how you run it would be of interest. That way we might be
>>> able to see if we can tune Flink a bit more.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Sun, May 14, 2017 at 5:23 AM, weijie tong <to...@gmail.com>
>>> wrote:
>>>
>>>> I has a test case to use Flink's MutableHashTable class to do a hash
>>>> join on a local machine with 64g memory, 64cores. The test case is one
>>>> build table with 14w rows ,one probe table with 320w rows ,the matched
>>>> result rows is 12 w.
>>>>
>>>> It takes 2.2 seconds to complete the join.The performance seems bad. I
>>>> ensure there's no overflow, the smaller table is the build side. The
>>>> MutableObjectIterator is a sequence of Rows. The Row is composed of several
>>>> fields which are byte[]. Through my log,I find the open() method takes
>>>> 1.560 seconds. The probe iterates phase takes 680ms.  And my Jprofiler's
>>>> profile shows the MutableObjectIterator's next() method call is the
>>>> hotspot.
>>>>
>>>>
>>>> I want to know how to tune this scenario. I find Drill's HashJoin is
>>>> batch model. Its build side's input is a RecordBatch which holds batch of
>>>> rows and memory size is approach to L2 cache. Through this strategy it will
>>>> gain less method calls (that means call to next() ) and much efficient to
>>>> cpu calculation.  I also find SQL server's paper noticed the batch model's
>>>> performance gains (https://www.microsoft.com/en-
>>>> us/research/wp-content/uploads/2013/06/Apollo3-Sigmod-2013-final.pdf)
>>>>  .   I guess the performance's down is due to the single row iterate model.
>>>>
>>>>
>>>> Hope someone to correct my opinion. Also maybe I have a wrong use  of
>>>> the MutableHashTable. wait for someone to give an advice.
>>>>
>>>
>>>
>>
>

Re: questions about Flink's HashJoin performance

Posted by weijie tong <to...@gmail.com>.
The Flink version is 1.2.0

On Mon, May 15, 2017 at 10:24 PM, weijie tong <to...@gmail.com>
wrote:

> @Till thanks for your reply.
>
> My code is similar to   HashTableITCase.testInMemoryMutableHashTable()
> . It just use the MutableHashTable class , there's  no other Flink's
> configuration.  The main code body is:
>
> this.recordBuildSideAccessor = RecordSerializer.get();
>> this.recordProbeSideAccessor = RecordSerializer.get();
>> final int[] buildKeyPos = new int[]{buildSideJoinIndex};
>> final int[] probeKeyPos = new int[]{probeSideJoinIndex};
>> final Class<? extends Value>[] keyType = (Class<? extends Value>[]) new Class[]{BytesValue.class};
>> this.recordBuildSideComparator = new RecordComparator(buildKeyPos, keyType);
>> this.recordProbeSideComparator = new RecordComparator(probeKeyPos, keyType);
>> this.pactRecordComparator = new HashJoinVectorJointGroupIterator.RecordPairComparator(buildSideJoinIndex);
>> Sequence<Record> buildSideRecordsSeq = makeSequenceRecordOfSameSideSegments(buildSideSegs, localJoinQuery);
>> Sequence<Record> probeSideRecordsSeq = makeSequenceRecordOfSameSideSegments(probeSideSegs, localJoinQuery);
>> List<MemorySegment> memorySegments;
>> int pageSize = hashTableMemoryManager.getTotalNumPages();
>> try {
>>   memorySegments = this.hashTableMemoryManager.allocatePages(MEM_OWNER, pageSize);
>> }
>> catch (MemoryAllocationException e) {
>>   LOGGER.error("could not allocate " + pageSize + " pages memory for HashJoin", e);
>>   Throwables.propagate(e);
>>   return;
>> }
>> try {
>>   Stopwatch stopwatch = Stopwatch.createStarted();
>>   UniformRecordGenerator buildInput = new UniformRecordGenerator(buildSideRecordsSeq);
>>   UniformRecordGenerator probeInput = new UniformRecordGenerator(probeSideRecordsSeq);
>>   join = new MutableHashTable<Record, Record>(
>>       recordBuildSideAccessor,
>>       recordProbeSideAccessor,
>>       recordBuildSideComparator,
>>       recordProbeSideComparator,
>>       pactRecordComparator,
>>       memorySegments,
>>       ioManager
>>   );
>>   join.open(buildInput,probeInput);
>>
>>   LOGGER.info("construct hash table elapsed:" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + "ms");
>>
>>
> The BytesValue type is self defined one which holds byte[] , but just like
> the original StringValue, also has the same serDe performance.
>
>
> while (join.nextRecord()) {
>   Record currentProbeRecord = join.getCurrentProbeRecord();
>   MutableObjectIterator<Record> buildSideIterator = join.getBuildSideIterator();
>   while (buildSideIterator.next(reusedBuildSideRow) != null) {
>     materializeRecord2OutVector(reusedBuildSideRow, buildSideIndex2Value, buildSideIndex2Vector, rowNum);
>     materializeRecord2OutVector(currentProbeRecord, probeSideIndex2Value, probeSideIndex2Vector, rowNum);
>     rowNum++;
>   }}
>
>
>
>
> I have tried both the Record ,Row class as the type of records without any
> better improved performance . I also tried batched the input records. That
> means the  buildInput or probeInput variables of the first code block
> which iterate one Record a time from another batched Records . Batched
> records's content stay in memory in Drill's ValueVector format. Once a
> record is need to participate in the build or probe phase from a
> iterate.next() call,
> it will be fetched from the batched in memory ValueVector content. But no
> performance gains.
>
>
> The top hotspot profile from Jprofiler is below:
> >
> Hot spot,"Self time (microseconds)","Average Time","Invocations"
> org.apache.flink.types.Record.serialize,1014127,"n/a","n/a"
> org.apache.flink.types.Record.deserialize,60684,"n/a","n/a"
> org.apache.flink.types.Record.copyTo,83007,"n/a","n/a"
> org.apache.flink.runtime.operators.hash.MutableHashTable.open,55238,"
> n/a","n/a"
> org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord,
> 10955,"n/a","n/a"
> org.apache.flink.runtime.memory.MemoryManager.release,33484,"n/a","n/a"
> org.apache.flink.runtime.memory.MemoryManager.
> allocatePages,104259,"n/a","n/a"
>
>
> My log show that hashjoin.open()  method costs too much time.
> >
> construct hash table elapsed:1885ms
>
>
>
>
> On Mon, May 15, 2017 at 6:20 PM, Till Rohrmann <tr...@apache.org>
> wrote:
>
>> Hi Weijie,
>>
>> it might be the case that batching the processing of multiple rows can
>> give you an improved performance compared to single row processing.
>>
>> Maybe you could share the exact benchmark base line results and the code
>> you use to test Flink's MutableHashTable with us. Also the Flink
>> configuration and how you run it would be of interest. That way we might be
>> able to see if we can tune Flink a bit more.
>>
>> Cheers,
>> Till
>>
>> On Sun, May 14, 2017 at 5:23 AM, weijie tong <to...@gmail.com>
>> wrote:
>>
>>> I has a test case to use Flink's MutableHashTable class to do a hash
>>> join on a local machine with 64g memory, 64cores. The test case is one
>>> build table with 14w rows ,one probe table with 320w rows ,the matched
>>> result rows is 12 w.
>>>
>>> It takes 2.2 seconds to complete the join.The performance seems bad. I
>>> ensure there's no overflow, the smaller table is the build side. The
>>> MutableObjectIterator is a sequence of Rows. The Row is composed of several
>>> fields which are byte[]. Through my log,I find the open() method takes
>>> 1.560 seconds. The probe iterates phase takes 680ms.  And my Jprofiler's
>>> profile shows the MutableObjectIterator's next() method call is the
>>> hotspot.
>>>
>>>
>>> I want to know how to tune this scenario. I find Drill's HashJoin is
>>> batch model. Its build side's input is a RecordBatch which holds batch of
>>> rows and memory size is approach to L2 cache. Through this strategy it will
>>> gain less method calls (that means call to next() ) and much efficient to
>>> cpu calculation.  I also find SQL server's paper noticed the batch model's
>>> performance gains (https://www.microsoft.com/en-
>>> us/research/wp-content/uploads/2013/06/Apollo3-Sigmod-2013-final.pdf)
>>>  .   I guess the performance's down is due to the single row iterate model.
>>>
>>>
>>> Hope someone to correct my opinion. Also maybe I have a wrong use  of
>>> the MutableHashTable. wait for someone to give an advice.
>>>
>>
>>
>

Re: questions about Flink's HashJoin performance

Posted by weijie tong <to...@gmail.com>.
The Flink version is 1.2.0

On Mon, May 15, 2017 at 10:24 PM, weijie tong <to...@gmail.com>
wrote:

> @Till thanks for your reply.
>
> My code is similar to   HashTableITCase.testInMemoryMutableHashTable()
> . It just use the MutableHashTable class , there's  no other Flink's
> configuration.  The main code body is:
>
> this.recordBuildSideAccessor = RecordSerializer.get();
>> this.recordProbeSideAccessor = RecordSerializer.get();
>> final int[] buildKeyPos = new int[]{buildSideJoinIndex};
>> final int[] probeKeyPos = new int[]{probeSideJoinIndex};
>> final Class<? extends Value>[] keyType = (Class<? extends Value>[]) new Class[]{BytesValue.class};
>> this.recordBuildSideComparator = new RecordComparator(buildKeyPos, keyType);
>> this.recordProbeSideComparator = new RecordComparator(probeKeyPos, keyType);
>> this.pactRecordComparator = new HashJoinVectorJointGroupIterator.RecordPairComparator(buildSideJoinIndex);
>> Sequence<Record> buildSideRecordsSeq = makeSequenceRecordOfSameSideSegments(buildSideSegs, localJoinQuery);
>> Sequence<Record> probeSideRecordsSeq = makeSequenceRecordOfSameSideSegments(probeSideSegs, localJoinQuery);
>> List<MemorySegment> memorySegments;
>> int pageSize = hashTableMemoryManager.getTotalNumPages();
>> try {
>>   memorySegments = this.hashTableMemoryManager.allocatePages(MEM_OWNER, pageSize);
>> }
>> catch (MemoryAllocationException e) {
>>   LOGGER.error("could not allocate " + pageSize + " pages memory for HashJoin", e);
>>   Throwables.propagate(e);
>>   return;
>> }
>> try {
>>   Stopwatch stopwatch = Stopwatch.createStarted();
>>   UniformRecordGenerator buildInput = new UniformRecordGenerator(buildSideRecordsSeq);
>>   UniformRecordGenerator probeInput = new UniformRecordGenerator(probeSideRecordsSeq);
>>   join = new MutableHashTable<Record, Record>(
>>       recordBuildSideAccessor,
>>       recordProbeSideAccessor,
>>       recordBuildSideComparator,
>>       recordProbeSideComparator,
>>       pactRecordComparator,
>>       memorySegments,
>>       ioManager
>>   );
>>   join.open(buildInput,probeInput);
>>
>>   LOGGER.info("construct hash table elapsed:" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + "ms");
>>
>>
> The BytesValue type is self defined one which holds byte[] , but just like
> the original StringValue, also has the same serDe performance.
>
>
> while (join.nextRecord()) {
>   Record currentProbeRecord = join.getCurrentProbeRecord();
>   MutableObjectIterator<Record> buildSideIterator = join.getBuildSideIterator();
>   while (buildSideIterator.next(reusedBuildSideRow) != null) {
>     materializeRecord2OutVector(reusedBuildSideRow, buildSideIndex2Value, buildSideIndex2Vector, rowNum);
>     materializeRecord2OutVector(currentProbeRecord, probeSideIndex2Value, probeSideIndex2Vector, rowNum);
>     rowNum++;
>   }}
>
>
>
>
> I have tried both the Record ,Row class as the type of records without any
> better improved performance . I also tried batched the input records. That
> means the  buildInput or probeInput variables of the first code block
> which iterate one Record a time from another batched Records . Batched
> records's content stay in memory in Drill's ValueVector format. Once a
> record is need to participate in the build or probe phase from a
> iterate.next() call,
> it will be fetched from the batched in memory ValueVector content. But no
> performance gains.
>
>
> The top hotspot profile from Jprofiler is below:
> >
> Hot spot,"Self time (microseconds)","Average Time","Invocations"
> org.apache.flink.types.Record.serialize,1014127,"n/a","n/a"
> org.apache.flink.types.Record.deserialize,60684,"n/a","n/a"
> org.apache.flink.types.Record.copyTo,83007,"n/a","n/a"
> org.apache.flink.runtime.operators.hash.MutableHashTable.open,55238,"
> n/a","n/a"
> org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord,
> 10955,"n/a","n/a"
> org.apache.flink.runtime.memory.MemoryManager.release,33484,"n/a","n/a"
> org.apache.flink.runtime.memory.MemoryManager.
> allocatePages,104259,"n/a","n/a"
>
>
> My log show that hashjoin.open()  method costs too much time.
> >
> construct hash table elapsed:1885ms
>
>
>
>
> On Mon, May 15, 2017 at 6:20 PM, Till Rohrmann <tr...@apache.org>
> wrote:
>
>> Hi Weijie,
>>
>> it might be the case that batching the processing of multiple rows can
>> give you an improved performance compared to single row processing.
>>
>> Maybe you could share the exact benchmark base line results and the code
>> you use to test Flink's MutableHashTable with us. Also the Flink
>> configuration and how you run it would be of interest. That way we might be
>> able to see if we can tune Flink a bit more.
>>
>> Cheers,
>> Till
>>
>> On Sun, May 14, 2017 at 5:23 AM, weijie tong <to...@gmail.com>
>> wrote:
>>
>>> I has a test case to use Flink's MutableHashTable class to do a hash
>>> join on a local machine with 64g memory, 64cores. The test case is one
>>> build table with 14w rows ,one probe table with 320w rows ,the matched
>>> result rows is 12 w.
>>>
>>> It takes 2.2 seconds to complete the join.The performance seems bad. I
>>> ensure there's no overflow, the smaller table is the build side. The
>>> MutableObjectIterator is a sequence of Rows. The Row is composed of several
>>> fields which are byte[]. Through my log,I find the open() method takes
>>> 1.560 seconds. The probe iterates phase takes 680ms.  And my Jprofiler's
>>> profile shows the MutableObjectIterator's next() method call is the
>>> hotspot.
>>>
>>>
>>> I want to know how to tune this scenario. I find Drill's HashJoin is
>>> batch model. Its build side's input is a RecordBatch which holds batch of
>>> rows and memory size is approach to L2 cache. Through this strategy it will
>>> gain less method calls (that means call to next() ) and much efficient to
>>> cpu calculation.  I also find SQL server's paper noticed the batch model's
>>> performance gains (https://www.microsoft.com/en-
>>> us/research/wp-content/uploads/2013/06/Apollo3-Sigmod-2013-final.pdf)
>>>  .   I guess the performance's down is due to the single row iterate model.
>>>
>>>
>>> Hope someone to correct my opinion. Also maybe I have a wrong use  of
>>> the MutableHashTable. wait for someone to give an advice.
>>>
>>
>>
>

Re: questions about Flink's HashJoin performance

Posted by weijie tong <to...@gmail.com>.
@Till thanks for your reply.

My code is similar to   HashTableITCase.testInMemoryMutableHashTable()   .
It just use the MutableHashTable class , there's  no other Flink's
configuration.  The main code body is:

this.recordBuildSideAccessor = RecordSerializer.get();
> this.recordProbeSideAccessor = RecordSerializer.get();
> final int[] buildKeyPos = new int[]{buildSideJoinIndex};
> final int[] probeKeyPos = new int[]{probeSideJoinIndex};
> final Class<? extends Value>[] keyType = (Class<? extends Value>[]) new Class[]{BytesValue.class};
> this.recordBuildSideComparator = new RecordComparator(buildKeyPos, keyType);
> this.recordProbeSideComparator = new RecordComparator(probeKeyPos, keyType);
> this.pactRecordComparator = new HashJoinVectorJointGroupIterator.RecordPairComparator(buildSideJoinIndex);
> Sequence<Record> buildSideRecordsSeq = makeSequenceRecordOfSameSideSegments(buildSideSegs, localJoinQuery);
> Sequence<Record> probeSideRecordsSeq = makeSequenceRecordOfSameSideSegments(probeSideSegs, localJoinQuery);
> List<MemorySegment> memorySegments;
> int pageSize = hashTableMemoryManager.getTotalNumPages();
> try {
>   memorySegments = this.hashTableMemoryManager.allocatePages(MEM_OWNER, pageSize);
> }
> catch (MemoryAllocationException e) {
>   LOGGER.error("could not allocate " + pageSize + " pages memory for HashJoin", e);
>   Throwables.propagate(e);
>   return;
> }
> try {
>   Stopwatch stopwatch = Stopwatch.createStarted();
>   UniformRecordGenerator buildInput = new UniformRecordGenerator(buildSideRecordsSeq);
>   UniformRecordGenerator probeInput = new UniformRecordGenerator(probeSideRecordsSeq);
>   join = new MutableHashTable<Record, Record>(
>       recordBuildSideAccessor,
>       recordProbeSideAccessor,
>       recordBuildSideComparator,
>       recordProbeSideComparator,
>       pactRecordComparator,
>       memorySegments,
>       ioManager
>   );
>   join.open(buildInput,probeInput);
>
>   LOGGER.info("construct hash table elapsed:" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + "ms");
>
>
The BytesValue type is self defined one which holds byte[] , but just like
the original StringValue, also has the same serDe performance.


while (join.nextRecord()) {
  Record currentProbeRecord = join.getCurrentProbeRecord();
  MutableObjectIterator<Record> buildSideIterator = join.getBuildSideIterator();
  while (buildSideIterator.next(reusedBuildSideRow) != null) {
    materializeRecord2OutVector(reusedBuildSideRow,
buildSideIndex2Value, buildSideIndex2Vector, rowNum);
    materializeRecord2OutVector(currentProbeRecord,
probeSideIndex2Value, probeSideIndex2Vector, rowNum);
    rowNum++;
  }}




I have tried both the Record ,Row class as the type of records without any
better improved performance . I also tried batched the input records. That
means the  buildInput or probeInput variables of the first code block which
iterate one Record a time from another batched Records . Batched records's
content stay in memory in Drill's ValueVector format. Once a record is need
to participate in the build or probe phase from a iterate.next() call,
it will be fetched from the batched in memory ValueVector content. But no
performance gains.


The top hotspot profile from Jprofiler is below:
>
Hot spot,"Self time (microseconds)","Average Time","Invocations"
org.apache.flink.types.Record.serialize,1014127,"n/a","n/a"
org.apache.flink.types.Record.deserialize,60684,"n/a","n/a"
org.apache.flink.types.Record.copyTo,83007,"n/a","n/a"
org.apache.flink.runtime.operators.hash.MutableHashTable.open,55238,"n/a","n/a"
org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord,10955,"n/a","n/a"
org.apache.flink.runtime.memory.MemoryManager.release,33484,"n/a","n/a"
org.apache.flink.runtime.memory.MemoryManager.allocatePages,104259,"n/a","n/a"


My log show that hashjoin.open()  method costs too much time.
>
construct hash table elapsed:1885ms




On Mon, May 15, 2017 at 6:20 PM, Till Rohrmann <tr...@apache.org> wrote:

> Hi Weijie,
>
> it might be the case that batching the processing of multiple rows can
> give you an improved performance compared to single row processing.
>
> Maybe you could share the exact benchmark base line results and the code
> you use to test Flink's MutableHashTable with us. Also the Flink
> configuration and how you run it would be of interest. That way we might be
> able to see if we can tune Flink a bit more.
>
> Cheers,
> Till
>
> On Sun, May 14, 2017 at 5:23 AM, weijie tong <to...@gmail.com>
> wrote:
>
>> I has a test case to use Flink's MutableHashTable class to do a hash join
>> on a local machine with 64g memory, 64cores. The test case is one build
>> table with 14w rows ,one probe table with 320w rows ,the matched result
>> rows is 12 w.
>>
>> It takes 2.2 seconds to complete the join.The performance seems bad. I
>> ensure there's no overflow, the smaller table is the build side. The
>> MutableObjectIterator is a sequence of Rows. The Row is composed of several
>> fields which are byte[]. Through my log,I find the open() method takes
>> 1.560 seconds. The probe iterates phase takes 680ms.  And my Jprofiler's
>> profile shows the MutableObjectIterator's next() method call is the
>> hotspot.
>>
>>
>> I want to know how to tune this scenario. I find Drill's HashJoin is
>> batch model. Its build side's input is a RecordBatch which holds batch of
>> rows and memory size is approach to L2 cache. Through this strategy it will
>> gain less method calls (that means call to next() ) and much efficient to
>> cpu calculation.  I also find SQL server's paper noticed the batch model's
>> performance gains (https://www.microsoft.com/en-
>> us/research/wp-content/uploads/2013/06/Apollo3-Sigmod-2013-final.pdf)  .
>>   I guess the performance's down is due to the single row iterate model.
>>
>>
>> Hope someone to correct my opinion. Also maybe I have a wrong use  of the
>> MutableHashTable. wait for someone to give an advice.
>>
>
>

Re: questions about Flink's HashJoin performance

Posted by weijie tong <to...@gmail.com>.
@Till thanks for your reply.

My code is similar to   HashTableITCase.testInMemoryMutableHashTable()   .
It just use the MutableHashTable class , there's  no other Flink's
configuration.  The main code body is:

this.recordBuildSideAccessor = RecordSerializer.get();
> this.recordProbeSideAccessor = RecordSerializer.get();
> final int[] buildKeyPos = new int[]{buildSideJoinIndex};
> final int[] probeKeyPos = new int[]{probeSideJoinIndex};
> final Class<? extends Value>[] keyType = (Class<? extends Value>[]) new Class[]{BytesValue.class};
> this.recordBuildSideComparator = new RecordComparator(buildKeyPos, keyType);
> this.recordProbeSideComparator = new RecordComparator(probeKeyPos, keyType);
> this.pactRecordComparator = new HashJoinVectorJointGroupIterator.RecordPairComparator(buildSideJoinIndex);
> Sequence<Record> buildSideRecordsSeq = makeSequenceRecordOfSameSideSegments(buildSideSegs, localJoinQuery);
> Sequence<Record> probeSideRecordsSeq = makeSequenceRecordOfSameSideSegments(probeSideSegs, localJoinQuery);
> List<MemorySegment> memorySegments;
> int pageSize = hashTableMemoryManager.getTotalNumPages();
> try {
>   memorySegments = this.hashTableMemoryManager.allocatePages(MEM_OWNER, pageSize);
> }
> catch (MemoryAllocationException e) {
>   LOGGER.error("could not allocate " + pageSize + " pages memory for HashJoin", e);
>   Throwables.propagate(e);
>   return;
> }
> try {
>   Stopwatch stopwatch = Stopwatch.createStarted();
>   UniformRecordGenerator buildInput = new UniformRecordGenerator(buildSideRecordsSeq);
>   UniformRecordGenerator probeInput = new UniformRecordGenerator(probeSideRecordsSeq);
>   join = new MutableHashTable<Record, Record>(
>       recordBuildSideAccessor,
>       recordProbeSideAccessor,
>       recordBuildSideComparator,
>       recordProbeSideComparator,
>       pactRecordComparator,
>       memorySegments,
>       ioManager
>   );
>   join.open(buildInput,probeInput);
>
>   LOGGER.info("construct hash table elapsed:" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + "ms");
>
>
The BytesValue type is self defined one which holds byte[] , but just like
the original StringValue, also has the same serDe performance.


while (join.nextRecord()) {
  Record currentProbeRecord = join.getCurrentProbeRecord();
  MutableObjectIterator<Record> buildSideIterator = join.getBuildSideIterator();
  while (buildSideIterator.next(reusedBuildSideRow) != null) {
    materializeRecord2OutVector(reusedBuildSideRow,
buildSideIndex2Value, buildSideIndex2Vector, rowNum);
    materializeRecord2OutVector(currentProbeRecord,
probeSideIndex2Value, probeSideIndex2Vector, rowNum);
    rowNum++;
  }}




I have tried both the Record ,Row class as the type of records without any
better improved performance . I also tried batched the input records. That
means the  buildInput or probeInput variables of the first code block which
iterate one Record a time from another batched Records . Batched records's
content stay in memory in Drill's ValueVector format. Once a record is need
to participate in the build or probe phase from a iterate.next() call,
it will be fetched from the batched in memory ValueVector content. But no
performance gains.


The top hotspot profile from Jprofiler is below:
>
Hot spot,"Self time (microseconds)","Average Time","Invocations"
org.apache.flink.types.Record.serialize,1014127,"n/a","n/a"
org.apache.flink.types.Record.deserialize,60684,"n/a","n/a"
org.apache.flink.types.Record.copyTo,83007,"n/a","n/a"
org.apache.flink.runtime.operators.hash.MutableHashTable.open,55238,"n/a","n/a"
org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord,10955,"n/a","n/a"
org.apache.flink.runtime.memory.MemoryManager.release,33484,"n/a","n/a"
org.apache.flink.runtime.memory.MemoryManager.allocatePages,104259,"n/a","n/a"


My log show that hashjoin.open()  method costs too much time.
>
construct hash table elapsed:1885ms




On Mon, May 15, 2017 at 6:20 PM, Till Rohrmann <tr...@apache.org> wrote:

> Hi Weijie,
>
> it might be the case that batching the processing of multiple rows can
> give you an improved performance compared to single row processing.
>
> Maybe you could share the exact benchmark base line results and the code
> you use to test Flink's MutableHashTable with us. Also the Flink
> configuration and how you run it would be of interest. That way we might be
> able to see if we can tune Flink a bit more.
>
> Cheers,
> Till
>
> On Sun, May 14, 2017 at 5:23 AM, weijie tong <to...@gmail.com>
> wrote:
>
>> I has a test case to use Flink's MutableHashTable class to do a hash join
>> on a local machine with 64g memory, 64cores. The test case is one build
>> table with 14w rows ,one probe table with 320w rows ,the matched result
>> rows is 12 w.
>>
>> It takes 2.2 seconds to complete the join.The performance seems bad. I
>> ensure there's no overflow, the smaller table is the build side. The
>> MutableObjectIterator is a sequence of Rows. The Row is composed of several
>> fields which are byte[]. Through my log,I find the open() method takes
>> 1.560 seconds. The probe iterates phase takes 680ms.  And my Jprofiler's
>> profile shows the MutableObjectIterator's next() method call is the
>> hotspot.
>>
>>
>> I want to know how to tune this scenario. I find Drill's HashJoin is
>> batch model. Its build side's input is a RecordBatch which holds batch of
>> rows and memory size is approach to L2 cache. Through this strategy it will
>> gain less method calls (that means call to next() ) and much efficient to
>> cpu calculation.  I also find SQL server's paper noticed the batch model's
>> performance gains (https://www.microsoft.com/en-
>> us/research/wp-content/uploads/2013/06/Apollo3-Sigmod-2013-final.pdf)  .
>>   I guess the performance's down is due to the single row iterate model.
>>
>>
>> Hope someone to correct my opinion. Also maybe I have a wrong use  of the
>> MutableHashTable. wait for someone to give an advice.
>>
>
>

Re: questions about Flink's HashJoin performance

Posted by Till Rohrmann <tr...@apache.org>.
Hi Weijie,

it might be the case that batching the processing of multiple rows can give
you an improved performance compared to single row processing.

Maybe you could share the exact benchmark base line results and the code
you use to test Flink's MutableHashTable with us. Also the Flink
configuration and how you run it would be of interest. That way we might be
able to see if we can tune Flink a bit more.

Cheers,
Till

On Sun, May 14, 2017 at 5:23 AM, weijie tong <to...@gmail.com>
wrote:

> I has a test case to use Flink's MutableHashTable class to do a hash join
> on a local machine with 64g memory, 64cores. The test case is one build
> table with 14w rows ,one probe table with 320w rows ,the matched result
> rows is 12 w.
>
> It takes 2.2 seconds to complete the join.The performance seems bad. I
> ensure there's no overflow, the smaller table is the build side. The
> MutableObjectIterator is a sequence of Rows. The Row is composed of several
> fields which are byte[]. Through my log,I find the open() method takes
> 1.560 seconds. The probe iterates phase takes 680ms.  And my Jprofiler's
> profile shows the MutableObjectIterator's next() method call is the
> hotspot.
>
>
> I want to know how to tune this scenario. I find Drill's HashJoin is batch
> model. Its build side's input is a RecordBatch which holds batch of rows
> and memory size is approach to L2 cache. Through this strategy it will gain
> less method calls (that means call to next() ) and much efficient to cpu
> calculation.  I also find SQL server's paper noticed the batch model's
> performance gains (https://www.microsoft.com/en-us/research/wp-content/
> uploads/2013/06/Apollo3-Sigmod-2013-final.pdf)  .   I guess the
> performance's down is due to the single row iterate model.
>
>
> Hope someone to correct my opinion. Also maybe I have a wrong use  of the
> MutableHashTable. wait for someone to give an advice.
>

Re: questions about Flink's HashJoin performance

Posted by Till Rohrmann <tr...@apache.org>.
Hi Weijie,

it might be the case that batching the processing of multiple rows can give
you an improved performance compared to single row processing.

Maybe you could share the exact benchmark base line results and the code
you use to test Flink's MutableHashTable with us. Also the Flink
configuration and how you run it would be of interest. That way we might be
able to see if we can tune Flink a bit more.

Cheers,
Till

On Sun, May 14, 2017 at 5:23 AM, weijie tong <to...@gmail.com>
wrote:

> I has a test case to use Flink's MutableHashTable class to do a hash join
> on a local machine with 64g memory, 64cores. The test case is one build
> table with 14w rows ,one probe table with 320w rows ,the matched result
> rows is 12 w.
>
> It takes 2.2 seconds to complete the join.The performance seems bad. I
> ensure there's no overflow, the smaller table is the build side. The
> MutableObjectIterator is a sequence of Rows. The Row is composed of several
> fields which are byte[]. Through my log,I find the open() method takes
> 1.560 seconds. The probe iterates phase takes 680ms.  And my Jprofiler's
> profile shows the MutableObjectIterator's next() method call is the
> hotspot.
>
>
> I want to know how to tune this scenario. I find Drill's HashJoin is batch
> model. Its build side's input is a RecordBatch which holds batch of rows
> and memory size is approach to L2 cache. Through this strategy it will gain
> less method calls (that means call to next() ) and much efficient to cpu
> calculation.  I also find SQL server's paper noticed the batch model's
> performance gains (https://www.microsoft.com/en-us/research/wp-content/
> uploads/2013/06/Apollo3-Sigmod-2013-final.pdf)  .   I guess the
> performance's down is due to the single row iterate model.
>
>
> Hope someone to correct my opinion. Also maybe I have a wrong use  of the
> MutableHashTable. wait for someone to give an advice.
>