You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/02/14 23:15:58 UTC

[3/3] incubator-systemml git commit: Performance mr iqm/quantile/median (qsort num red, qpick buffer size)

Performance mr iqm/quantile/median (qsort num red, qpick buffer size)

Our MR implementation of iqm/quantile/median via qsort and qpick showed
severe scalability bottlenecks on large data. The root cause was that the
out-of-core qpick reads -- depending on the required quantiles -- one or
two partitions into CP, where the partition sizes are determined by the
number of reducers of SortMR. This performance patch makes two
improvements: (1) it sets the number of reducers for SortMR (except for
order) such that a single partition has at most 10M records (~128MB),
which bounds the read for qpick, and (2) it uses a larger I/O buffer size
of 64KB for qpick, which makes the partition read slightly faster (~20%)
than with the 4KB default.

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/7f8716b6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/7f8716b6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/7f8716b6

Branch: refs/heads/master
Commit: 7f8716b641a45dbc77713710979cbf331d89ae8a
Parents: 240143b
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Sun Feb 14 01:53:42 2016 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Sun Feb 14 01:53:42 2016 -0800

----------------------------------------------------------------------
 .../org/apache/sysml/runtime/matrix/SortMR.java     | 16 +++++++++++-----
 .../apache/sysml/runtime/util/MapReduceTool.java    |  3 ++-
 2 files changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/7f8716b6/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java b/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java
index 218a95b..6502e57 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java
@@ -196,13 +196,19 @@ public class SortMR
 	    Path outpath = new Path(tmpOutput);
 	    FileOutputFormat.setOutputPath(job, outpath);	    
 	    MapReduceTool.deleteFileIfExistOnHDFS(outpath, job);
-	    
+
 	    //set number of reducers (1 if local mode)
-	    if( InfrastructureAnalyzer.isLocalMode(job) )
-	    	job.setNumReduceTasks(1);
-	    else
+		if( !InfrastructureAnalyzer.isLocalMode(job) ) {
 	    	MRJobConfiguration.setNumReducers(job, numReducers, numReducers);
-	    
+	    	//ensure partition size <= 10M records to avoid scalability bottlenecks
+	    	//on cp-side qpick instructions for quantile/iqm/median (~128MB)
+	    	if( !(getSortInstructionType(sortInst)==SortKeys.OperationTypes.Indexes) )
+	    		job.setNumReduceTasks((int)Math.max(job.getNumReduceTasks(), rlen/10000000));
+	    }
+	    else //in case of local mode
+	    	job.setNumReduceTasks(1);
+	    	
+	    	
 	    //setup input/output format
 	    job.setInputFormat(SamplingSortMRInputFormat.class);
 	    SamplingSortMRInputFormat.setTargetKeyValueClasses(job, (Class<? extends WritableComparable>) outputInfo.outputKeyClass, outputInfo.outputValueClass);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/7f8716b6/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
index a142a2d..9b70106 100644
--- a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
+++ b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
@@ -620,7 +620,8 @@ public class MapReduceTool
 		if(fileToRead==null)
 			throw new RuntimeException("cannot read partition "+currentPart);
 		
-		FSDataInputStream currentStream=fs.open(fileToRead);
+		int buffsz = 64 * 1024;
+		FSDataInputStream currentStream=fs.open(fileToRead, buffsz);
 	    DoubleWritable readKey=new DoubleWritable();
 	    IntWritable readValue=new IntWritable();