You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/02/14 23:15:58 UTC
[3/3] incubator-systemml git commit: Performance mr iqm/quantile/median (qsort num red, qpick buffer size)
Performance mr iqm/quantile/median (qsort num red, qpick buffer size)
Our MR iqm/quantile/median implementation via qsort and qpick showed severe
scalability bottlenecks on large data. The root cause was that the
out-of-core qpick reads -- depending on the required quantiles -- one or
two partitions into CP, where the partition sizes are determined by the
number of reducers of SortMR. This performance patch makes two
improvements: (1) it sets the number of reducers for SortMR (except for
order) such that a single partition has at most 10M records (~128MB),
which bounds the read for qpick, and (2) it uses a larger I/O buffer size
of 64KB for qpick, which makes the partition read slightly faster (~20%)
than with the 4KB default.
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/7f8716b6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/7f8716b6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/7f8716b6
Branch: refs/heads/master
Commit: 7f8716b641a45dbc77713710979cbf331d89ae8a
Parents: 240143b
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Sun Feb 14 01:53:42 2016 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Sun Feb 14 01:53:42 2016 -0800
----------------------------------------------------------------------
.../org/apache/sysml/runtime/matrix/SortMR.java | 16 +++++++++++-----
.../apache/sysml/runtime/util/MapReduceTool.java | 3 ++-
2 files changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/7f8716b6/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java b/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java
index 218a95b..6502e57 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java
@@ -196,13 +196,19 @@ public class SortMR
Path outpath = new Path(tmpOutput);
FileOutputFormat.setOutputPath(job, outpath);
MapReduceTool.deleteFileIfExistOnHDFS(outpath, job);
-
+
//set number of reducers (1 if local mode)
- if( InfrastructureAnalyzer.isLocalMode(job) )
- job.setNumReduceTasks(1);
- else
+ if( !InfrastructureAnalyzer.isLocalMode(job) ) {
MRJobConfiguration.setNumReducers(job, numReducers, numReducers);
-
+ //ensure partition size <= 10M records to avoid scalability bottlenecks
+ //on cp-side qpick instructions for quantile/iqm/median (~128MB)
+ if( !(getSortInstructionType(sortInst)==SortKeys.OperationTypes.Indexes) )
+ job.setNumReduceTasks((int)Math.max(job.getNumReduceTasks(), rlen/10000000));
+ }
+ else //in case of local mode
+ job.setNumReduceTasks(1);
+
+
//setup input/output format
job.setInputFormat(SamplingSortMRInputFormat.class);
SamplingSortMRInputFormat.setTargetKeyValueClasses(job, (Class<? extends WritableComparable>) outputInfo.outputKeyClass, outputInfo.outputValueClass);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/7f8716b6/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
index a142a2d..9b70106 100644
--- a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
+++ b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
@@ -620,7 +620,8 @@ public class MapReduceTool
if(fileToRead==null)
throw new RuntimeException("cannot read partition "+currentPart);
- FSDataInputStream currentStream=fs.open(fileToRead);
+ int buffsz = 64 * 1024;
+ FSDataInputStream currentStream=fs.open(fileToRead, buffsz);
DoubleWritable readKey=new DoubleWritable();
IntWritable readValue=new IntWritable();