You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/11/12 06:09:19 UTC

systemml git commit: [SYSTEMML-2013] Fix robustness of Spark n-ary cbind and RPC configuration

Repository: systemml
Updated Branches:
  refs/heads/master 4ae6beee5 -> f59a2dc22


[SYSTEMML-2013] Fix robustness of Spark n-ary cbind and RPC configuration

This patch addresses a recent perftest issue with stratstats on the 80GB
scenario. First, we now automatically configure the maximum RPC message
size, depending on the Spark version: 'spark.akka.frameSize' for Spark
versions before 2.0.0, and 'spark.rpc.message.maxSize' otherwise. Unless
the user has already set it explicitly, it is raised to a more robust
value of 512MB, as we have previously done manually. Second, this patch
improves the robustness of the recently added n-ary cbind by using a
preferred number of output partitions for the final shuffle, which
avoids overly large partitions in the case of many inputs.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/f59a2dc2
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/f59a2dc2
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/f59a2dc2

Branch: refs/heads/master
Commit: f59a2dc22ecd4645d22a6055e9682a606672e839
Parents: 4ae6bee
Author: Matthias Boehm <mb...@gmail.com>
Authored: Sat Nov 11 22:10:24 2017 -0800
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Sat Nov 11 22:10:24 2017 -0800

----------------------------------------------------------------------
 .../controlprogram/context/SparkExecutionContext.java     | 10 +++++++++-
 .../instructions/spark/BuiltinNarySPInstruction.java      |  4 +++-
 2 files changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/f59a2dc2/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index 467b6fc..ff47b3a 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -261,7 +261,15 @@ public class SparkExecutionContext extends ExecutionContext
 		if( !conf.contains("spark.locality.wait") ) { //default 3s
 			conf.set("spark.locality.wait", "5s");
 		}
-
+		
+		//increase max message size for robustness
+		String sparkVersion = org.apache.spark.package$.MODULE$.SPARK_VERSION();
+		String msgSizeConf = (UtilFunctions.compareVersion(sparkVersion, "2.0.0") < 0) ?
+			"spark.akka.frameSize" : "spark.rpc.message.maxSize";
+		if( !conf.contains(msgSizeConf) ) { //default 128MB
+			conf.set(msgSizeConf, "512");
+		}
+		
 		return conf;
 	}
 	

http://git-wip-us.apache.org/repos/asf/systemml/blob/f59a2dc2/src/main/java/org/apache/sysml/runtime/instructions/spark/BuiltinNarySPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/BuiltinNarySPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/BuiltinNarySPInstruction.java
index 4edb1cf..71d0003 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/BuiltinNarySPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/BuiltinNarySPInstruction.java
@@ -28,6 +28,7 @@ import org.apache.sysml.runtime.instructions.InstructionUtils;
 import org.apache.sysml.runtime.instructions.cp.CPOperand;
 import org.apache.sysml.runtime.instructions.spark.AppendGSPInstruction.ShiftMatrix;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
+import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
@@ -85,7 +86,8 @@ public class BuiltinNarySPInstruction extends SPInstruction
 		}
 		
 		//aggregate partially overlapping blocks w/ single shuffle
-		out = RDDAggregateUtils.mergeByKey(out);
+		int numPartOut = SparkUtils.getNumPreferredPartitions(mcOut);
+		out = RDDAggregateUtils.mergeByKey(out, numPartOut, false);
 		
 		//set output RDD and add lineage
 		sec.getMatrixCharacteristics(output.getName()).set(mcOut);