You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/11/22 07:07:04 UTC

[1/2] systemml git commit: [SYSTEMML-2027] Fix codegen race conditions in spark executors

Repository: systemml
Updated Branches:
  refs/heads/master 20f97e0b5 -> 5b9c12df6


[SYSTEMML-2027] Fix codegen race conditions in spark executors

For large-scale perftest runs, we've seen intermittent task failures due
to various non-reproducible Janino class compilation issues. The root
cause seems to be concurrent compilation and loading of the same class
by multiple executor threads. We now simply synchronize this code path,
which ensures that only the first thread compiles the class and all
other threads (including those of other tasks) reuse the compiled class,
as before, from the static class cache (with unique names for distinct
classes). The parfor code path already used synchronized access, while
all data-parallel operations (row, cell, magg, outer) did not.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/73394999
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/73394999
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/73394999

Branch: refs/heads/master
Commit: 73394999640969294c94a9b0c7db7cc2d3a1e81f
Parents: 20f97e0
Author: Matthias Boehm <mb...@gmail.com>
Authored: Tue Nov 21 19:19:41 2017 -0800
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Tue Nov 21 19:31:26 2017 -0800

----------------------------------------------------------------------
 .../apache/sysml/runtime/codegen/CodegenUtils.java    | 14 +++++++++++++-
 .../parfor/RemoteDPParForSparkWorker.java             | 10 ++++------
 .../parfor/RemoteParForSparkWorker.java               |  8 +++-----
 .../instructions/spark/SpoofSPInstruction.java        |  8 ++++----
 4 files changed, 24 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/73394999/src/main/java/org/apache/sysml/runtime/codegen/CodegenUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/codegen/CodegenUtils.java b/src/main/java/org/apache/sysml/runtime/codegen/CodegenUtils.java
index 8b3ea5f..726e267 100644
--- a/src/main/java/org/apache/sysml/runtime/codegen/CodegenUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/codegen/CodegenUtils.java
@@ -68,7 +68,7 @@ public class CodegenUtils
 	private static String _workingDir = null;
 	
 	public static Class<?> compileClass(String name, String src) 
-			throws DMLRuntimeException
+		throws DMLRuntimeException
 	{
 		//reuse existing compiled class
 		Class<?> ret = _cache.get(name);
@@ -98,6 +98,18 @@ public class CodegenUtils
 		return getClass(name, null);
 	}
 	
+	public synchronized static Class<?> getClassSync(String name, byte[] classBytes)
+		throws DMLRuntimeException 
+	{
+		//In order to avoid anomalies of concurrently compiling and loading the same
+		//class with the same name multiple times in spark executors, this indirection
+		//synchronizes the class compilation. This synchronization leads to the first
+		//thread compiling the common class and all other threads simply reusing the
+		//cached class instance, which also ensures that the same class is not loaded
+		//multiple times which causes unnecessary JIT compilation overhead.
+		return getClass(name, classBytes);
+	}
+	
 	public static Class<?> getClass(String name, byte[] classBytes) 
 		throws DMLRuntimeException 
 	{

http://git-wip-us.apache.org/repos/asf/systemml/blob/73394999/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
index 9f2658c..06be2b1 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
@@ -147,15 +147,13 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
 		_workerID = ID;
 		
 		//initialize codegen class cache (before program parsing)
-		synchronized( CodegenUtils.class ) {
-			for( Entry<String, byte[]> e : _clsMap.entrySet() )
-				CodegenUtils.getClass(e.getKey(), e.getValue());
-		}
-		
+		for( Entry<String, byte[]> e : _clsMap.entrySet() )
+			CodegenUtils.getClassSync(e.getKey(), e.getValue());
+	
 		//parse and setup parfor body program
 		ParForBody body = ProgramConverter.parseParForBody(_prog, (int)_workerID);
 		_childBlocks = body.getChildBlocks();
-		_ec          = body.getEc();				
+		_ec          = body.getEc();
 		_resultVars  = body.getResultVarNames();
 		_numTasks    = 0;
 		_numIters    = 0;

http://git-wip-us.apache.org/repos/asf/systemml/blob/73394999/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
index 614f946..9753635 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
@@ -100,11 +100,9 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun
 		_workerID = taskID;
 		
 		//initialize codegen class cache (before program parsing)
-		synchronized( CodegenUtils.class ) {
-			for( Entry<String, byte[]> e : _clsMap.entrySet() )
-				CodegenUtils.getClass(e.getKey(), e.getValue());
-		}
-		
+		for( Entry<String, byte[]> e : _clsMap.entrySet() )
+			CodegenUtils.getClassSync(e.getKey(), e.getValue());
+	
 		//parse and setup parfor body program
 		ParForBody body = ProgramConverter.parseParForBody(_prog, (int)_workerID);
 		_childBlocks = body.getChildBlocks();

http://git-wip-us.apache.org/repos/asf/systemml/blob/73394999/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
index cb3ad14..f5b1576 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/SpoofSPInstruction.java
@@ -456,7 +456,7 @@ public class SpoofSPInstruction extends SPInstruction {
 		{
 			//lazy load of shipped class
 			if( _op == null ) {
-				Class<?> loadedClass = CodegenUtils.getClass(_className, _classBytes);
+				Class<?> loadedClass = CodegenUtils.getClassSync(_className, _classBytes);
 				_op = (SpoofRowwise) CodegenUtils.createInstance(loadedClass); 
 			}
 			
@@ -513,7 +513,7 @@ public class SpoofSPInstruction extends SPInstruction {
 		{
 			//lazy load of shipped class
 			if( _op == null ) {
-				Class<?> loadedClass = CodegenUtils.getClass(_className, _classBytes);
+				Class<?> loadedClass = CodegenUtils.getClassSync(_className, _classBytes);
 				_op = (SpoofOperator) CodegenUtils.createInstance(loadedClass); 
 			}
 			
@@ -565,7 +565,7 @@ public class SpoofSPInstruction extends SPInstruction {
 		{
 			//lazy load of shipped class
 			if( _op == null ) {
-				Class<?> loadedClass = CodegenUtils.getClass(_className, _classBytes);
+				Class<?> loadedClass = CodegenUtils.getClassSync(_className, _classBytes);
 				_op = (SpoofOperator) CodegenUtils.createInstance(loadedClass); 
 			}
 				
@@ -627,7 +627,7 @@ public class SpoofSPInstruction extends SPInstruction {
 		{
 			//lazy load of shipped class
 			if( _op == null ) {
-				Class<?> loadedClass = CodegenUtils.getClass(_className, _classBytes);
+				Class<?> loadedClass = CodegenUtils.getClassSync(_className, _classBytes);
 				_op = (SpoofOperator) CodegenUtils.createInstance(loadedClass); 
 			}
 			


[2/2] systemml git commit: [SYSTEMML-2028] Fix codegen invalid fallback from row to cell ops

Posted by mb...@apache.org.
[SYSTEMML-2028] Fix codegen invalid fallback from row to cell ops

This patch fixes failures of the perftest Mlogreg 10M x 1K, icpt1 with
codegen due to unsupported cbind operations in cell templates. The root
cause was a violated block size constraint, which led to an invalid
fallback from row to cell templates even for operations (such as cbind)
that cell templates do not support. With this patch, the fallback
happens in an operator-aware manner, removing the memo table entry
instead of converting it whenever the fallback would be invalid.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/5b9c12df
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/5b9c12df
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/5b9c12df

Branch: refs/heads/master
Commit: 5b9c12df62951a49179b39b05bb774da97d3cd37
Parents: 7339499
Author: Matthias Boehm <mb...@gmail.com>
Authored: Tue Nov 21 22:02:07 2017 -0800
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Tue Nov 21 22:02:07 2017 -0800

----------------------------------------------------------------------
 .../opt/PlanSelectionFuseCostBasedV2.java       | 37 +++++++++++++-------
 1 file changed, 24 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/5b9c12df/src/main/java/org/apache/sysml/hops/codegen/opt/PlanSelectionFuseCostBasedV2.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/codegen/opt/PlanSelectionFuseCostBasedV2.java b/src/main/java/org/apache/sysml/hops/codegen/opt/PlanSelectionFuseCostBasedV2.java
index 7ad00fa..00b1543 100644
--- a/src/main/java/org/apache/sysml/hops/codegen/opt/PlanSelectionFuseCostBasedV2.java
+++ b/src/main/java/org/apache/sysml/hops/codegen/opt/PlanSelectionFuseCostBasedV2.java
@@ -639,6 +639,11 @@ public class PlanSelectionFuseCostBasedV2 extends PlanSelection
 			|| HopRewriteUtils.isBinary(hop, OpOp2.CBIND));
 	}
 	
+	private static boolean isValidRow2CellOp(Hop hop) {
+		return !(HopRewriteUtils.isBinary(hop, OpOp2.CBIND)
+			|| (hop instanceof AggBinaryOp && hop.getDim1()!=1 && hop.getDim2()!=1));
+	}
+	
 	private static void pruneInvalidAndSpecialCasePlans(CPlanMemoTable memo, PlanPartition part) 
 	{	
 		//prune invalid row entries w/ violated blocksize constraint
@@ -744,24 +749,30 @@ public class PlanSelectionFuseCostBasedV2 extends PlanSelection
 		//i.e., plans that become invalid after the previous pruning step
 		long hopID = current.getHopID();
 		if( part.getPartition().contains(hopID) && memo.contains(hopID, TemplateType.ROW) ) {
-			for( MemoTableEntry me : memo.get(hopID, TemplateType.ROW) ) {
+			Iterator<MemoTableEntry> iter = memo.get(hopID, TemplateType.ROW).iterator();
+			while( iter.hasNext() ) {
+				MemoTableEntry me = iter.next();
 				//convert leaf node with pure vector inputs
-				if( !me.hasPlanRef() && !TemplateUtils.hasMatrixInput(current) ) {
-					me.type = TemplateType.CELL;
-					if( LOG.isTraceEnabled() )
-						LOG.trace("Converted leaf memo table entry from row to cell: "+me);
-				}
+				boolean applyLeaf = (!me.hasPlanRef() 
+					&& !TemplateUtils.hasMatrixInput(current));
 				
 				//convert inner node without row template input
-				if( me.hasPlanRef() && !ROW_TPL.open(current) ) {
-					boolean hasRowInput = false;
-					for( int i=0; i<3; i++ )
-						if( me.isPlanRef(i) )
-							hasRowInput |= memo.contains(me.input(i), TemplateType.ROW);
-					if( !hasRowInput ) {
+				boolean applyInner = !applyLeaf && !ROW_TPL.open(current);
+				for( int i=0; i<3 & applyInner; i++ )
+					if( me.isPlanRef(i) )
+						applyInner &= !memo.contains(me.input(i), TemplateType.ROW);
+				
+				if( applyLeaf || applyInner ) {
+					String type = applyLeaf ? "leaf" : "inner";
+					if( isValidRow2CellOp(current) ) {
 						me.type = TemplateType.CELL;
 						if( LOG.isTraceEnabled() )
-							LOG.trace("Converted inner memo table entry from row to cell: "+me);	
+							LOG.trace("Converted "+type+" memo table entry from row to cell: "+me);
+					}
+					else {
+						if( LOG.isTraceEnabled() )
+							LOG.trace("Removed "+type+" memo table entry row (unsupported cell): "+me);
+						iter.remove();
 					}
 				}
 			}