You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/09/24 20:50:23 UTC

[1/2] incubator-systemml git commit: [SYSTEMML-569] Scalability transformencode (memory-efficient build)

Repository: incubator-systemml
Updated Branches:
  refs/heads/master dae9ecaac -> feef7e0c7


[SYSTEMML-569] Scalability transformencode (memory-efficient build)

The scalability of transformencode with recoding depends on the number
of distinct items in the input frame. The larger the number of distinct
items, the larger the temporary partial recode maps and the larger the
output per partition (all of which need to fit into the working memory).
For a scenario of 1G x 5 w/ double schema and 1Mx5 distinct items, we
previously ran into out-of-memory errors.

This patch makes various modifications: 

(1) New custom partial build, which avoids code assignments (long
objects) and unnecessary string conversions of schema-specific values.
This is beneficial for performance (fewer string conversions and reduced
garbage collection) and, more importantly, reduces the memory footprint
(no long objects and smaller tokens on non-string schemas).

(2) Global distinct/group/write on schema-specific objects instead of
string tokens. This again reduces the memory footprint of the map task
output and the group output, and also reduces shuffle.

With these changes, we're now able to execute the aforementioned
scenario in 782s. Moreover, the changes also generally improved
performance: for example, on a scenario of 1G x 5 w/ double schema and
100Kx5 distinct items, they improved runtime from 298s to 158s.

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/e9bf9108
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/e9bf9108
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/e9bf9108

Branch: refs/heads/master
Commit: e9bf9108f880ab499a91ad36e704e0fe7865d181
Parents: dae9eca
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Fri Sep 23 22:12:04 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Fri Sep 23 22:19:36 2016 -0700

----------------------------------------------------------------------
 ...ReturnParameterizedBuiltinSPInstruction.java | 53 ++++++++++++--------
 .../sysml/runtime/transform/RecodeAgent.java    | 43 ++++++++++++++--
 2 files changed, 70 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e9bf9108/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
index b36b1f3..b0bfd22 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MultiReturnParameterizedBuiltinSPInstruction.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map.Entry;
 
@@ -210,47 +211,54 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI
 	}
 	
 	/**
-	 * 
+	 * This function pre-aggregates distinct values of recoded columns per partition
+	 * (part of distributed recode map construction, used for recoding, binning and 
+	 * dummy coding). We operate directly over schema-specific objects to avoid 
+	 * unnecessary string conversion, as well as reduce memory overhead and shuffle.
 	 */
-	public static class TransformEncodeBuildFunction implements PairFlatMapFunction<Iterator<Tuple2<Long, FrameBlock>>, Integer, String>
+	public static class TransformEncodeBuildFunction 
+		implements PairFlatMapFunction<Iterator<Tuple2<Long, FrameBlock>>, Integer, Object>
 	{
 		private static final long serialVersionUID = 6336375833412029279L;
 
-		private Encoder _encoder = null;
+		private RecodeAgent _raEncoder = null;
 		
 		public TransformEncodeBuildFunction(Encoder encoder) {
-			_encoder = encoder;
+			for( Encoder cEncoder : ((EncoderComposite)encoder).getEncoders() )
+				if( cEncoder instanceof RecodeAgent )
+					_raEncoder = (RecodeAgent)cEncoder;
 		}
 		
 		@Override
-		public Iterable<Tuple2<Integer, String>> call(Iterator<Tuple2<Long, FrameBlock>> iter)
+		public Iterable<Tuple2<Integer, Object>> call(Iterator<Tuple2<Long, FrameBlock>> iter)
 			throws Exception 
 		{
 			//build meta data (e.g., recode maps)
 			while( iter.hasNext() ) {
-				_encoder.build(iter.next()._2());	
+				_raEncoder.buildPartial(iter.next()._2());	
 			}
 			
 			//output recode maps as columnID - token pairs
-			ArrayList<Tuple2<Integer,String>> ret = new ArrayList<Tuple2<Integer,String>>();
-			if( _encoder instanceof EncoderComposite )
-				for( Encoder cEncoder : ((EncoderComposite)_encoder).getEncoders() )
-					if( cEncoder instanceof RecodeAgent ) {
-						RecodeAgent ra = (RecodeAgent) cEncoder;
-						HashMap<Integer,HashMap<String,Long>> tmp = ra.getCPRecodeMaps();
-						for( Entry<Integer,HashMap<String,Long>> e1 : tmp.entrySet() )
-							for( String token : e1.getValue().keySet() )
-								ret.add(new Tuple2<Integer,String>(e1.getKey(), token));
-					}
-				
+			ArrayList<Tuple2<Integer,Object>> ret = new ArrayList<Tuple2<Integer,Object>>();
+			HashMap<Integer,HashSet<Object>> tmp = _raEncoder.getCPRecodeMapsPartial();
+			for( Entry<Integer,HashSet<Object>> e1 : tmp.entrySet() )
+				for( Object token : e1.getValue() )
+					ret.add(new Tuple2<Integer,Object>(e1.getKey(), token));
+			_raEncoder.getCPRecodeMapsPartial().clear();
+		
 			return ret;
 		}
 	}
 	
 	/**
-	 * 
+	 * This function assigns codes to globally distinct values of recoded columns 
+	 * and writes the resulting column map in textcell (IJV) format to the output. 
+	 * (part of distributed recode map construction, used for recoding, binning and 
+	 * dummy coding). We operate directly over schema-specific objects to avoid 
+	 * unnecessary string conversion, as well as reduce memory overhead and shuffle.
 	 */
-	public static class TransformEncodeGroupFunction implements FlatMapFunction<Tuple2<Integer, Iterable<String>>, String>
+	public static class TransformEncodeGroupFunction 
+		implements FlatMapFunction<Tuple2<Integer, Iterable<Object>>, String>
 	{
 		private static final long serialVersionUID = -1034187226023517119L;
 
@@ -261,11 +269,11 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI
 		}
 		
 		@Override
-		public Iterable<String> call(Tuple2<Integer, Iterable<String>> arg0)
+		public Iterable<String> call(Tuple2<Integer, Iterable<Object>> arg0)
 			throws Exception 
 		{
 			String colID = String.valueOf(arg0._1());
-			Iterator<String> iter = arg0._2().iterator();
+			Iterator<Object> iter = arg0._2().iterator();
 			
 			ArrayList<String> ret = new ArrayList<String>();
 			StringBuilder sb = new StringBuilder();
@@ -275,7 +283,8 @@ public class MultiReturnParameterizedBuiltinSPInstruction extends ComputationSPI
 				sb.append(' ');
 				sb.append(colID);
 				sb.append(' ');
-				sb.append(RecodeAgent.constructRecodeMapEntry(iter.next(), rowID));
+				sb.append(RecodeAgent.constructRecodeMapEntry(
+						iter.next().toString(), rowID));
 				ret.add(sb.toString());
 				sb.setLength(0); 
 				rowID++;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e9bf9108/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java b/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java
index 8ec2db3..df38d4a 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/RecodeAgent.java
@@ -27,6 +27,7 @@ import java.io.OutputStreamWriter;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
@@ -54,10 +55,11 @@ public class RecodeAgent extends Encoder
 
 	private int[] _mvrcdList = null;
 	private int[] _fullrcdList = null;
-
-	// HashMap< columnID, HashMap<distinctValue, count> >
+	
+	//recode maps and custom map for partial recode maps 
 	private HashMap<Integer, HashMap<String, Long>> _rcdMaps  = new HashMap<Integer, HashMap<String, Long>>();
 	private HashMap<Integer, HashMap<String,String>> _finalMaps = null;
+	private HashMap<Integer, HashSet<Object>> _rcdMapsPart = null;
 	
 	public RecodeAgent(JSONObject parsedSpec, String[] colnames, int clen)
 		throws JSONException 
@@ -92,6 +94,10 @@ public class RecodeAgent extends Encoder
 		return _rcdMaps; 
 	}
 	
+	public HashMap<Integer, HashSet<Object>> getCPRecodeMapsPartial() { 
+		return _rcdMapsPart; 
+	}
+	
 	public HashMap<Integer, HashMap<String,String>> getRecodeMaps() {
 		return _finalMaps;
 	}
@@ -400,7 +406,7 @@ public class RecodeAgent extends Encoder
 	public void build(FrameBlock in) {
 		if( !isApplicable() )
 			return;		
-		
+
 		Iterator<String[]> iter = in.getStringRowIterator();
 		while( iter.hasNext() ) {
 			String[] row = iter.next(); 
@@ -413,12 +419,41 @@ public class RecodeAgent extends Encoder
 				HashMap<String,Long> map = _rcdMaps.get(colID);
 				String key = row[colID-1];
 				if( key!=null && !key.isEmpty() && !map.containsKey(key) )
-					map.put(key, new Long(map.size()+1));
+					map.put(key, Long.valueOf(map.size()+1));
 			}
 		}
 	}
 	
 	/**
+	 * 
+	 * @param in
+	 */
+	public void buildPartial(FrameBlock in) {
+		if( !isApplicable() )
+			return;		
+
+		//ensure allocated partial recode map
+		if( _rcdMapsPart == null )
+			_rcdMapsPart = new HashMap<Integer, HashSet<Object>>();
+		
+		//construct partial recode map (tokens w/o codes)
+		//iterate over columns for sequential access
+		for( int j=0; j<_colList.length; j++ ) {
+			int colID = _colList[j]; //1-based
+			//allocate column map if necessary
+			if( !_rcdMapsPart.containsKey(colID) ) 
+				_rcdMapsPart.put(colID, new HashSet<Object>());
+			HashSet<Object> map = _rcdMapsPart.get(colID);
+			//probe and build column map
+			for( int i=0; i<in.getNumRows(); i++ )
+				map.add(in.get(i, colID-1));
+			//cleanup unnecessary entries once
+			map.remove(null);
+			map.remove("");
+		}
+	}
+	
+	/**
 	 * Method to apply transformations.
 	 * 
 	 * @param words


[2/2] incubator-systemml git commit: [SYSTEMML-955] Fix cdf/invcdf compilation in forced mr/spark, tests

Posted by mb...@apache.org.
[SYSTEMML-955] Fix cdf/invcdf compilation in forced mr/spark, tests

This patch fixes the compilation of cdf/invcdf (cumulative distribution
functions) with forced mr or spark execution mode. These operations are
only supported over scalars and hence have no distributed operations. We
now always force CP execution type. The tests are extended accordingly.
Furthermore, this also fixes the error handling of unknown CP, Spark,
and GPU instructions (which resulted in null pointer exceptions so far).

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/feef7e0c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/feef7e0c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/feef7e0c

Branch: refs/heads/master
Commit: feef7e0c7ea85c789aac26ed9d28a235b5acf6d4
Parents: e9bf910
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Fri Sep 23 23:48:49 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Fri Sep 23 23:48:49 2016 -0700

----------------------------------------------------------------------
 .../sysml/hops/ParameterizedBuiltinOp.java      |   3 +-
 .../runtime/instructions/InstructionParser.java |  14 +-
 .../runtime/instructions/InstructionUtils.java  |  20 +-
 .../unary/scalar/FullDistributionTest.java      | 239 ++++++++++++++-----
 4 files changed, 194 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/feef7e0c/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
index b3aec91..ea183fc 100644
--- a/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
+++ b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
@@ -1136,7 +1136,8 @@ public class ParameterizedBuiltinOp extends Hop implements MultiThreadedHop
 		if( (_op == ParamBuiltinOp.TRANSFORMAPPLY && REMOTE==ExecType.MR)
 			|| _op == ParamBuiltinOp.TRANSFORMDECODE && REMOTE==ExecType.MR
 			|| _op == ParamBuiltinOp.TRANSFORMMETA 
-			||  _op == ParamBuiltinOp.TOSTRING) {
+			|| _op == ParamBuiltinOp.TOSTRING 
+			|| _op == ParamBuiltinOp.CDF || _op == ParamBuiltinOp.INVCDF) {
 			_etype = ExecType.CP;
 		}
 		

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/feef7e0c/src/main/java/org/apache/sysml/runtime/instructions/InstructionParser.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/InstructionParser.java b/src/main/java/org/apache/sysml/runtime/instructions/InstructionParser.java
index b6c243c..251d37d 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/InstructionParser.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/InstructionParser.java
@@ -40,22 +40,28 @@ public class InstructionParser
 			|| execType.equalsIgnoreCase(ExecType.CP_FILE.toString()) ) 
 		{
 			CPINSTRUCTION_TYPE cptype = InstructionUtils.getCPType(str); 
+			if( cptype == null )
+				throw new DMLRuntimeException("Unknown CP instruction: " + str);
 			return CPInstructionParser.parseSingleInstruction (cptype, str);
 		}
 		else if ( execType.equalsIgnoreCase(ExecType.SPARK.toString()) ) 
 		{
 			SPINSTRUCTION_TYPE sptype = InstructionUtils.getSPType(str); 
+			if( sptype == null )
+				throw new DMLRuntimeException("Unknown SPARK instruction: " + str);
 			return SPInstructionParser.parseSingleInstruction (sptype, str);
 		}
 		else if ( execType.equalsIgnoreCase(ExecType.GPU.toString()) ) 
 		{
-			GPUINSTRUCTION_TYPE cptype = InstructionUtils.getGPUType(str); 
-			return GPUInstructionParser.parseSingleInstruction (cptype, str);
+			GPUINSTRUCTION_TYPE gputype = InstructionUtils.getGPUType(str); 
+			if( gputype == null )
+				throw new DMLRuntimeException("Unknown GPU instruction: " + str);
+			return GPUInstructionParser.parseSingleInstruction (gputype, str);
 		}
 		else if ( execType.equalsIgnoreCase("MR") ) {
 			MRINSTRUCTION_TYPE mrtype = InstructionUtils.getMRType(str); 
-			if ( mrtype == null )
-				throw new DMLRuntimeException("Can not determine MRType for instruction: " + str);
+			if( mrtype == null )
+				throw new DMLRuntimeException("Unknown MR instruction: " + str);
 			return MRInstructionParser.parseSingleInstruction (mrtype, str);
 		}
 		else {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/feef7e0c/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java
index 5dc66af..8c6b6b5 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/InstructionUtils.java
@@ -233,9 +233,7 @@ public class InstructionUtils
 	 * @return
 	 */
 	public static MRINSTRUCTION_TYPE getMRType( String str ) {
-		String opcode = getOpCode(str);
-		MRINSTRUCTION_TYPE mrtype = MRInstructionParser.String2MRInstructionType.get( opcode ); 
-		return mrtype;
+		return MRInstructionParser.String2MRInstructionType.get( getOpCode(str) ); 
 	}
 	
 	/**
@@ -244,9 +242,7 @@ public class InstructionUtils
 	 * @return
 	 */
 	public static SPINSTRUCTION_TYPE getSPType( String str ) {
-		String opcode = getOpCode(str);
-		SPINSTRUCTION_TYPE sptype = SPInstructionParser.String2SPInstructionType.get( opcode ); 
-		return sptype;
+		return SPInstructionParser.String2SPInstructionType.get( getOpCode(str) ); 
 	}
 	
 	/**
@@ -255,22 +251,16 @@ public class InstructionUtils
 	 * @return
 	 */
 	public static CPINSTRUCTION_TYPE getCPType( String str ) {
-		String opcode = getOpCode(str);
-		CPINSTRUCTION_TYPE cptype = CPInstructionParser.String2CPInstructionType.get( opcode ); 
-		return cptype;
+		return CPInstructionParser.String2CPInstructionType.get( getOpCode(str) ); 
 	}
 	
 	/**
 	 * 
 	 * @param str
 	 * @return
-	 * @throws DMLUnsupportedOperationException
 	 */
-	public static GPUINSTRUCTION_TYPE getGPUType( String str ) 
-	{
-		String opcode = getOpCode(str);
-		GPUINSTRUCTION_TYPE cptype = GPUInstructionParser.String2GPUInstructionType.get( opcode ); 
-		return cptype;
+	public static GPUINSTRUCTION_TYPE getGPUType( String str ) {
+		return GPUInstructionParser.String2GPUInstructionType.get( getOpCode(str) ); 
 	}
 	
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/feef7e0c/src/test/java/org/apache/sysml/test/integration/functions/unary/scalar/FullDistributionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/unary/scalar/FullDistributionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/unary/scalar/FullDistributionTest.java
index 82346de..a49deee 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/unary/scalar/FullDistributionTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/unary/scalar/FullDistributionTest.java
@@ -23,22 +23,31 @@ import java.util.HashMap;
 import java.util.Random;
 
 import org.junit.Test;
-
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.lops.LopProperties.ExecType;
 import org.apache.sysml.runtime.matrix.data.MatrixValue.CellIndex;
 import org.apache.sysml.test.integration.AutomatedTestBase;
 import org.apache.sysml.test.integration.TestConfiguration;
 import org.apache.sysml.test.utils.TestUtils;
 
+/**
+ * Test case for all cdf distribution functions, where we test the specific builtin 
+ * functions (which are equivalent to the generic cdf with specific parameterizations) 
+ *
+ */
 public class FullDistributionTest extends AutomatedTestBase 
 {
-	
 	private final static String TEST_NAME = "DFTest";
-	
-	enum TEST_TYPE { NORMAL, NORMAL_NOPARAMS, NORMAL_MEAN, NORMAL_SD, F, T, CHISQ, EXP, EXP_NOPARAMS };
-	
 	private final static String TEST_DIR = "functions/unary/scalar/";
 	private final static String TEST_CLASS_DIR = TEST_DIR + FullDistributionTest.class.getSimpleName() + "/";
 	
+	private enum TEST_TYPE { 
+		NORMAL, NORMAL_NOPARAMS, NORMAL_MEAN, 
+		NORMAL_SD, F, T, CHISQ, EXP, EXP_NOPARAMS 
+	};
+	
+	
 	@Override
 	public void setUp() {
 		TestUtils.clearAssertionInformation();
@@ -46,89 +55,195 @@ public class FullDistributionTest extends AutomatedTestBase
 	}
 	
 	@Test
-	public void testNormal() {
-		runDFTest(TEST_TYPE.NORMAL, true, 1.0, 2.0);
+	public void testNormalCP() {
+		runDFTest(TEST_TYPE.NORMAL, true, 1.0, 2.0, ExecType.CP);
 	}
 	
 	@Test
-	public void testNormalNoParams() {
-		runDFTest(TEST_TYPE.NORMAL_NOPARAMS, true, null, null);
+	public void testNormalNoParamsCP() {
+		runDFTest(TEST_TYPE.NORMAL_NOPARAMS, true, null, null, ExecType.CP);
 	}
 	
 	@Test
-	public void testNormalMean() {
-		runDFTest(TEST_TYPE.NORMAL_MEAN, true, 1.0, null);
+	public void testNormalMeanCP() {
+		runDFTest(TEST_TYPE.NORMAL_MEAN, true, 1.0, null, ExecType.CP);
 	}
 	
 	@Test
-	public void testNormalSd() {
-		runDFTest(TEST_TYPE.NORMAL_SD, true, 2.0, null);
+	public void testNormalSdCP() {
+		runDFTest(TEST_TYPE.NORMAL_SD, true, 2.0, null, ExecType.CP);
 	}
 	
 	@Test
-	public void testT() {
-		runDFTest(TEST_TYPE.T, true, 10.0, null);
+	public void testTCP() {
+		runDFTest(TEST_TYPE.T, true, 10.0, null, ExecType.CP);
 	}
 	
 	@Test
-	public void testF() {
-		runDFTest(TEST_TYPE.T, true, 10.0, 20.0);
+	public void testFCP() {
+		runDFTest(TEST_TYPE.T, true, 10.0, 20.0, ExecType.CP);
 	}
 	
 	@Test
-	public void testChisq() {
-		runDFTest(TEST_TYPE.CHISQ, true, 10.0, null);
+	public void testChisqCP() {
+		runDFTest(TEST_TYPE.CHISQ, true, 10.0, null, ExecType.CP);
 	}
 	
 	@Test
-	public void testExp() {
-		runDFTest(TEST_TYPE.EXP, true, 5.0, null);
+	public void testExpCP() {
+		runDFTest(TEST_TYPE.EXP, true, 5.0, null, ExecType.CP);
 	}
-	
-	private void runDFTest(TEST_TYPE type, boolean inverse, Double param1, Double param2) {
-		getAndLoadTestConfiguration(TEST_NAME);
 
-		double in = (new Random(System.nanoTime())).nextDouble();
-		
-		String HOME = SCRIPT_DIR + TEST_DIR;
-		fullDMLScriptName = HOME + TEST_NAME + "_" + type.toString() + ".dml";
-		fullRScriptName = HOME + TEST_NAME + "_" + type.toString() + ".R";
-		
-		String DMLout = output("dfout");
-		String Rout = expected("dfout");
+	@Test
+	public void testNormalSpark() {
+		runDFTest(TEST_TYPE.NORMAL, true, 1.0, 2.0, ExecType.SPARK);
+	}
+	
+	@Test
+	public void testNormalNoParamsSpark() {
+		runDFTest(TEST_TYPE.NORMAL_NOPARAMS, true, null, null, ExecType.SPARK);
+	}
+	
+	@Test
+	public void testNormalMeanSpark() {
+		runDFTest(TEST_TYPE.NORMAL_MEAN, true, 1.0, null, ExecType.SPARK);
+	}
+	
+	@Test
+	public void testNormalSdSpark() {
+		runDFTest(TEST_TYPE.NORMAL_SD, true, 2.0, null, ExecType.SPARK);
+	}
+	
+	@Test
+	public void testTSpark() {
+		runDFTest(TEST_TYPE.T, true, 10.0, null, ExecType.SPARK);
+	}
+	
+	@Test
+	public void testFSpark() {
+		runDFTest(TEST_TYPE.T, true, 10.0, 20.0, ExecType.SPARK);
+	}
+	
+	@Test
+	public void testChisqSpark() {
+		runDFTest(TEST_TYPE.CHISQ, true, 10.0, null, ExecType.SPARK);
+	}
+	
+	@Test
+	public void testExpSpark() {
+		runDFTest(TEST_TYPE.EXP, true, 5.0, null, ExecType.SPARK);
+	}
+	
+	@Test
+	public void testNormalMR() {
+		runDFTest(TEST_TYPE.NORMAL, true, 1.0, 2.0, ExecType.MR);
+	}
+	
+	@Test
+	public void testNormalNoParamsMR() {
+		runDFTest(TEST_TYPE.NORMAL_NOPARAMS, true, null, null, ExecType.MR);
+	}
+	
+	@Test
+	public void testNormalMeanMR() {
+		runDFTest(TEST_TYPE.NORMAL_MEAN, true, 1.0, null, ExecType.MR);
+	}
+	
+	@Test
+	public void testNormalSdMR() {
+		runDFTest(TEST_TYPE.NORMAL_SD, true, 2.0, null, ExecType.MR);
+	}
+	
+	@Test
+	public void testTMR() {
+		runDFTest(TEST_TYPE.T, true, 10.0, null, ExecType.MR);
+	}
+	
+	@Test
+	public void testFMR() {
+		runDFTest(TEST_TYPE.T, true, 10.0, 20.0, ExecType.MR);
+	}
+	
+	@Test
+	public void testChisqMR() {
+		runDFTest(TEST_TYPE.CHISQ, true, 10.0, null, ExecType.MR);
+	}
+	
+	@Test
+	public void testExpMR() {
+		runDFTest(TEST_TYPE.EXP, true, 5.0, null, ExecType.MR);
+	}
+	
+	/**
+	 * Internal test method - all these tests are expected to run in CP independent of the passed
+	 * instType. However, we test all backends to ensure correct compilation in the presence of
+	 * forced execution types.
+	 * 
+	 * @param type
+	 * @param inverse
+	 * @param param1
+	 * @param param2
+	 * @param instType
+	 */
+	private void runDFTest(TEST_TYPE type, boolean inverse, Double param1, Double param2, ExecType instType) 
+	{
+		//setup multi backend configuration
+		RUNTIME_PLATFORM platformOld = rtplatform;
+		switch( instType ){
+			case MR: rtplatform = RUNTIME_PLATFORM.HADOOP; break;
+			case SPARK: rtplatform = RUNTIME_PLATFORM.SPARK; break;
+			default: rtplatform = RUNTIME_PLATFORM.HYBRID; break;
+		}
+		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+		if( rtplatform == RUNTIME_PLATFORM.SPARK )
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
 		
-		switch(type) {
-		case NORMAL_NOPARAMS:
-			programArgs = new String[]{"-args", Double.toString(in), DMLout };
-			rCmd = "Rscript" + " " + fullRScriptName + " " + Double.toString(in) + " " + Rout;
-			break;
+		try
+		{
+			//set test and parameters
+			getAndLoadTestConfiguration(TEST_NAME);
+			double in = (new Random(System.nanoTime())).nextDouble();
+			String HOME = SCRIPT_DIR + TEST_DIR;
+			fullDMLScriptName = HOME + TEST_NAME + "_" + type.toString() + ".dml";
+			fullRScriptName = HOME + TEST_NAME + "_" + type.toString() + ".R";
 			
-		case NORMAL_MEAN:
-		case NORMAL_SD:
-		case T:
-		case CHISQ:
-		case EXP:
-			programArgs = new String[]{"-args", Double.toString(in), Double.toString(param1), DMLout };
-			rCmd = "Rscript" + " " + fullRScriptName + " " + Double.toString(in) + " " + Double.toString(param1) + " " + Rout;
-			break;
+			switch(type) {
+				case NORMAL_NOPARAMS:
+					programArgs = new String[]{"-args", Double.toString(in), output("dfout") };
+					rCmd = "Rscript" + " " + fullRScriptName + " " + Double.toString(in) + " " + expected("dfout");
+					break;
+					
+				case NORMAL_MEAN:
+				case NORMAL_SD:
+				case T:
+				case CHISQ:
+				case EXP:
+					programArgs = new String[]{"-args", Double.toString(in), Double.toString(param1), output("dfout") };
+					rCmd = "Rscript" + " " + fullRScriptName + " " + Double.toString(in) + " " + Double.toString(param1) + " " + expected("dfout");
+					break;
+					
+				case NORMAL:
+				case F:
+					programArgs = new String[]{"-args", Double.toString(in), Double.toString(param1), Double.toString(param2), output("dfout") };
+					rCmd = "Rscript" + " " + fullRScriptName + " " + Double.toString(in) + " " + Double.toString(param1) + " " + Double.toString(param2) + " " + expected("dfout");
+					break;
+				
+				default: 
+					throw new RuntimeException("Invalid distribution function: " + type);
+			}
 			
-		case NORMAL:
-		case F:
-			programArgs = new String[]{"-args", Double.toString(in), Double.toString(param1), Double.toString(param2), DMLout };
-			rCmd = "Rscript" + " " + fullRScriptName + " " + Double.toString(in) + " " + Double.toString(param1) + " " + Double.toString(param2) + " " + Rout;
-			break;
-		
-			default: 
-				throw new RuntimeException("Invalid distribution function: " + type);
+			//run test
+			runTest(true, false, null, -1); 
+			runRScript(true); 
+			
+			//compare results
+			HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromHDFS("dfout");
+			HashMap<CellIndex, Double> rfile  = readRMatrixFromFS("dfout");
+			TestUtils.compareMatrices(dmlfile, rfile, 1e-8, "DMLout", "Rout");
+		}
+		finally {
+			rtplatform = platformOld;
+			DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
 		}
-		
-		runTest(true, false, null, -1); 
-		runRScript(true); 
-		
-		HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromHDFS("dfout");
-		HashMap<CellIndex, Double> rfile  = readRMatrixFromFS("dfout");
-		TestUtils.compareMatrices(dmlfile, rfile, 1e-8, "DMLout", "Rout");
-
 	}
-	
 }