You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/07/17 03:35:24 UTC

[2/3] incubator-systemml git commit: [SYSTEMML-811] Compiler integration compressed linear algebra

[SYSTEMML-811] Compiler integration compressed linear algebra

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/cbc4509b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/cbc4509b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/cbc4509b

Branch: refs/heads/master
Commit: cbc4509ba48b3843b10dbc532649c42aa1e302d8
Parents: 616793d
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Sat Jul 16 20:10:00 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Sat Jul 16 20:10:00 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/sysml/conf/DMLConfig.java   |  2 +
 src/main/java/org/apache/sysml/hops/Hop.java    | 64 +++++++++++++-
 .../sysml/hops/rewrite/ProgramRewriter.java     |  1 +
 .../hops/rewrite/RewriteCompressedReblock.java  | 87 ++++++++++++++++++++
 .../java/org/apache/sysml/lops/Compression.java | 81 ++++++++++++++++++
 5 files changed, 231 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cbc4509b/src/main/java/org/apache/sysml/conf/DMLConfig.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/conf/DMLConfig.java b/src/main/java/org/apache/sysml/conf/DMLConfig.java
index 15ec73e..b87b476 100644
--- a/src/main/java/org/apache/sysml/conf/DMLConfig.java
+++ b/src/main/java/org/apache/sysml/conf/DMLConfig.java
@@ -70,6 +70,7 @@ public class DMLConfig
 	public static final String YARN_APPQUEUE    	= "dml.yarn.app.queue"; 
 	public static final String CP_PARALLEL_MATRIXMULT = "cp.parallel.matrixmult";
 	public static final String CP_PARALLEL_TEXTIO   = "cp.parallel.textio";
+	public static final String COMPRESSED_LINALG    = "compressed.linalg";
 
 	// supported prefixes for custom map/reduce configurations
 	public static final String PREFIX_MAPRED = "mapred";
@@ -100,6 +101,7 @@ public class DMLConfig
 		_defaultVals.put(YARN_APPQUEUE,    	     "default" );
 		_defaultVals.put(CP_PARALLEL_MATRIXMULT, "true" );
 		_defaultVals.put(CP_PARALLEL_TEXTIO,     "true" );
+		_defaultVals.put(COMPRESSED_LINALG,      "false" );
 	}
 	
 	public DMLConfig()

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cbc4509b/src/main/java/org/apache/sysml/hops/Hop.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/Hop.java b/src/main/java/org/apache/sysml/hops/Hop.java
index 144ca20..7d69940 100644
--- a/src/main/java/org/apache/sysml/hops/Hop.java
+++ b/src/main/java/org/apache/sysml/hops/Hop.java
@@ -29,6 +29,7 @@ import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
 import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.lops.CSVReBlock;
 import org.apache.sysml.lops.Checkpoint;
+import org.apache.sysml.lops.Compression;
 import org.apache.sysml.lops.Data;
 import org.apache.sysml.lops.Lop;
 import org.apache.sysml.lops.LopsException;
@@ -108,6 +109,10 @@ public abstract class Hop
 	// (usually this happens on persistent reads dataops)
 	protected boolean _requiresReblock = false;
 	
+	// indicates if the output of this hop needs to be compressed
+	// (this happens on persistent reads after reblock but before checkpoint)
+	protected boolean _requiresCompression = false;
+	
 	// indicates if the output of this hop needs to be checkpointed (cached)
 	// (the default storage level for caching is not yet exposed here)
 	protected boolean _requiresCheckpoint = false;
@@ -234,11 +239,18 @@ public abstract class Hop
 		}
 	}
 	
-	public void setRequiresReblock(boolean flag)
-	{
+	public void setRequiresReblock(boolean flag) {
 		_requiresReblock = flag;
 	}
 	
+	public void setRequiresCompression(boolean flag) { //flag is checked when the compression lop is constructed
+		_requiresCompression = flag;
+	}
+	
+	public boolean requiresCompression() { //current value of the requires-compression flag of this hop
+		return _requiresCompression;
+	}
+	
 	public boolean hasMatrixInputWithDifferentBlocksizes()
 	{
 		for( Hop c : getInput() ) {
@@ -285,7 +297,10 @@ public abstract class Hop
 		//Step 1: construct reblock lop if required (output of hop)
 		constructAndSetReblockLopIfRequired();
 		
-		//Step 2: construct checkpoint lop if required (output of hop or reblock)
+		//Step 2: construct compression lop if required
+		constructAndSetCompressionLopIfRequired();
+		
+		//Step 3: construct checkpoint lop if required (output of hop or reblock)
 		constructAndSetCheckpointLopIfRequired();
 	}
 	
@@ -397,8 +412,49 @@ public abstract class Hop
 			catch( LopsException ex ) {
 				throw new HopsException(ex);
 			}
+		}	
+	}
+	
+	/**
+	 * Wraps the current lops of this hop into a Compression lop if compression is required.
+	 * @throws HopsException if constructing the compression lop raises a LopsException
+	 */
+	private void constructAndSetCompressionLopIfRequired() 
+		throws HopsException
+	{
+		//determine execution type
+		ExecType et = ExecType.CP;
+		if( OptimizerUtils.isSparkExecutionMode() 
+			&& getDataType()!=DataType.SCALAR )
+		{
+			//compress in CP if (hybrid mode and 2x the output memory estimate fits into the 
+			//local memory budget) or if CP is forced; note that && binds tighter than ||
+			if(    OptimizerUtils.isHybridExecutionMode() 
+				&& 2*_outputMemEstimate < OptimizerUtils.getLocalMemBudget()
+				|| _etypeForced == ExecType.CP )
+			{
+				et = ExecType.CP;
+			}
+			else //default case
+			{
+				et = ExecType.SPARK;
+			}
+		}
+
+		//add compression lop to output if required
+		if( _requiresCompression )
+		{
+			try
+			{
+				Lop compress = new Compression(getLops(), getDataType(), getValueType(), et);				
+				setOutputDimensions( compress );
+				setLineNumbers( compress );
+				setLops( compress );
+			}
+			catch( LopsException ex ) {
+				throw new HopsException(ex);
+			}
 		}
-		
 	}
 	
 	

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cbc4509b/src/main/java/org/apache/sysml/hops/rewrite/ProgramRewriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/rewrite/ProgramRewriter.java b/src/main/java/org/apache/sysml/hops/rewrite/ProgramRewriter.java
index 49aa0db..8e645dc 100644
--- a/src/main/java/org/apache/sysml/hops/rewrite/ProgramRewriter.java
+++ b/src/main/java/org/apache/sysml/hops/rewrite/ProgramRewriter.java
@@ -92,6 +92,7 @@ public class ProgramRewriter
 			_dagRuleSet.add(     new RewriteTransientWriteParentHandling()       );
 			_dagRuleSet.add(     new RewriteRemoveReadAfterWrite()               ); //dependency: before blocksize
 			_dagRuleSet.add(     new RewriteBlockSizeAndReblock()                );
+			_dagRuleSet.add(     new RewriteCompressedReblock()                  );
 			_dagRuleSet.add(     new RewriteRemoveUnnecessaryCasts()             );		
 			if( OptimizerUtils.ALLOW_COMMON_SUBEXPRESSION_ELIMINATION )
 				_dagRuleSet.add( new RewriteCommonSubexpressionElimination()     );

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cbc4509b/src/main/java/org/apache/sysml/hops/rewrite/RewriteCompressedReblock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/rewrite/RewriteCompressedReblock.java b/src/main/java/org/apache/sysml/hops/rewrite/RewriteCompressedReblock.java
new file mode 100644
index 0000000..3faf4bc
--- /dev/null
+++ b/src/main/java/org/apache/sysml/hops/rewrite/RewriteCompressedReblock.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.hops.rewrite;
+
+import java.util.ArrayList;
+
+import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.conf.DMLConfig;
+import org.apache.sysml.hops.DataOp;
+import org.apache.sysml.hops.Hop;
+import org.apache.sysml.hops.HopsException;
+import org.apache.sysml.hops.Hop.DataOpTypes;
+
+/**
+ * Rule: CompressedReblock: If config compressed.linalg is enabled, we flag
+ * persistent reads (pread) of matrices w/ both dims > 1 as requiring compression.
+ */
+public class RewriteCompressedReblock extends HopRewriteRule
+{
+	
+	@Override
+	public ArrayList<Hop> rewriteHopDAGs(ArrayList<Hop> roots, ProgramRewriteStatus state)
+		throws HopsException
+	{
+		if( roots == null )
+			return null;
+		
+		boolean compress = ConfigurationManager.getDMLConfig()
+				.getBooleanValue(DMLConfig.COMPRESSED_LINALG);
+		
+		//perform compressed reblock rewrite (only if enabled via config)
+		if( compress )
+			for( Hop h : roots ) 
+				rule_CompressedReblock(h);
+		
+		return roots;
+	}
+
+	@Override
+	public Hop rewriteHopDAG(Hop root, ProgramRewriteStatus state) 
+		throws HopsException
+	{
+		//do nothing, root is returned as is (NOTE(review): presumably preads never occur in predicates - confirm)
+		return root;
+	}
+
+	/**
+	 * Recursively visits the inputs of the given hop and flags qualifying persistent reads for compression.
+	 * @param hop root of the (sub-)DAG to process; marked as visited on return
+	 * @throws HopsException
+	 */
+	private void rule_CompressedReblock(Hop hop) 
+		throws HopsException 
+	{
+		// Process all not-yet-visited inputs first (go to the source(s) of the DAG)
+		for (Hop hi : hop.getInput()) {
+			if (hi.getVisited() != Hop.VisitStatus.DONE)
+				rule_CompressedReblock(hi);
+		}
+
+		if( hop instanceof DataOp 
+			&& ((DataOp)hop).getDataOpType()==DataOpTypes.PERSISTENTREAD
+			&& hop.getDim1() > 1 && hop.getDim2() > 1 ) 
+		{
+			hop.setRequiresCompression(true);
+		}
+
+		hop.setVisited(Hop.VisitStatus.DONE);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cbc4509b/src/main/java/org/apache/sysml/lops/Compression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/Compression.java b/src/main/java/org/apache/sysml/lops/Compression.java
new file mode 100644
index 0000000..54dc445
--- /dev/null
+++ b/src/main/java/org/apache/sysml/lops/Compression.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.lops;
+
+import org.apache.sysml.lops.LopProperties.ExecLocation;
+import org.apache.sysml.lops.LopProperties.ExecType;
+import org.apache.sysml.lops.compile.JobType;
+import org.apache.sysml.parser.Expression.DataType;
+import org.apache.sysml.parser.Expression.ValueType;
+
+
+/**
+ * Lop that generates a "compress" instruction for its single input lop.
+ */
+public class Compression extends Lop 
+{
+	public static final String OPCODE = "compress"; 
+
+	/**
+	 * Creates a compression lop and links it as an output of the given input lop.
+	 * 
+	 * @param input lop whose output is compressed; added as the only input of this lop
+	 * @param dt data type handed to the Lop super constructor
+	 * @param vt value type handed to the Lop super constructor
+	 * @param et execution type set in the lop properties
+	 * @throws LopsException
+	 */
+	public Compression(Lop input, DataType dt, ValueType vt, ExecType et) 
+		throws LopsException
+	{
+		super(Lop.Type.Checkpoint, dt, vt);		//NOTE(review): registers as Lop.Type.Checkpoint, not a dedicated compression type - confirm intended
+		this.addInput(input);
+		input.addOutput(this);
+		
+		boolean breaksAlignment = false;
+		boolean aligner = false;
+		boolean definesMRJob = false;
+		
+		lps.addCompatibility(JobType.INVALID);
+		lps.setProperties( inputs, et, ExecLocation.ControlProgram, breaksAlignment, aligner, definesMRJob );
+	}
+
+	
+	@Override
+	public String toString() {
+		return "Compress";
+	}
+	
+	@Override
+	public String getInstructions(String input1, String output) 
+		throws LopsException 
+	{
+		StringBuilder sb = new StringBuilder(); //layout: exec type, opcode, input operand, output operand (delimiter-separated)
+		sb.append( getExecType() );
+		sb.append( Lop.OPERAND_DELIMITOR );
+		sb.append( OPCODE );
+		sb.append( OPERAND_DELIMITOR ); //NOTE(review): unqualified here but Lop.OPERAND_DELIMITOR above - presumably the same inherited constant
+		sb.append( getInputs().get(0).prepInputOperand(input1));
+		sb.append( OPERAND_DELIMITOR );
+		sb.append( prepOutputOperand(output));
+		
+		return sb.toString();
+	}
+}
\ No newline at end of file