You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by lr...@apache.org on 2015/11/19 21:46:59 UTC

[17/50] [abbrv] incubator-systemml git commit: Improved parallel writers (fallback to sequential for single file)

Improved parallel writers (fallback to sequential for single file)

All parallel writers in CP determine their degree of parallelism based
on the available parallelism and the data size divided by the HDFS block
size. For good scalability, every thread writes its own file (which is
consistent with writes from distributed backends). However, this leads to
unnecessarily complex outputs (e.g., "./A.csv/m-0-00000") for small data
(<128MB). This change lets the parallel writers fall back to a sequential
write in these situations. This gives a slight performance benefit but,
more importantly, collapses the unnecessarily complex output directories
into a single file "A.csv".

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/6d6f8902
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/6d6f8902
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/6d6f8902

Branch: refs/heads/master
Commit: 6d6f8902707739c86af2b71a8ea1df46750f3e60
Parents: 9506f47
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Fri Oct 30 22:05:23 2015 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Fri Oct 30 22:05:23 2015 -0700

----------------------------------------------------------------------
 .../com/ibm/bi/dml/runtime/io/WriterBinaryBlockParallel.java   | 6 ++++++
 .../com/ibm/bi/dml/runtime/io/WriterMatrixMarketParallel.java  | 6 ++++++
 .../java/com/ibm/bi/dml/runtime/io/WriterTextCSVParallel.java  | 6 ++++++
 .../java/com/ibm/bi/dml/runtime/io/WriterTextCellParallel.java | 6 ++++++
 4 files changed, 24 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/6d6f8902/src/main/java/com/ibm/bi/dml/runtime/io/WriterBinaryBlockParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/io/WriterBinaryBlockParallel.java b/src/main/java/com/ibm/bi/dml/runtime/io/WriterBinaryBlockParallel.java
index 5c8e074..0218ffd 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/io/WriterBinaryBlockParallel.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/io/WriterBinaryBlockParallel.java
@@ -73,6 +73,12 @@ public class WriterBinaryBlockParallel extends WriterBinaryBlock
 		int numThreads = OptimizerUtils.getParallelBinaryWriteParallelism();
 		numThreads = Math.min(numThreads, numPartFiles);
 		
+		//fall back to sequential write if dop is 1 (e.g., <128MB) in order to create single file
+		if( numThreads <= 1 ) {
+			super.writeBinaryBlockMatrixToHDFS(path, job, src, rlen, clen, brlen, bclen, replication);
+			return;
+		}
+			
 		//set up preferred custom serialization framework for binary block format
 		if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION )
 			MRJobConfiguration.addBinaryBlockSerializationFramework( job );

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/6d6f8902/src/main/java/com/ibm/bi/dml/runtime/io/WriterMatrixMarketParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/io/WriterMatrixMarketParallel.java b/src/main/java/com/ibm/bi/dml/runtime/io/WriterMatrixMarketParallel.java
index 6f21a16..89384b3 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/io/WriterMatrixMarketParallel.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/io/WriterMatrixMarketParallel.java
@@ -67,6 +67,12 @@ public class WriterMatrixMarketParallel extends WriterMatrixMarket
 		int numThreads = OptimizerUtils.getParallelTextWriteParallelism();
 		numThreads = Math.min(numThreads, numPartFiles);
 		
+		//fall back to sequential write if dop is 1 (e.g., <128MB) in order to create single file
+		if( numThreads <= 1 ) {
+			super.writeMatrixMarketMatrixToHDFS(path, job, src, rlen, clen, nnz);
+			return;
+		}
+		
 		//create directory for concurrent tasks
 		MapReduceTool.createDirIfNotExistOnHDFS(path.toString(), DMLConfig.DEFAULT_SHARED_DIR_PERMISSION);
 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/6d6f8902/src/main/java/com/ibm/bi/dml/runtime/io/WriterTextCSVParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/io/WriterTextCSVParallel.java b/src/main/java/com/ibm/bi/dml/runtime/io/WriterTextCSVParallel.java
index 4b2191b..d76894c 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/io/WriterTextCSVParallel.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/io/WriterTextCSVParallel.java
@@ -70,6 +70,12 @@ public class WriterTextCSVParallel extends WriterTextCSV
 		//determine degree of parallelism
 		int numThreads = OptimizerUtils.getParallelTextWriteParallelism();
 		numThreads = Math.min(numThreads, numPartFiles);
+	
+		//fall back to sequential write if dop is 1 (e.g., <128MB) in order to create single file
+		if( numThreads <= 1 ) {
+			super.writeCSVMatrixToHDFS(path, job, src, rlen, clen, nnz, props);
+			return;
+		}
 		
 		//create directory for concurrent tasks
 		MapReduceTool.createDirIfNotExistOnHDFS(path.toString(), DMLConfig.DEFAULT_SHARED_DIR_PERMISSION);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/6d6f8902/src/main/java/com/ibm/bi/dml/runtime/io/WriterTextCellParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/io/WriterTextCellParallel.java b/src/main/java/com/ibm/bi/dml/runtime/io/WriterTextCellParallel.java
index 90d2a5e..cacbe49 100644
--- a/src/main/java/com/ibm/bi/dml/runtime/io/WriterTextCellParallel.java
+++ b/src/main/java/com/ibm/bi/dml/runtime/io/WriterTextCellParallel.java
@@ -66,6 +66,12 @@ public class WriterTextCellParallel extends WriterTextCell
 		int numThreads = OptimizerUtils.getParallelTextWriteParallelism();
 		numThreads = Math.min(numThreads, numPartFiles);
 		
+		//fall back to sequential write if dop is 1 (e.g., <128MB) in order to create single file
+		if( numThreads <= 1 ) {
+			super.writeTextCellMatrixToHDFS(path, job, src, rlen, clen);
+			return;
+		}
+		
 		//create directory for concurrent tasks
 		MapReduceTool.createDirIfNotExistOnHDFS(path.toString(), DMLConfig.DEFAULT_SHARED_DIR_PERMISSION);