You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/11/12 06:09:19 UTC
systemml git commit: [SYSTEMML-2013] Fix robustness spark nary cbind
and rpc configuration
Repository: systemml
Updated Branches:
refs/heads/master 4ae6beee5 -> f59a2dc22
[SYSTEMML-2013] Fix robustness spark nary cbind and rpc configuration
This patch addresses a recent perftest issue with stratstats on the 80GB
scenario. First, we now automatically configure (depending on the spark
version) either 'spark.akka.frameSize' or 'spark.rpc.message.maxSize' to
a more robust value of 512MB as we have done manually in the past.
Second, this patch improves the robustness of the recently added nary
cbind by using a preferred number of output partitions for the final
shuffle, which avoids overly large partitions when there are many inputs.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/f59a2dc2
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/f59a2dc2
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/f59a2dc2
Branch: refs/heads/master
Commit: f59a2dc22ecd4645d22a6055e9682a606672e839
Parents: 4ae6bee
Author: Matthias Boehm <mb...@gmail.com>
Authored: Sat Nov 11 22:10:24 2017 -0800
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Sat Nov 11 22:10:24 2017 -0800
----------------------------------------------------------------------
.../controlprogram/context/SparkExecutionContext.java | 10 +++++++++-
.../instructions/spark/BuiltinNarySPInstruction.java | 4 +++-
2 files changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/systemml/blob/f59a2dc2/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index 467b6fc..ff47b3a 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -261,7 +261,15 @@ public class SparkExecutionContext extends ExecutionContext
if( !conf.contains("spark.locality.wait") ) { //default 3s
conf.set("spark.locality.wait", "5s");
}
-
+
+ //increase max message size for robustness
+ String sparkVersion = org.apache.spark.package$.MODULE$.SPARK_VERSION();
+ String msgSizeConf = (UtilFunctions.compareVersion(sparkVersion, "2.0.0") < 0) ?
+ "spark.akka.frameSize" : "spark.rpc.message.maxSize";
+ if( !conf.contains(msgSizeConf) ) { //default 128MB
+ conf.set(msgSizeConf, "512");
+ }
+
return conf;
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/f59a2dc2/src/main/java/org/apache/sysml/runtime/instructions/spark/BuiltinNarySPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/BuiltinNarySPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/BuiltinNarySPInstruction.java
index 4edb1cf..71d0003 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/BuiltinNarySPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/BuiltinNarySPInstruction.java
@@ -28,6 +28,7 @@ import org.apache.sysml.runtime.instructions.InstructionUtils;
import org.apache.sysml.runtime.instructions.cp.CPOperand;
import org.apache.sysml.runtime.instructions.spark.AppendGSPInstruction.ShiftMatrix;
import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
+import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
@@ -85,7 +86,8 @@ public class BuiltinNarySPInstruction extends SPInstruction
}
//aggregate partially overlapping blocks w/ single shuffle
- out = RDDAggregateUtils.mergeByKey(out);
+ int numPartOut = SparkUtils.getNumPreferredPartitions(mcOut);
+ out = RDDAggregateUtils.mergeByKey(out, numPartOut, false);
//set output RDD and add lineage
sec.getMatrixCharacteristics(output.getName()).set(mcOut);