Posted to dev@hama.apache.org by "Thomas Jungblut (JIRA)" <ji...@apache.org> on 2012/10/17 22:36:04 UTC

[jira] [Commented] (HAMA-559) Add a spilling message queue

    [ https://issues.apache.org/jira/browse/HAMA-559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13478342#comment-13478342 ] 

Thomas Jungblut commented on HAMA-559:
--------------------------------------

Good work, looks solid. 

Errors:

- messageClass_ is never initialized, which causes NPEs
- the parent directories of the spill file need to be checked for existence (they need to be mkdir'd or deleted)
- when shutting down the threadpool, use shutdownNow
- SpillWriteIndexStatus waits forever in the main thread (so the process never finishes), see the thread dump:

{noformat}
"main" prio=6 tid=0x0000000001d7a000 nid=0x1408 in Object.wait() [0x000000000213e000]
   java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        - waiting on <0x000000004031e190> (a org.apache.hama.bsp.message.SpillingBuffer$SpillWriteIndexStatus)
        at java.lang.Object.wait(Object.java:503)
        at org.apache.hama.bsp.message.SpillingBuffer$SpillWriteIndexStatus.startSpilling(SpillingBuffer.java:243)
        - locked <0x000000004031e190> (a org.apache.hama.bsp.message.SpillingBuffer$SpillWriteIndexStatus)
        at org.apache.hama.bsp.message.SpillingBuffer$SpillingStream.incBytesWritten(SpillingBuffer.java:368)
        - locked <0x000000004031dfa0> (a org.apache.hama.bsp.message.SpillingBuffer$SpillingStream)
        at org.apache.hama.bsp.message.SpillingBuffer$SpillingStream.write(SpillingBuffer.java:398)
        at org.apache.hama.bsp.message.SpillingBuffer$SpillingStream.write(SpillingBuffer.java:349)
        at org.apache.hama.bsp.message.SpillingBuffer$SpillingStream.write(SpillingBuffer.java:344)
        at java.io.DataOutputStream.writeInt(DataOutputStream.java:197)
        at org.apache.hadoop.io.IntWritable.write(IntWritable.java:42)
        at org.apache.hama.bsp.message.SpillingQueue.add(SpillingQueue.java:97)
        at org.apache.hama.bsp.message.SpillingQueue.add(SpillingQueue.java:1)
        at de.jungblut.benchmark.SpillBenchmark.timeSpill(SpillBenchmark.java:55)
        at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:601)
        at com.google.caliper.SimpleBenchmark$1.run(SimpleBenchmark.java:125)
        at com.google.caliper.TimeMeasurer.measureReps(TimeMeasurer.java:184)
        at com.google.caliper.TimeMeasurer.warmUp(TimeMeasurer.java:63)
        at com.google.caliper.TimeMeasurer.run(TimeMeasurer.java:127)
        at com.google.caliper.InProcessRunner.run(InProcessRunner.java:74)
        at com.google.caliper.InProcessRunner.run(InProcessRunner.java:49)
        at com.google.caliper.InProcessRunner.main(InProcessRunner.java:103)

{noformat}
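
A rough sketch of the directory check and the pool shutdown from the list above. The class and method names (SpillSetup, prepareSpillFile, stopSpillThreads) are made up for illustration and are not taken from the patch; only the java.io and java.util.concurrent calls are real.

```java
import java.io.File;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper; none of these names come from the HAMA-559 patch. */
final class SpillSetup {

  private SpillSetup() {
  }

  /** Creates missing parent directories and removes a stale file of the same name. */
  static File prepareSpillFile(File spillFile) throws IOException {
    File parent = spillFile.getAbsoluteFile().getParentFile();
    // mkdirs() also returns false when the directory already exists,
    // so re-check with isDirectory() before giving up.
    if (parent != null && !parent.mkdirs() && !parent.isDirectory()) {
      throw new IOException("Could not create spill directory " + parent);
    }
    if (spillFile.exists() && !spillFile.delete()) {
      throw new IOException("Could not delete stale spill file " + spillFile);
    }
    return spillFile;
  }

  /**
   * Interrupts running spill tasks and waits up to timeoutMillis for them to exit.
   * Returns true if the pool terminated within the timeout.
   */
  static boolean stopSpillThreads(ExecutorService pool, long timeoutMillis)
      throws InterruptedException {
    // shutdownNow() interrupts the worker threads; shutdown() would only stop
    // accepting new tasks and let a blocked task keep the pool alive.
    pool.shutdownNow();
    return pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
  }
}
```

Note that shutdownNow only interrupts the pool's worker threads. It does not by itself release the main thread parked in startSpilling, so the wait in SpillWriteIndexStatus still needs its own fix.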

Just a few suggestions:

- SpillIterator can extend Guava's AbstractIterator, which looks cleaner and means remove does not have to be implemented.
- everything that involves byte counts or buffer sizes in the init and configuration should be long instead of int
- Instead of "java.io.tmpdir" we should use the same directory as the diskqueue, and the task id instead of a random hash
- In the buffer we should stick to normal Java conventions and not use _ as a suffix.
- Some annotations are missing, some parameter names are autogenerated and a few javadocs are empty, nothing special
- There are some sysouts (better to use the logs)
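
To illustrate the int vs. long point: a total size computed in int arithmetic wraps around once it passes 2 GiB. This is only a sketch, and the names (BufferSizes, totalBytesInt, totalBytesLong) are hypothetical and not from the patch.

```java
/** Hypothetical illustration; the names are not taken from the HAMA-559 patch. */
final class BufferSizes {

  private BufferSizes() {
  }

  /** Total capacity in int arithmetic; silently overflows above Integer.MAX_VALUE. */
  static int totalBytesInt(int bufferCount, int bufferSize) {
    return bufferCount * bufferSize;
  }

  /** Same computation, widened to long before multiplying. */
  static long totalBytesLong(int bufferCount, int bufferSize) {
    return (long) bufferCount * bufferSize;
  }
}
```

With 4 buffers of 1 GiB each (1 << 30 bytes), totalBytesInt returns 0 because 2^32 wraps around, while totalBytesLong returns 4294967296.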

In case of synchronized receives from RPC, we can use Apurv's segmentation into multiple buffers to increase throughput and reduce locking.

I benched with Caliper against the diskqueue (raw inserts, polls can be benched later on):

https://gist.github.com/3907709

Result (biased: I commented out the part that locked the main thread and does the actual spill):

{noformat}

 0% Scenario{vm=java, trial=0, benchmark=Spill, size=10, type=SPILLING_QUEUE} 2876,07 ns; s=40,55 ns @ 10 trials
 8% Scenario{vm=java, trial=0, benchmark=Spill, size=100, type=SPILLING_QUEUE} 9316,85 ns; s=78,40 ns @ 3 trials
17% Scenario{vm=java, trial=0, benchmark=Spill, size=1000, type=SPILLING_QUEUE} 70874,27 ns; s=221,37 ns @ 3 trials
25% Scenario{vm=java, trial=0, benchmark=Spill, size=10000, type=SPILLING_QUEUE} 684388,29 ns; s=1840,57 ns @ 3 trials
33% Scenario{vm=java, trial=0, benchmark=Spill, size=100000, type=SPILLING_QUEUE} 6910806,76 ns; s=9592,61 ns @ 3 trials
42% Scenario{vm=java, trial=0, benchmark=Spill, size=1000000, type=SPILLING_QUEUE} 69555994,62 ns; s=296212,42 ns @ 3 trials
50% Scenario{vm=java, trial=0, benchmark=Spill, size=10, type=DISK_QUEUE} 23428,43 ns; s=203,91 ns @ 4 trials
58% Scenario{vm=java, trial=0, benchmark=Spill, size=100, type=DISK_QUEUE} 203707,18 ns; s=1880,11 ns @ 5 trials
67% Scenario{vm=java, trial=0, benchmark=Spill, size=1000, type=DISK_QUEUE} 2085204,12 ns; s=20188,91 ns @ 3 trials
75% Scenario{vm=java, trial=0, benchmark=Spill, size=10000, type=DISK_QUEUE} 20315540,28 ns; s=1130985,60 ns @ 10 trials
83% Scenario{vm=java, trial=0, benchmark=Spill, size=100000, type=DISK_QUEUE} 196685169,60 ns; s=1888695,70 ns @ 6 trials
92% Scenario{vm=java, trial=0, benchmark=Spill, size=1000000, type=DISK_QUEUE} 2093004128,00 ns; s=1216169,56 ns @ 3 trials

          type    size         us linear runtime
SPILLING_QUEUE      10       2,88 =
SPILLING_QUEUE     100       9,32 =
SPILLING_QUEUE    1000      70,87 =
SPILLING_QUEUE   10000     684,39 =
SPILLING_QUEUE  100000    6910,81 =
SPILLING_QUEUE 1000000   69555,99 =
    DISK_QUEUE      10      23,43 =
    DISK_QUEUE     100     203,71 =
    DISK_QUEUE    1000    2085,20 =
    DISK_QUEUE   10000   20315,54 =
    DISK_QUEUE  100000  196685,17 ==
    DISK_QUEUE 1000000 2093004,13 ==============================

vm: java
trial: 0
benchmark: Spill

{noformat}



TODO: profiling :)
                
> Add a spilling message queue
> ----------------------------
>
>                 Key: HAMA-559
>                 URL: https://issues.apache.org/jira/browse/HAMA-559
>             Project: Hama
>          Issue Type: Sub-task
>          Components: bsp core
>    Affects Versions: 0.5.0
>            Reporter: Thomas Jungblut
>            Assignee: Suraj Menon
>            Priority: Minor
>             Fix For: 0.6.0
>
>         Attachments: HAMA-559.patch-v1
>
>
> After HAMA-521 is done, we can add a spilling queue which just holds the messages in RAM that fit into the heap space. The rest can be flushed to disk.
> We may call this a HybridQueue or something like that.
> The benefits should be that we don't have to flush to disk so often and get faster. However, we may have more GC, so it may not always be faster overall.
> The requirements for this queue also include:
> - The message object once written to the queue (after returning from the write call) could be modified, but the changes should not be reflected in the messages stored in the queue.
> - For now let's implement a queue that does not support concurrent reading and writing. This feature is needed when we implement asynchronous communication.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira