Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2016/05/09 14:56:13 UTC
[jira] [Commented] (FLINK-3880) Improve performance of Accumulator map
[ https://issues.apache.org/jira/browse/FLINK-3880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15276460#comment-15276460 ]
ASF GitHub Bot commented on FLINK-3880:
---------------------------------------
GitHub user mxm opened a pull request:
https://github.com/apache/flink/pull/1976
[FLINK-3880] remove mutex for user accumulators hash map
The synchronized wrapper around the user accumulators map has been reported to hurt performance. The lock is unnecessary because Flink does not perform concurrent updates on the per-task map. Users who need concurrent access can implement their own locking.
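As a rough illustration of the reasoning above (not the actual change in this pull request), a per-task registry that only one task thread touches can use a plain HashMap, with locking left as an opt-in for callers that share it across threads. The class name PerTaskAccumulators and its methods are hypothetical and are not part of Flink's API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a per-task accumulator registry; not Flink's actual class.
class PerTaskAccumulators {
    // Plain HashMap: safe only while a single task thread reads and writes it.
    private final Map<String, long[]> counters = new HashMap<>();

    // Lock owned by the caller's code path, used only by addLocked().
    private final Object userLock = new Object();

    // Unsynchronized path for the common single-threaded case.
    long add(String name, long delta) {
        long[] cell = counters.computeIfAbsent(name, k -> new long[1]);
        cell[0] += delta;
        return cell[0];
    }

    // Opt-in path for users who update the registry from several threads.
    long addLocked(String name, long delta) {
        synchronized (userLock) {
            return add(name, delta);
        }
    }
}
```

The single-threaded path pays no locking cost at all. In this sketch, code that shares the registry across threads has to go through addLocked for every access.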
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/mxm/flink FLINK-3880
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/1976.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #1976
----
commit 1821ecbc3fb4866bc44eb2bcb8d4e7464f740416
Author: Maximilian Michels <mx...@apache.org>
Date: 2016-05-06T08:40:51Z
[FLINK-3880] remove mutex for user accumulators hash map
----
> Improve performance of Accumulator map
> --------------------------------------
>
> Key: FLINK-3880
> URL: https://issues.apache.org/jira/browse/FLINK-3880
> Project: Flink
> Issue Type: Improvement
> Affects Versions: 1.1.0
> Reporter: Ken Krugler
> Assignee: Maximilian Michels
> Priority: Minor
>
> I was looking at improving DataSet performance - this is for a job created using the Cascading-Flink planner for Cascading 3.1.
> While doing a quick "poor man's profiler" session with one of the TaskManager processes, I noticed that many (most?) of the threads that were actually running were in this state:
> {code:java}
> "DataSource (/working1/terms) (8/20)" daemon prio=10 tid=0x00007f55673e0800 nid=0x666a runnable [0x00007f556abcf000]
> java.lang.Thread.State: RUNNABLE
> at java.util.Collections$SynchronizedMap.get(Collections.java:2037)
> - locked <0x00000006e73fe718> (a java.util.Collections$SynchronizedMap)
> at org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext.getAccumulator(AbstractRuntimeUDFContext.java:162)
> at org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext.getLongCounter(AbstractRuntimeUDFContext.java:113)
> at com.dataartisans.flink.cascading.runtime.util.FlinkFlowProcess.getOrInitCounter(FlinkFlowProcess.java:245)
> at com.dataartisans.flink.cascading.runtime.util.FlinkFlowProcess.increment(FlinkFlowProcess.java:128)
> at com.dataartisans.flink.cascading.runtime.util.FlinkFlowProcess.increment(FlinkFlowProcess.java:122)
> at cascading.tap.hadoop.util.MeasuredRecordReader.next(MeasuredRecordReader.java:65)
> at cascading.scheme.hadoop.SequenceFile.source(SequenceFile.java:97)
> at cascading.tuple.TupleEntrySchemeIterator.getNext(TupleEntrySchemeIterator.java:166)
> at cascading.tuple.TupleEntrySchemeIterator.hasNext(TupleEntrySchemeIterator.java:139)
> at com.dataartisans.flink.cascading.runtime.source.TapSourceStage.readNextRecord(TapSourceStage.java:70)
> at com.dataartisans.flink.cascading.runtime.source.TapInputFormat.reachedEnd(TapInputFormat.java:175)
> at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> It looks like Cascading is asking Flink to increment a counter with each Tuple read, and that in turn is often blocked on getting access to the Accumulator object in a map. It looks like this is a SynchronizedMap, but using a ConcurrentHashMap (for example) would reduce this contention.
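The suggestion in the report can be sketched roughly as follows, assuming a counter registry keyed by name. CounterRegistry, getOrInit, value and countRecords are hypothetical names for illustration; they are not Flink or Cascading APIs. The sketch shows two ways to cut the per-record cost: a ConcurrentHashMap, whose reads do not take a map-wide lock, and looking the counter up once so the hot loop never touches the map.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical counter registry for illustration only.
class CounterRegistry {
    // Unlike Collections.synchronizedMap, lookups here do not serialize on one monitor.
    private final Map<String, LongAdder> counters = new ConcurrentHashMap<>();

    // Returns the existing counter or atomically creates it on first use.
    LongAdder getOrInit(String name) {
        return counters.computeIfAbsent(name, k -> new LongAdder());
    }

    // Current value of a counter, or 0 if it was never created.
    long value(String name) {
        LongAdder counter = counters.get(name);
        return counter == null ? 0L : counter.sum();
    }

    // Looks the counter up once, then increments per record without a map access.
    static long countRecords(CounterRegistry registry, String name, int records) {
        LongAdder counter = registry.getOrInit(name);
        for (int i = 0; i < records; i++) {
            counter.increment();
        }
        return counter.sum();
    }
}
```

Caching the counter reference outside the loop avoids the map lookup per tuple regardless of which map implementation backs the registry.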
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)