You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by ba...@apache.org on 2021/08/26 11:49:30 UTC

[systemds] branch master updated: [SYSTEMDS-3103] CLA Spark Instructions without Correction intermediates

This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a2a3b0  [SYSTEMDS-3103] CLA Spark Instructions without Correction intermediates
0a2a3b0 is described below

commit 0a2a3b050803673c49f710ab69089c35d1afbc1e
Author: baunsgaard <ba...@tugraz.at>
AuthorDate: Wed Aug 25 22:19:45 2021 +0200

    [SYSTEMDS-3103] CLA Spark Instructions without Correction intermediates
    
    This commit addresses some of the aggregate functions in Spark that
    remove the correction after aggregation before processing further.
    In practice this means that compressed blocks would have to actively
    add a 0 correction after construction. To avoid this, we simply call the
    CP version of the instruction, which 1. removes the correction if executed
    normally and 2. does not add the correction if it is already removed.
    
    Also added are various test updates.
    
    Closes #1346
---
 .../runtime/compress/CompressedMatrixBlock.java    |   7 +-
 .../runtime/compress/utils/CustomHashMap.java      |  64 -----------
 .../instructions/cp/CompressionCPInstruction.java  |  20 ++--
 .../spark/AggregateUnarySPInstruction.java         |   5 +-
 .../spark/BinUaggChainSPInstruction.java           |  18 ++-
 .../spark/DeCompressionSPInstruction.java          |  17 ++-
 .../spark/utils/RDDAggregateUtils.java             |   2 +-
 .../functions/compress/CompressRewriteSpark.java   | 128 +++++++++++++++++++++
 .../compress/configuration/CompressForce.java      |   5 -
 .../compress/workload/WorkloadAlgorithmTest.java   |  60 ++++++++--
 .../compress/workload/WorkloadAnalysisLm.dml       |  31 -----
 11 files changed, 216 insertions(+), 141 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
index 033368e..2c205f3 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
@@ -107,7 +107,10 @@ public class CompressedMatrixBlock extends MatrixBlock {
 	private static final Log LOG = LogFactory.getLog(CompressedMatrixBlock.class.getName());
 	private static final long serialVersionUID = 73193720143154058L;
 
-	protected List<AColGroup> _colGroups;
+	/**
+	 * Column groups
+	 */
+	protected transient List<AColGroup> _colGroups;
 
 	/**
 	 * Boolean specifying if the colGroups are overlapping each other. This happens after a right matrix multiplication.
@@ -117,7 +120,7 @@ public class CompressedMatrixBlock extends MatrixBlock {
 	/**
 	 * Soft reference to a decompressed version of this matrix block.
 	 */
-	protected SoftReference<MatrixBlock> decompressedVersion;
+	protected transient SoftReference<MatrixBlock> decompressedVersion;
 
 	public CompressedMatrixBlock() {
 		super(true);
diff --git a/src/main/java/org/apache/sysds/runtime/compress/utils/CustomHashMap.java b/src/main/java/org/apache/sysds/runtime/compress/utils/CustomHashMap.java
deleted file mode 100644
index 64bee19..0000000
--- a/src/main/java/org/apache/sysds/runtime/compress/utils/CustomHashMap.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sysds.runtime.compress.utils;
-
-import org.apache.sysds.runtime.DMLRuntimeException;
-
-/**
- * This class provides a memory-efficient base for Custom HashMaps for restricted use cases.
- */
-public abstract class CustomHashMap {
-	protected static final int INIT_CAPACITY = 8;
-	protected static final int RESIZE_FACTOR = 2;
-	protected static final float LOAD_FACTOR = 0.50f;
-
-	protected int _size = -1;
-
-	public int size() {
-		return _size;
-	}
-
-	/**
-	 * Joins the two lists of hashmaps together to form one list containing element wise joins of the hashmaps.
-	 * 
-	 * Also note that the join modifies the left hand side hash map such that it contains the joined values. All values
-	 * in the right hand side is appended to the left hand side, such that the order of the elements is constant after
-	 * the join.
-	 * 
-	 * @param left  The left side hashmaps
-	 * @param right The right side hashmaps
-	 * @return The element-wise join of the two hashmaps.
-	 */
-	public static CustomHashMap[] joinHashMaps(CustomHashMap[] left, CustomHashMap[] right) {
-
-		if(left.length == right.length) {
-			for(int i = 0; i < left.length; i++) {
-				left[i].joinHashMap(right[i]);
-			}
-		}else{
-			throw new DMLRuntimeException("Invalid element wise join of two Hashmaps, of different length.");
-		}
-
-		return left;
-	}
-
-	public CustomHashMap joinHashMap(CustomHashMap that) {
-		return this;
-	}
-}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/CompressionCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/CompressionCPInstruction.java
index b3acc26..c55ab2a 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/cp/CompressionCPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/CompressionCPInstruction.java
@@ -37,7 +37,6 @@ public class CompressionCPInstruction extends ComputationCPInstruction {
 
 	private final int _singletonLookupID;
 
-
 	private CompressionCPInstruction(Operator op, CPOperand in, CPOperand out, String opcode, String istr,
 		int singletonLookupID) {
 		super(CPType.Compression, op, in, null, null, out, opcode, istr);
@@ -54,26 +53,29 @@ public class CompressionCPInstruction extends ComputationCPInstruction {
 			int treeNodeID = Integer.parseInt(parts[3]);
 			return new CompressionCPInstruction(null, in1, out, opcode, str, treeNodeID);
 		}
-		else {
+		else
 			return new CompressionCPInstruction(null, in1, out, opcode, str, 0);
-		}
 	}
 
 	@Override
 	public void processInstruction(ExecutionContext ec) {
 		// Get matrix block input
-		MatrixBlock in = ec.getMatrixInput(input1.getName());
-		SingletonLookupHashMap m = SingletonLookupHashMap.getMap();
+		final MatrixBlock in = ec.getMatrixInput(input1.getName());
+		final SingletonLookupHashMap m = SingletonLookupHashMap.getMap();
+
+		// Get and clear workload tree entry for this compression instruction.
+		final WTreeRoot root = (_singletonLookupID != 0) ? (WTreeRoot) m.get(_singletonLookupID) : null;
+		m.removeKey(_singletonLookupID);
+
+		final int k = OptimizerUtils.getConstrainedNumThreads(-1);
 
-		WTreeRoot root = (_singletonLookupID != 0) ? (WTreeRoot) m.get(_singletonLookupID) : null;
 		// Compress the matrix block
-		Pair<MatrixBlock, CompressionStatistics> compResult = CompressedMatrixBlockFactory.compress(in, OptimizerUtils.getConstrainedNumThreads(-1), root);
+		Pair<MatrixBlock, CompressionStatistics> compResult = CompressedMatrixBlockFactory.compress(in, k, root);
 
 		if(LOG.isTraceEnabled())
 			LOG.trace(compResult.getRight());
 		MatrixBlock out = compResult.getLeft();
-		
-		m.removeKey(_singletonLookupID);
+
 		// Set output and release input
 		ec.releaseMatrixInput(input1.getName());
 		ec.setMatrixOutput(output.getName(), out);
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/AggregateUnarySPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/AggregateUnarySPInstruction.java
index c135b01..04b6650 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/AggregateUnarySPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/AggregateUnarySPInstruction.java
@@ -301,10 +301,7 @@ public class AggregateUnarySPInstruction extends UnarySPInstruction {
 			MatrixBlock blkOut = new MatrixBlock();
 			
 			//unary aggregate operation
-			arg0.aggregateUnaryOperations(_op, blkOut, _blen, _ix);
-			
-			//always drop correction since no aggregation
-			blkOut.dropLastRowsOrColumns(_op.aggOp.correction);
+			arg0.aggregateUnaryOperations(_op, blkOut, _blen, _ix, true);
 			
 			//output new tuple
 			return blkOut;
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/BinUaggChainSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/BinUaggChainSPInstruction.java
index dea6260..4ea0284 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/BinUaggChainSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/BinUaggChainSPInstruction.java
@@ -68,7 +68,7 @@ public class BinUaggChainSPInstruction extends UnarySPInstruction {
 		
 		//set output RDD
 		updateUnaryOutputDataCharacteristics(sec);
-		sec.setRDDHandleForVariable(output.getName(), out);	
+		sec.setRDDHandleForVariable(output.getName(), out);
 		sec.addLineageRDD(output.getName(), input1.getName());
 	}
 
@@ -90,16 +90,14 @@ public class BinUaggChainSPInstruction extends UnarySPInstruction {
 		{
 			int blen = arg0.getNumRows();
 			
-			//perform unary aggregate operation
-			MatrixBlock out1 = new MatrixBlock();
-			arg0.aggregateUnaryOperations(_uaggOp, out1, blen, null);
+			// perform unary aggregate operation
+			// true for CP instruction since we want to remove correction part anyway.
+			MatrixBlock out1 = arg0.aggregateUnaryOperations(_uaggOp, null, blen, null, true);
 			
-			//strip-off correction
-			out1.dropLastRowsOrColumns(_uaggOp.aggOp.correction);
-		
-			//perform binary operation
-			MatrixBlock out2 = new MatrixBlock();
-			return arg0.binaryOperations(_bOp, out1, out2);
+			// perform binary operation
+			MatrixBlock ret = arg0.binaryOperations(_bOp, out1);
+
+			return ret;
 		}
 	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/DeCompressionSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/DeCompressionSPInstruction.java
index d002d55..fc1898d 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/DeCompressionSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/DeCompressionSPInstruction.java
@@ -52,23 +52,32 @@ public class DeCompressionSPInstruction extends UnarySPInstruction {
 		JavaPairRDD<MatrixIndexes, MatrixBlock> in = sec.getBinaryMatrixBlockRDDHandleForVariable(input1.getName());
 
 		// execute decompression
-		JavaPairRDD<MatrixIndexes, MatrixBlock> out = in.mapValues(new DeCompressionFunction());
+		JavaPairRDD<MatrixIndexes, MatrixBlock> out = 
+			in.mapValues(new DeCompressionFunction());
 
 		DMLCompressionStatistics.addDecompressSparkCount();
 		// set outputs
+		updateUnaryOutputDataCharacteristics(sec);
 		sec.setRDDHandleForVariable(output.getName(), out);
 		sec.addLineageRDD(input1.getName(), output.getName());
 	}
 
 	public static class DeCompressionFunction implements Function<MatrixBlock, MatrixBlock> {
-		private static final long serialVersionUID = -6528833083609413922L;
+		private static final long serialVersionUID = -65288330836413922L;
+
+		public DeCompressionFunction(){
+			// do nothing.
+		}
 
 		@Override
 		public MatrixBlock call(MatrixBlock arg0) throws Exception {
 			if(arg0 instanceof CompressedMatrixBlock) 
 				return ((CompressedMatrixBlock) arg0).decompress(OptimizerUtils.getConstrainedNumThreads(-1));
-			else 
-				return arg0;
+			else{
+				MatrixBlock ret = new MatrixBlock();
+				ret.copy(arg0);
+				return ret;
+			}
 		}
 	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/utils/RDDAggregateUtils.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/utils/RDDAggregateUtils.java
index 3df024d..3a3e960 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/utils/RDDAggregateUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/utils/RDDAggregateUtils.java
@@ -47,7 +47,7 @@ import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
  * and with that acceptable. 
  * 
  */
-public class RDDAggregateUtils 
+public class RDDAggregateUtils
 {	
 	//internal configuration to use tree aggregation (treeReduce w/ depth=2),
 	//this is currently disabled because it was 2x slower than a simple
diff --git a/src/test/java/org/apache/sysds/test/functions/compress/CompressRewriteSpark.java b/src/test/java/org/apache/sysds/test/functions/compress/CompressRewriteSpark.java
new file mode 100644
index 0000000..fe3cda5
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/compress/CompressRewriteSpark.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.compress;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.common.Types.ExecMode;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CompressRewriteSpark extends AutomatedTestBase {
+	private static final Log LOG = LogFactory.getLog(CompressRewriteSpark.class.getName());
+
+	private static final String dataPath = "src/test/scripts/functions/compress/densifying/";
+	private final static String TEST_DIR = "functions/compress/";
+
+	protected String getTestClassDir() {
+		return getTestDir() + this.getClass().getSimpleName() + "/";
+	}
+
+	protected String getTestName() {
+		return "compress";
+	}
+
+	protected String getTestDir() {
+		return "functions/compress/densifying/";
+	}
+
+	@Test
+	public void testCompressInstruction_small() {
+		compressTest(ExecMode.HYBRID, "01", "small.ijv");
+	}
+
+	@Test
+	public void testCompressInstruction_large() {
+		compressTest(ExecMode.HYBRID, "01", "large.ijv");
+	}
+
+	@Test
+	public void testCompressInstruction_large_vector_right() {
+		compressTest(ExecMode.HYBRID, "02", "large.ijv");
+	}
+
+
+	@Test 
+	public void testCompressionInstruction_colmean(){
+		compressTest(ExecMode.HYBRID,"submean", "large.ijv");
+	}
+
+
+	@Test 
+	public void testCompressionInstruction_scale(){
+		compressTest(ExecMode.HYBRID,"scale", "large.ijv");
+	}
+
+
+	@Test 
+	public void testCompressionInstruction_seq_large(){
+		compressTest(ExecMode.HYBRID,"seq", "large.ijv");
+	}
+
+	@Test 
+	public void testCompressionInstruction_pca_large(){
+		compressTest(ExecMode.HYBRID,"pca", "large.ijv");
+	}
+
+	public void compressTest(ExecMode instType, String name, String data) {
+
+		Types.ExecMode platformOld = setExecMode(instType);
+		try {
+
+			loadTestConfiguration(getTestConfiguration(getTestName()));
+
+			fullDMLScriptName = SCRIPT_DIR + "/" + getTestDir() + "compress_" + name + ".dml";
+
+			programArgs = new String[] {"-stats", "100", "-explain", "-args", dataPath + data};
+
+			LOG.debug(runTest(null));
+
+			Assert.assertTrue(!heavyHittersContainsString("sp_compress"));
+			Assert.assertTrue(!heavyHittersContainsString("sp_+"));
+
+		}
+		catch(Exception e) {
+			e.printStackTrace();
+			assertTrue("Exception in execution: " + e.getMessage(), false);
+		}
+		finally {
+			rtplatform = platformOld;
+		}
+	}
+
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+		addTestConfiguration(getTestName(), new TestConfiguration(getTestClassDir(), getTestName()));
+	}
+
+	@Override
+	protected File getConfigTemplateFile() {
+		return new File(SCRIPT_DIR + TEST_DIR, "SystemDS-config-compress.xml");
+	}
+}
diff --git a/src/test/java/org/apache/sysds/test/functions/compress/configuration/CompressForce.java b/src/test/java/org/apache/sysds/test/functions/compress/configuration/CompressForce.java
index 47c8603..84be096 100644
--- a/src/test/java/org/apache/sysds/test/functions/compress/configuration/CompressForce.java
+++ b/src/test/java/org/apache/sysds/test/functions/compress/configuration/CompressForce.java
@@ -96,31 +96,26 @@ public class CompressForce extends CompressBase {
 	}
 
 	@Test
-	@Ignore
 	public void test_ElementWiseBinaryMultiplyOp_right_CP() {
 		runTest(1500, 1, 1, 1, ExecType.CP, "ewbm_right");
 	}
 
 	@Test
-	@Ignore
 	public void test_ElementWiseBinaryMultiplyOp_right_SP() {
 		runTest(1500, 1, 2, 1, ExecType.SPARK, "ewbm_right");
 	}
 
 	@Test
-	@Ignore
 	public void test_ElementWiseBinaryMultiplyOp_left_CP() {
 		runTest(1500, 1, 1, 1, ExecType.CP, "ewbm_left");
 	}
 
 	@Test
-	@Ignore
 	public void test_ElementWiseBinaryMultiplyOp_left_SP() {
 		runTest(1500, 1, 2, 1, ExecType.SPARK, "ewbm_left");
 	}
 
 	@Test
-	@Ignore
 	public void test_ElementWiseBinaryMultiplyOp_left_SP_larger() {
 		runTest(1500, 15, 2, 1, ExecType.SPARK, "ewbm_left");
 	}
diff --git a/src/test/java/org/apache/sysds/test/functions/compress/workload/WorkloadAlgorithmTest.java b/src/test/java/org/apache/sysds/test/functions/compress/workload/WorkloadAlgorithmTest.java
index 04fe79e..9ffeb02 100644
--- a/src/test/java/org/apache/sysds/test/functions/compress/workload/WorkloadAlgorithmTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/compress/workload/WorkloadAlgorithmTest.java
@@ -23,6 +23,8 @@ import static org.junit.Assert.fail;
 
 import java.io.File;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.common.Types.ExecMode;
 import org.apache.sysds.runtime.compress.workload.WorkloadAnalyzer;
 import org.apache.sysds.test.AutomatedTestBase;
@@ -34,13 +36,30 @@ import org.junit.Test;
 
 public class WorkloadAlgorithmTest extends AutomatedTestBase {
 
+	private static final Log LOG = LogFactory.getLog(WorkloadAnalysisTest.class.getName());
+
 	private final static String TEST_NAME1 = "WorkloadAnalysisMLogReg";
-	private final static String TEST_NAME2 = "WorkloadAnalysisLm";
+	private final static String TEST_NAME2 = "WorkloadAnalysisLmDS";
 	private final static String TEST_NAME3 = "WorkloadAnalysisPCA";
 	private final static String TEST_NAME4 = "WorkloadAnalysisSliceLine";
+	private final static String TEST_NAME5 = "WorkloadAnalysisSliceFinder";
+	private final static String TEST_NAME6 = "WorkloadAnalysisLmCG";
 	private final static String TEST_DIR = "functions/compress/workload/";
 	private final static String TEST_CLASS_DIR = TEST_DIR + WorkloadAnalysisTest.class.getSimpleName() + "/";
 
+	private int nRows = 1000;
+
+	private double[][] X;
+	private double[][] y;
+
+	public WorkloadAlgorithmTest() {
+		X = TestUtils.round(getRandomMatrix(nRows, 20, 0, 5, 1.0, 7));
+		y = getRandomMatrix(nRows, 1, 0, 0, 1.0, 3);
+
+		for(int i = 0; i < X.length; i++)
+			y[i][0] = Math.max(X[i][0], 1);
+	}
+
 	@Override
 	public void setUp() {
 		TestUtils.clearAssertionInformation();
@@ -48,6 +67,8 @@ public class WorkloadAlgorithmTest extends AutomatedTestBase {
 		addTestConfiguration(TEST_NAME2, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] {"B"}));
 		addTestConfiguration(TEST_NAME3, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME3, new String[] {"B"}));
 		addTestConfiguration(TEST_NAME4, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME4, new String[] {"B"}));
+		addTestConfiguration(TEST_NAME5, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME5, new String[] {"B"}));
+		addTestConfiguration(TEST_NAME6, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME6, new String[] {"B"}));
 	}
 
 	@Test
@@ -66,6 +87,16 @@ public class WorkloadAlgorithmTest extends AutomatedTestBase {
 	}
 
 	@Test
+	public void testLmDSSP() {
+		runWorkloadAnalysisTest(TEST_NAME2, ExecMode.SPARK, 2, false);
+	}
+
+	@Test
+	public void testLmDSCP() {
+		runWorkloadAnalysisTest(TEST_NAME2, ExecMode.HYBRID, 2, false);
+	}
+
+	@Test
 	public void testPCASP() {
 		runWorkloadAnalysisTest(TEST_NAME3, ExecMode.SPARK, 1, false);
 	}
@@ -79,12 +110,23 @@ public class WorkloadAlgorithmTest extends AutomatedTestBase {
 	public void testSliceLineCP1() {
 		runWorkloadAnalysisTest(TEST_NAME4, ExecMode.HYBRID, 0, false);
 	}
-	
+
 	@Test
 	public void testSliceLineCP2() {
 		runWorkloadAnalysisTest(TEST_NAME4, ExecMode.HYBRID, 2, true);
 	}
+
+	@Test
+	public void testLmCGSP() {
+		runWorkloadAnalysisTest(TEST_NAME6, ExecMode.SPARK, 2, false);
+	}
 	
+	@Test
+	public void testLmCGCP() {
+		runWorkloadAnalysisTest(TEST_NAME6, ExecMode.HYBRID, 2, false);
+	}
+	
+	// private void runWorkloadAnalysisTest(String testname, ExecMode mode, int compressionCount) {
 	private void runWorkloadAnalysisTest(String testname, ExecMode mode, int compressionCount, boolean intermediates) {
 		ExecMode oldPlatform = setExecMode(mode);
 		boolean oldIntermediates = WorkloadAnalyzer.ALLOW_INTERMEDIATE_CANDIDATES;
@@ -95,21 +137,16 @@ public class WorkloadAlgorithmTest extends AutomatedTestBase {
 			
 			String HOME = SCRIPT_DIR + TEST_DIR;
 			fullDMLScriptName = HOME + testname + ".dml";
-			programArgs = new String[] {"-explain","-stats",
-				"20", "-args", input("X"), input("y"), output("B")};
+			programArgs = new String[] {"-stats", "20", "-args", input("X"), input("y"), output("B")};
 
-			double[][] X = TestUtils.round(getRandomMatrix(10000, 20, 0, 10, 1.0, 7));
 			writeInputMatrixWithMTD("X", X, false);
-			double[][] y = getRandomMatrix(10000, 1, 1, 1, 1.0, 3);
-			for(int i = 0; i < X.length; i++) {
-				y[i][0] = Math.max(X[i][0], 1);
-			}
 			writeInputMatrixWithMTD("y", y, false);
 
-			runTest(null);
+			String ret = runTest(null).toString();
+			LOG.debug(ret);
 
 			// check various additional expectations
-			long actualCompressionCount = mode == ExecMode.HYBRID ? Statistics
+			long actualCompressionCount = (mode == ExecMode.HYBRID || mode == ExecMode.SINGLE_NODE) ? Statistics
 				.getCPHeavyHitterCount("compress") : Statistics.getCPHeavyHitterCount("sp_compress");
 
 			Assert.assertEquals(compressionCount, actualCompressionCount);
@@ -118,6 +155,7 @@ public class WorkloadAlgorithmTest extends AutomatedTestBase {
 					heavyHittersContainsString("compress") : heavyHittersContainsString("sp_compress"));
 			if( !testname.equals(TEST_NAME4) )
 				Assert.assertFalse(heavyHittersContainsString("m_scale"));
+
 		}
 		catch(Exception e) {
 			resetExecMode(oldPlatform);
diff --git a/src/test/scripts/functions/compress/workload/WorkloadAnalysisLm.dml b/src/test/scripts/functions/compress/workload/WorkloadAnalysisLm.dml
deleted file mode 100644
index f12f9b8..0000000
--- a/src/test/scripts/functions/compress/workload/WorkloadAnalysisLm.dml
+++ /dev/null
@@ -1,31 +0,0 @@
-#-------------------------------------------------------------
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-#-------------------------------------------------------------
-
-X = read($1);
-y = read($2);
-
-print("")
-print("LM")
-
-X = scale(X=X, scale=TRUE, center=TRUE);
-B = lm(X=X, y=y, verbose=TRUE);
-
-print(sum(B))