You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/09/27 18:03:34 UTC

[1/3] incubator-systemml git commit: [SYSTEMML-959] Performance spark grouped aggregate (single block agg)

Repository: incubator-systemml
Updated Branches:
  refs/heads/master 79884bd28 -> 055e850fc


[SYSTEMML-959] Performance spark grouped aggregate (single block agg)

Similar to spark unary aggregates and various other instructions, this
patch introduces runtime-level alternatives for single-block and
multi-block aggregation. For example, naive bayes over a 10Mx1k, dense
input (80GB) improved from 64 to 53s. 

Furthermore, this also includes a correctness fix of deep copying matrix
blocks on the creation of combiner blocks which is a pre-condition for
subsequent update in-place if the blocks are consumed by multiple
operations.  

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/3d7d348f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/3d7d348f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/3d7d348f

Branch: refs/heads/master
Commit: 3d7d348ff8bdca28454bf51186d94933a5c9555a
Parents: 79884bd
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Mon Sep 26 09:13:12 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Tue Sep 27 10:59:36 2016 -0700

----------------------------------------------------------------------
 .../ParameterizedBuiltinSPInstruction.java      | 24 +++++++++++++++-----
 .../spark/data/CorrMatrixBlock.java             |  5 ++++
 .../spark/utils/RDDAggregateUtils.java          | 10 +++++---
 3 files changed, 30 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/3d7d348f/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
index 645a905..df05702 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
@@ -210,13 +210,25 @@ public class ParameterizedBuiltinSPInstruction  extends ComputationSPInstruction
 			JavaPairRDD<MatrixIndexes, MatrixBlock> out = 
 					target.flatMapToPair(new RDDMapGroupedAggFunction(groups, _optr, 
 							ngroups, mc1.getRowsPerBlock(), mc1.getColsPerBlock()));
-			out = RDDAggregateUtils.sumByKeyStable(out);
 			
-			//updated characteristics and handle outputs
-			mcOut.set(ngroups, mc1.getCols(), mc1.getRowsPerBlock(), mc1.getColsPerBlock(), -1);
-			sec.setRDDHandleForVariable(output.getName(), out);			
-			sec.addLineageRDD( output.getName(), targetVar );
-			sec.addLineageBroadcast( output.getName(), groupsVar );	
+			//single-block aggregation
+			if( ngroups <= mc1.getRowsPerBlock() && mc1.getCols() <= mc1.getColsPerBlock() ) {
+				MatrixBlock out2 = RDDAggregateUtils.sumStable(out);
+				
+				//put output block into symbol table (no lineage because single block)
+				//this also includes implicit maintenance of matrix characteristics
+				sec.setMatrixOutput(output.getName(), out2);
+			}
+			//multi-block aggregation
+			else {
+				out = RDDAggregateUtils.sumByKeyStable(out);
+				
+				//updated characteristics and handle outputs
+				mcOut.set(ngroups, mc1.getCols(), mc1.getRowsPerBlock(), mc1.getColsPerBlock(), -1);
+				sec.setRDDHandleForVariable(output.getName(), out);			
+				sec.addLineageRDD( output.getName(), targetVar );
+				sec.addLineageBroadcast( output.getName(), groupsVar );	
+			}
 		}
 		else if ( opcode.equalsIgnoreCase("groupedagg") ) 
 		{	

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/3d7d348f/src/main/java/org/apache/sysml/runtime/instructions/spark/data/CorrMatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/CorrMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/CorrMatrixBlock.java
index ae5494a..055f8a7 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/CorrMatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/CorrMatrixBlock.java
@@ -64,6 +64,11 @@ public class CorrMatrixBlock implements Externalizable
 		return _corr;
 	}
 	
+	public void set(MatrixBlock value, MatrixBlock corr) {
+		_value = value;
+		_corr = corr;
+	}
+	
 	/**
 	 * Redirects the default java serialization via externalizable to our default 
 	 * hadoop writable serialization for efficient deserialization. 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/3d7d348f/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
index dcdbd36..abfcfbf 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
@@ -225,7 +225,9 @@ public class RDDAggregateUtils
 		public CorrMatrixBlock call(MatrixBlock arg0) 
 			throws Exception 
 		{
-			return new CorrMatrixBlock(arg0);
+			//deep copy to allow update in-place
+			return new CorrMatrixBlock(
+					new MatrixBlock(arg0));
 		}	
 	}
 	
@@ -254,7 +256,8 @@ public class RDDAggregateUtils
 			//aggregate other input and maintain corrections 
 			//(existing value and corr are used in place)
 			OperationsOnMatrixValues.incrementalAggregation(value, corr, arg1, _op, false);
-			return new CorrMatrixBlock(value, corr);
+			arg0.set(value, corr);
+			return arg0;
 		}	
 	}
 	
@@ -285,7 +288,8 @@ public class RDDAggregateUtils
 			//aggregate other input and maintain corrections
 			//(existing value and corr are used in place)
 			OperationsOnMatrixValues.incrementalAggregation(value1, corr, value2, _op, false);
-			return new CorrMatrixBlock(value1, corr);
+			arg0.set(value1, corr);
+			return arg0;
 		}	
 	}
 


[2/3] incubator-systemml git commit: [SYSTEMML-960] Support for frames as function arguments, tests

Posted by mb...@apache.org.
[SYSTEMML-960] Support for frames as function arguments, tests

So far, the input/outputs of dml-bodied functions did not allow for
variables of type frame (which resulted in parser issues). This patch
fixes this by generalizing the parser. 

In addition, this also includes two minor (partially related) fixes:

* Explain call dag with functions of internal namespace (multi-return
builtin functions), like transformencode as used in this new testcase.

* Unnecessary warning of max result size in local environment (with
USE_LOCAL_SPARK_CONIFG enabled).   


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/2f7a67d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/2f7a67d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/2f7a67d3

Branch: refs/heads/master
Commit: 2f7a67d38c9403285a2c5d17b67b4cd4aa76bf39
Parents: 3d7d348
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Mon Sep 26 18:38:54 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Tue Sep 27 10:59:41 2016 -0700

----------------------------------------------------------------------
 .../parser/common/CommonSyntacticValidator.java |  15 +++
 .../sysml/parser/dml/DmlSyntacticValidator.java |  25 ++--
 .../parser/pydml/PydmlSyntacticValidator.java   |  44 +++----
 .../context/SparkExecutionContext.java          |   2 +-
 .../java/org/apache/sysml/utils/Explain.java    |   2 +-
 .../functions/frame/FrameFunctionTest.java      | 129 +++++++++++++++++++
 .../scripts/functions/frame/FrameFunction.dml   |  42 ++++++
 7 files changed, 214 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2f7a67d3/src/main/java/org/apache/sysml/parser/common/CommonSyntacticValidator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/parser/common/CommonSyntacticValidator.java b/src/main/java/org/apache/sysml/parser/common/CommonSyntacticValidator.java
index 995937f..83f7823 100644
--- a/src/main/java/org/apache/sysml/parser/common/CommonSyntacticValidator.java
+++ b/src/main/java/org/apache/sysml/parser/common/CommonSyntacticValidator.java
@@ -740,4 +740,19 @@ public abstract class CommonSyntacticValidator {
 	// End of Helper Functions for exit*FunctionCall*AssignmentStatement
 	// -----------------------------------------------------------------
 
+	/**
+	 * Indicates if the given data type string is a valid data type. 
+	 * 
+	 * @param datatype
+	 * @param start
+	 */
+	protected void checkValidDataType(String datatype, Token start) {
+		boolean validMatrixType = 
+				datatype.equals("matrix") || datatype.equals("Matrix") || 
+				datatype.equals("frame") || datatype.equals("Frame") ||
+				datatype.equals("scalar") || datatype.equals("Scalar");
+		if(!validMatrixType	) {
+			notifyErrorListeners("incorrect datatype (expected matrix, frame or scalar)", start);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2f7a67d3/src/main/java/org/apache/sysml/parser/dml/DmlSyntacticValidator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/parser/dml/DmlSyntacticValidator.java b/src/main/java/org/apache/sysml/parser/dml/DmlSyntacticValidator.java
index 07e0705..e658a7e 100644
--- a/src/main/java/org/apache/sysml/parser/dml/DmlSyntacticValidator.java
+++ b/src/main/java/org/apache/sysml/parser/dml/DmlSyntacticValidator.java
@@ -720,18 +720,15 @@ public class DmlSyntacticValidator extends CommonSyntacticValidator implements D
 				dataType = paramCtx.paramType.dataType().getText();
 			}
 
-			if(dataType.equals("matrix") || dataType.equals("Matrix")) {
-				// matrix
+			
+			//check and assign data type
+			checkValidDataType(dataType, paramCtx.start);
+			if( dataType.equalsIgnoreCase("matrix") )
 				dataId.setDataType(DataType.MATRIX);
-			}
-			else if(dataType.equals("scalar") || dataType.equals("Scalar")) {
-				// scalar
+			else if( dataType.equalsIgnoreCase("frame") )
+				dataId.setDataType(DataType.FRAME);
+			else if( dataType.equalsIgnoreCase("scalar") )
 				dataId.setDataType(DataType.SCALAR);
-			}
-			else {
-				notifyErrorListeners("invalid datatype " + dataType, paramCtx.start);
-				return null;
-			}
 
 			valueType = paramCtx.paramType.valueType().getText();
 			if(valueType.equals("int") || valueType.equals("integer")
@@ -931,13 +928,7 @@ public class DmlSyntacticValidator extends CommonSyntacticValidator implements D
 
 	@Override
 	public void exitMatrixDataTypeCheck(MatrixDataTypeCheckContext ctx) {
-		boolean validMatrixType = ctx.ID().getText().equals("matrix")
-								|| ctx.ID().getText().equals("Matrix")
-								|| ctx.ID().getText().equals("Scalar")
-								|| ctx.ID().getText().equals("scalar");
-		if(!validMatrixType	) {
-			notifyErrorListeners("incorrect datatype (expected matrix or scalar)", ctx.start);
-		}
+		checkValidDataType(ctx.ID().getText(), ctx.start);
 	}
 
 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2f7a67d3/src/main/java/org/apache/sysml/parser/pydml/PydmlSyntacticValidator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/parser/pydml/PydmlSyntacticValidator.java b/src/main/java/org/apache/sysml/parser/pydml/PydmlSyntacticValidator.java
index c605308..7068589 100644
--- a/src/main/java/org/apache/sysml/parser/pydml/PydmlSyntacticValidator.java
+++ b/src/main/java/org/apache/sysml/parser/pydml/PydmlSyntacticValidator.java
@@ -1391,18 +1391,14 @@ public class PydmlSyntacticValidator extends CommonSyntacticValidator implements
 				dataType = paramCtx.paramType.dataType().getText();
 			}
 
-			if(dataType.equals("matrix")) {
-				// matrix
+			//check and assign data type
+			checkValidDataType(dataType, paramCtx.start);
+			if( dataType.equals("matrix") )
 				dataId.setDataType(DataType.MATRIX);
-			}
-			else if(dataType.equals("scalar")) {
-				// scalar
+			else if( dataType.equals("frame") )
+				dataId.setDataType(DataType.FRAME);
+			else if( dataType.equals("scalar") )
 				dataId.setDataType(DataType.SCALAR);
-			}
-			else {
-				notifyErrorListeners("invalid datatype " + dataType, paramCtx.start);
-				return null;
-			}
 
 			valueType = paramCtx.paramType.valueType().getText();
 			if(valueType.equals("int")) {
@@ -1574,24 +1570,20 @@ public class PydmlSyntacticValidator extends CommonSyntacticValidator implements
 
 	@Override
 	public void exitMatrixDataTypeCheck(MatrixDataTypeCheckContext ctx) {
-		if(		ctx.ID().getText().equals("matrix")
-				|| ctx.ID().getText().equals("scalar")
-				) {
-			// Do nothing
-		}
-		else if(ctx.ID().getText().equals("Matrix"))
+		checkValidDataType(ctx.ID().getText(), ctx.start);
+		
+		//additional error handling (pydml-specific)
+		String datatype = ctx.ID().getText();
+		if(datatype.equals("Matrix"))
 			notifyErrorListeners("incorrect datatype (Hint: use matrix instead of Matrix)", ctx.start);
-		else if(ctx.ID().getText().equals("Scalar"))
+		else if(datatype.equals("Frame"))
+			notifyErrorListeners("incorrect datatype (Hint: use frame instead of Frame)", ctx.start);
+		else if(datatype.equals("Scalar"))
 			notifyErrorListeners("incorrect datatype (Hint: use scalar instead of Scalar)", ctx.start);
-		else if(		ctx.ID().getText().equals("int")
-				|| ctx.ID().getText().equals("str")
-				|| ctx.ID().getText().equals("bool")
-				|| ctx.ID().getText().equals("float")
-				) {
-			notifyErrorListeners("expected datatype but found a valuetype (Hint: use matrix or scalar instead of " + ctx.ID().getText() + ")", ctx.start);
-		}
-		else {
-			notifyErrorListeners("incorrect datatype (expected matrix or scalar)", ctx.start);
+		else if( datatype.equals("int") || datatype.equals("str")
+			|| datatype.equals("bool") || datatype.equals("float") ) {
+			notifyErrorListeners("expected datatype but found a valuetype "
+					+ "(Hint: use matrix, frame or scalar instead of " + datatype + ")", ctx.start);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2f7a67d3/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index 964d2d6..7103b0d 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -236,7 +236,7 @@ public class SparkExecutionContext extends ExecutionContext
 		// Set warning if spark.driver.maxResultSize is not set. It needs to be set before starting Spark Context for CP collect 
 		String strDriverMaxResSize = _spctx.getConf().get("spark.driver.maxResultSize", "1g");
 		long driverMaxResSize = UtilFunctions.parseMemorySize(strDriverMaxResSize); 
-		if (driverMaxResSize != 0 && driverMaxResSize<OptimizerUtils.getLocalMemBudget())
+		if (driverMaxResSize != 0 && driverMaxResSize<OptimizerUtils.getLocalMemBudget() && !DMLScript.USE_LOCAL_SPARK_CONFIG)
 			LOG.warn("Configuration parameter spark.driver.maxResultSize set to " + UtilFunctions.formatMemorySize(driverMaxResSize) + "."
 					+ " You can set it through Spark default configuration setting either to 0 (unlimited) or to available memory budget of size " 
 					+ UtilFunctions.formatMemorySize((long)OptimizerUtils.getLocalMemBudget()) + ".");

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2f7a67d3/src/main/java/org/apache/sysml/utils/Explain.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/utils/Explain.java b/src/main/java/org/apache/sysml/utils/Explain.java
index 8ee2822..12c086e 100644
--- a/src/main/java/org/apache/sysml/utils/Explain.java
+++ b/src/main/java/org/apache/sysml/utils/Explain.java
@@ -1146,7 +1146,7 @@ public class Explain
 						FunctionOp fop = (FunctionOp) h;
 						String fkey = DMLProgram.constructFunctionKey(fop.getFunctionNamespace(), fop.getFunctionName());
 						//prevent redundant call edges
-						if( !lfset.contains(fkey) )
+						if( !lfset.contains(fkey) && !fop.getFunctionNamespace().equals(DMLProgram.INTERNAL_NAMESPACE) )
 						{
 							//recursively explain function call dag
 							if( !fstack.contains(fkey) ) {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2f7a67d3/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameFunctionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameFunctionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameFunctionTest.java
new file mode 100644
index 0000000..b506444
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameFunctionTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.frame;
+
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.hops.OptimizerUtils;
+import org.apache.sysml.lops.LopProperties.ExecType;
+import org.apache.sysml.runtime.io.FrameReaderFactory;
+import org.apache.sysml.runtime.io.FrameWriterFactory;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.InputInfo;
+import org.apache.sysml.runtime.matrix.data.OutputInfo;
+import org.apache.sysml.runtime.util.DataConverter;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+import org.junit.Test;
+
+public class FrameFunctionTest extends AutomatedTestBase
+{
+	private final static String TEST_DIR = "functions/frame/";
+	private final static String TEST_NAME = "FrameFunction";
+	private final static String TEST_CLASS_DIR = TEST_DIR + FrameFunctionTest.class.getSimpleName() + "/";
+	
+	private final static int rows = 1382;
+	private final static int cols = 5;
+	
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+		addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"F2"}));
+	}
+
+	@Test
+	public void testFrameFunctionIPACP()  {
+		runFrameFunctionTest(ExecType.CP, true);
+	}
+	
+	@Test
+	public void testFrameFunctionIPASpark()  {
+		runFrameFunctionTest(ExecType.SPARK, true);
+	}
+	
+	@Test
+	public void testFrameFunctionNoIPACP()  {
+		runFrameFunctionTest(ExecType.CP, false);
+	}
+	
+	@Test
+	public void testFrameFunctionNoIPASpark()  {
+		runFrameFunctionTest(ExecType.SPARK, false);
+	}
+
+	/**
+	 * 
+	 * @param et
+	 */
+	private void runFrameFunctionTest( ExecType et, boolean IPA )
+	{
+		//rtplatform for MR
+		RUNTIME_PLATFORM platformOld = rtplatform;
+		switch( et ){
+			case SPARK: rtplatform = RUNTIME_PLATFORM.SPARK; break;
+			default: rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK; break;
+		}
+	
+		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+		if( rtplatform == RUNTIME_PLATFORM.SPARK 
+			|| rtplatform == RUNTIME_PLATFORM.HYBRID_SPARK )
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+	
+		boolean oldIPA = OptimizerUtils.ALLOW_INTER_PROCEDURAL_ANALYSIS;
+		OptimizerUtils.ALLOW_INTER_PROCEDURAL_ANALYSIS = IPA;
+		
+		try
+		{
+			//setup testcase
+			getAndLoadTestConfiguration(TEST_NAME);
+			String HOME = SCRIPT_DIR + TEST_DIR;
+			fullDMLScriptName = HOME + TEST_NAME + ".dml";
+			programArgs = new String[]{"-explain", "-args", 
+					input("F"), output("F2")};
+			
+			//generate input data and write as frame
+			double[][] A = getRandomMatrix(rows, cols, -10, 10, 0.9, 8362);
+			FrameBlock fA = DataConverter.convertToFrameBlock(
+				DataConverter.convertToMatrixBlock(A));
+			FrameWriterFactory.createFrameWriter(OutputInfo.CSVOutputInfo)
+				.writeFrameToHDFS(fA, input("F"), rows, cols);
+			
+			//run test
+			runTest(true, false, null, -1); 
+			
+			//read input/output and compare
+			FrameBlock fB = FrameReaderFactory
+					.createFrameReader(InputInfo.CSVInputInfo)
+					.readFrameFromHDFS(output("F2"), rows, cols);
+			String[][] R1 = DataConverter.convertToStringFrame(fA);
+			String[][] R2 = DataConverter.convertToStringFrame(fB);
+			TestUtils.compareFrames(R1, R2, R1.length, R1[0].length);			
+		}
+		catch(Exception ex) {
+			throw new RuntimeException(ex);
+		}
+		finally {
+			rtplatform = platformOld;
+			DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+			OptimizerUtils.ALLOW_INTER_PROCEDURAL_ANALYSIS = oldIPA;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2f7a67d3/src/test/scripts/functions/frame/FrameFunction.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/frame/FrameFunction.dml b/src/test/scripts/functions/frame/FrameFunction.dml
new file mode 100644
index 0000000..591e63b
--- /dev/null
+++ b/src/test/scripts/functions/frame/FrameFunction.dml
@@ -0,0 +1,42 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+foo = function(Frame[String] F, String jspec) 
+  return (Matrix[Double] RX, Frame[String] RM) 
+{
+  #prevent function inlining
+  if( 1==1 ){}
+
+  [RX, RM] = transformencode(target=F, spec=jspec);
+}
+
+F = read($1, data_type="frame", format="csv");
+
+#make size unknown for recompile
+if( sum(rand(rows=10,cols=1))<1 ) {
+   F = rbind(F, F);
+}
+
+jspec = "{\"ids\": true,\"recode\": [1,2,3,4,5]}";
+[X, M] = foo(F, jspec);
+
+F2 = transformdecode(target=X, spec=jspec, meta=M);
+write(F2, $2, format="csv");


[3/3] incubator-systemml git commit: [SYSTEMML-831] Robustness spark mapmm (automatic input repartitioning)

Posted by mb...@apache.org.
[SYSTEMML-831] Robustness spark mapmm (automatic input repartitioning) 

This patch adds a robustness features to our spark mapmm instruction,
which automatically repartitions the input rdd for outer-product like
matrix multiplication in order to (1) increase the degree of parallelism
and (2) reduce the size of output partitions (to avoid 2GB limitations
of Spark). See https://issues.apache.org/jira/browse/SYSTEMML-831 for
details.

Furthermore, this also includes a minor fix for frame functions tests
(see SYSTEMML-960) with regard to single jvm tests. 

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/055e850f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/055e850f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/055e850f

Branch: refs/heads/master
Commit: 055e850fcc8475e0690743c03708effcdc8f7ed0
Parents: 2f7a67d
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Mon Sep 26 22:56:23 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Tue Sep 27 10:59:46 2016 -0700

----------------------------------------------------------------------
 .../instructions/spark/MapmmSPInstruction.java  | 53 +++++++++++++++++++-
 .../functions/frame/FrameFunctionTest.java      |  3 ++
 2 files changed, 55 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/055e850f/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
index 5b3e9ad..9b5f3de 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MapmmSPInstruction.java
@@ -30,11 +30,13 @@ import org.apache.spark.api.java.function.PairFunction;
 import scala.Tuple2;
 
 import org.apache.sysml.hops.AggBinaryOp.SparkAggType;
+import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.lops.MapMult;
 import org.apache.sysml.lops.MapMult.CacheType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysml.runtime.functionobjects.Multiply;
 import org.apache.sysml.runtime.functionobjects.Plus;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
@@ -125,8 +127,11 @@ public class MapmmSPInstruction extends BinarySPInstruction
 		
 		//execute mapmult instruction
 		JavaPairRDD<MatrixIndexes,MatrixBlock> out = null;
-		if( requiresFlatMapFunction(_type, mcBc) ) 
+		if( requiresFlatMapFunction(_type, mcBc) ) {
+			if( requiresRepartitioning(_type, mcRdd, mcBc, in1.partitions().size()) )
+				in1 = in1.repartition(getNumRepartitioning(_type, mcRdd, mcBc, in1.partitions().size()));
 			out = in1.flatMapToPair( new RDDFlatMapMMFunction(_type, in2) );
+		}
 		else if( preservesPartitioning(mcRdd, _type) )
 			out = in1.mapPartitionsToPair(new RDDMapMMPartitionFunction(_type, in2), true);
 		else
@@ -175,6 +180,8 @@ public class MapmmSPInstruction extends BinarySPInstruction
 	}
 	
 	/**
+	 * Indicates if there is a need to apply a flatmap rdd operation because a single 
+	 * input block creates multiple output blocks.
 	 * 
 	 * @param type
 	 * @param mcBc
@@ -187,7 +194,51 @@ public class MapmmSPInstruction extends BinarySPInstruction
 	}
 	
 	/**
+	 * Indicates if there is a need to repartition the input RDD in order to increase the
+	 * degree of parallelism or reduce the output partition size (e.g., Spark still has a
+	 * 2GB limitation of partitions)
 	 * 
+	 * @param type
+	 * @param mcRdd
+	 * @param mcBc
+	 * @param numPartitions
+	 * @return
+	 */
+	private static boolean requiresRepartitioning( CacheType type, MatrixCharacteristics mcRdd, MatrixCharacteristics mcBc, int numPartitions ) {
+		//note: as repartitioning requires data shuffling, we try to be very conservative here
+		//approach: we repartition, if there is a "outer-product-like" mm (single block common dimension),
+		//the size of output partitions (assuming dense) exceeds a size of 1GB 
+		
+		boolean isLeft = (type == CacheType.LEFT);
+		boolean isOuter = isLeft ? 
+				(mcRdd.getRows() <= mcRdd.getRowsPerBlock()) :
+				(mcRdd.getCols() <= mcRdd.getColsPerBlock());
+		boolean isLargeOutput = (OptimizerUtils.estimatePartitionedSizeExactSparsity(isLeft?mcBc.getRows():mcRdd.getRows(),
+				isLeft?mcRdd.getCols():mcBc.getCols(), isLeft?mcBc.getRowsPerBlock():mcRdd.getRowsPerBlock(),
+				isLeft?mcRdd.getColsPerBlock():mcBc.getColsPerBlock(), 1.0) / numPartitions) > 1024*1024*1024; 
+		return isOuter && isLargeOutput && mcRdd.dimsKnown() && mcBc.dimsKnown();
+	}
+
+	/**
+	 * Computes the number of target partitions for repartitioning input rdds in case of 
+	 * outer-product-like mm. 
+	 * 
+	 * @param type
+	 * @param mcRdd
+	 * @param mcBc
+	 * @param numPartitions
+	 * @return
+	 */
+	private static int getNumRepartitioning( CacheType type, MatrixCharacteristics mcRdd, MatrixCharacteristics mcBc, int numPartitions ) {
+		boolean isLeft = (type == CacheType.LEFT);
+		long sizeOutput = (OptimizerUtils.estimatePartitionedSizeExactSparsity(isLeft?mcBc.getRows():mcRdd.getRows(),
+				isLeft?mcRdd.getCols():mcBc.getCols(), isLeft?mcBc.getRowsPerBlock():mcRdd.getRowsPerBlock(),
+				isLeft?mcRdd.getColsPerBlock():mcBc.getColsPerBlock(), 1.0)); 
+		long numParts = sizeOutput / InfrastructureAnalyzer.getHDFSBlockSize();
+		return (int)Math.min(numParts, (isLeft?mcRdd.getNumColBlocks():mcRdd.getNumRowBlocks()));
+	}
+	
+	/**
 	 * 
 	 */
 	private static class RDDMapMMFunction implements PairFunction<Tuple2<MatrixIndexes, MatrixBlock>, MatrixIndexes, MatrixBlock> 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/055e850f/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameFunctionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameFunctionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameFunctionTest.java
index b506444..af2e75f 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameFunctionTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameFunctionTest.java
@@ -88,7 +88,9 @@ public class FrameFunctionTest extends AutomatedTestBase
 			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
 	
 		boolean oldIPA = OptimizerUtils.ALLOW_INTER_PROCEDURAL_ANALYSIS;
+		boolean csvReblockOld = OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK;
 		OptimizerUtils.ALLOW_INTER_PROCEDURAL_ANALYSIS = IPA;
+		OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = true;
 		
 		try
 		{
@@ -124,6 +126,7 @@ public class FrameFunctionTest extends AutomatedTestBase
 			rtplatform = platformOld;
 			DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
 			OptimizerUtils.ALLOW_INTER_PROCEDURAL_ANALYSIS = oldIPA;
+			OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = csvReblockOld;
 		}
 	}
 }