You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/07/17 03:35:24 UTC
[2/3] incubator-systemml git commit: [SYSTEMML-811] Compiler
integration compressed linear algebra
[SYSTEMML-811] Compiler integration compressed linear algebra
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/cbc4509b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/cbc4509b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/cbc4509b
Branch: refs/heads/master
Commit: cbc4509ba48b3843b10dbc532649c42aa1e302d8
Parents: 616793d
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Sat Jul 16 20:10:00 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Sat Jul 16 20:10:00 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/sysml/conf/DMLConfig.java | 2 +
src/main/java/org/apache/sysml/hops/Hop.java | 64 +++++++++++++-
.../sysml/hops/rewrite/ProgramRewriter.java | 1 +
.../hops/rewrite/RewriteCompressedReblock.java | 87 ++++++++++++++++++++
.../java/org/apache/sysml/lops/Compression.java | 81 ++++++++++++++++++
5 files changed, 231 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cbc4509b/src/main/java/org/apache/sysml/conf/DMLConfig.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/conf/DMLConfig.java b/src/main/java/org/apache/sysml/conf/DMLConfig.java
index 15ec73e..b87b476 100644
--- a/src/main/java/org/apache/sysml/conf/DMLConfig.java
+++ b/src/main/java/org/apache/sysml/conf/DMLConfig.java
@@ -70,6 +70,7 @@ public class DMLConfig
public static final String YARN_APPQUEUE = "dml.yarn.app.queue";
public static final String CP_PARALLEL_MATRIXMULT = "cp.parallel.matrixmult";
public static final String CP_PARALLEL_TEXTIO = "cp.parallel.textio";
+ public static final String COMPRESSED_LINALG = "compressed.linalg"; //if true, persistent reads of matrices are marked for compression (default: false)
// supported prefixes for custom map/reduce configurations
public static final String PREFIX_MAPRED = "mapred";
@@ -100,6 +101,7 @@ public class DMLConfig
_defaultVals.put(YARN_APPQUEUE, "default" );
_defaultVals.put(CP_PARALLEL_MATRIXMULT, "true" );
_defaultVals.put(CP_PARALLEL_TEXTIO, "true" );
+ _defaultVals.put(COMPRESSED_LINALG, "false" );
}
public DMLConfig()
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cbc4509b/src/main/java/org/apache/sysml/hops/Hop.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/Hop.java b/src/main/java/org/apache/sysml/hops/Hop.java
index 144ca20..7d69940 100644
--- a/src/main/java/org/apache/sysml/hops/Hop.java
+++ b/src/main/java/org/apache/sysml/hops/Hop.java
@@ -29,6 +29,7 @@ import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.lops.CSVReBlock;
import org.apache.sysml.lops.Checkpoint;
+import org.apache.sysml.lops.Compression;
import org.apache.sysml.lops.Data;
import org.apache.sysml.lops.Lop;
import org.apache.sysml.lops.LopsException;
@@ -108,6 +109,10 @@ public abstract class Hop
// (usually this happens on persistent reads dataops)
protected boolean _requiresReblock = false;
+ // indicates if the output of this hop needs to be compressed
+ // (set on persistent reads; the compression lop goes after reblock but before checkpoint)
+ protected boolean _requiresCompression = false;
+
// indicates if the output of this hop needs to be checkpointed (cached)
// (the default storage level for caching is not yet exposed here)
protected boolean _requiresCheckpoint = false;
@@ -234,11 +239,18 @@ public abstract class Hop
}
}
- public void setRequiresReblock(boolean flag)
- {
+ public void setRequiresReblock(boolean flag) {
_requiresReblock = flag;
}
+ public void setRequiresCompression(boolean flag) { //requests a compression lop on this hop's output
+ _requiresCompression = flag;
+ }
+
+ public boolean requiresCompression() { //true if a compression lop was requested
+ return _requiresCompression;
+ }
+
public boolean hasMatrixInputWithDifferentBlocksizes()
{
for( Hop c : getInput() ) {
@@ -285,7 +297,10 @@ public abstract class Hop
//Step 1: construct reblock lop if required (output of hop)
constructAndSetReblockLopIfRequired();
- //Step 2: construct checkpoint lop if required (output of hop or reblock)
+ //Step 2: construct compression lop if required (output of hop or reblock)
+ constructAndSetCompressionLopIfRequired();
+
+ //Step 3: construct checkpoint lop if required (output of hop, reblock, or compression)
constructAndSetCheckpointLopIfRequired();
}
@@ -397,8 +412,49 @@ public abstract class Hop
catch( LopsException ex ) {
throw new HopsException(ex);
}
+ }
+ }
+
+ /**
+ * Constructs a compression lop on top of this hop's current output lop
+ * (i.e., after a potential reblock lop) and sets it as the new output
+ * lop. This is a no-op unless compression was requested via
+ * {@link #setRequiresCompression(boolean)}.
+ *
+ * @throws HopsException if the compression lop cannot be constructed
+ */
+ private void constructAndSetCompressionLopIfRequired()
+ throws HopsException
+ {
+ //add compression lop to output if required
+ if( _requiresCompression )
+ {
+ //determine execution type: CP by default; in spark execution modes,
+ //non-scalar outputs are compressed in SPARK unless the hop is forced
+ //to CP, or we run in hybrid mode and 2x the output memory estimate
+ //is below the local memory budget
+ ExecType et = ExecType.CP;
+ if( OptimizerUtils.isSparkExecutionMode()
+ && getDataType()!=DataType.SCALAR )
+ {
+ boolean fitsInCP = OptimizerUtils.isHybridExecutionMode()
+ && 2*_outputMemEstimate < OptimizerUtils.getLocalMemBudget();
+ if( !fitsInCP && _etypeForced != ExecType.CP )
+ et = ExecType.SPARK;
+ }
+
+ //construct compression lop and make it the new output lop
+ try
+ {
+ Lop compress = new Compression(getLops(), getDataType(), getValueType(), et);
+ setOutputDimensions( compress );
+ setLineNumbers( compress );
+ setLops( compress );
+ }
+ catch( LopsException ex ) {
+ throw new HopsException(ex);
+ }
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cbc4509b/src/main/java/org/apache/sysml/hops/rewrite/ProgramRewriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/rewrite/ProgramRewriter.java b/src/main/java/org/apache/sysml/hops/rewrite/ProgramRewriter.java
index 49aa0db..8e645dc 100644
--- a/src/main/java/org/apache/sysml/hops/rewrite/ProgramRewriter.java
+++ b/src/main/java/org/apache/sysml/hops/rewrite/ProgramRewriter.java
@@ -92,6 +92,7 @@ public class ProgramRewriter
_dagRuleSet.add( new RewriteTransientWriteParentHandling() );
_dagRuleSet.add( new RewriteRemoveReadAfterWrite() ); //dependency: before blocksize
_dagRuleSet.add( new RewriteBlockSizeAndReblock() );
+ _dagRuleSet.add( new RewriteCompressedReblock() ); //no-op unless compressed.linalg is enabled
_dagRuleSet.add( new RewriteRemoveUnnecessaryCasts() );
if( OptimizerUtils.ALLOW_COMMON_SUBEXPRESSION_ELIMINATION )
_dagRuleSet.add( new RewriteCommonSubexpressionElimination() );
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cbc4509b/src/main/java/org/apache/sysml/hops/rewrite/RewriteCompressedReblock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/rewrite/RewriteCompressedReblock.java b/src/main/java/org/apache/sysml/hops/rewrite/RewriteCompressedReblock.java
new file mode 100644
index 0000000..3faf4bc
--- /dev/null
+++ b/src/main/java/org/apache/sysml/hops/rewrite/RewriteCompressedReblock.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.hops.rewrite;
+
+import java.util.ArrayList;
+
+import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.conf.DMLConfig;
+import org.apache.sysml.hops.DataOp;
+import org.apache.sysml.hops.Hop;
+import org.apache.sysml.hops.HopsException;
+import org.apache.sysml.hops.Hop.DataOpTypes;
+
+/**
+ * Rule: CompressedReblock: If config compressed.linalg is enabled, we
+ * inject compression hooks after pread of matrices w/ both dims > 1.
+ */
+public class RewriteCompressedReblock extends HopRewriteRule
+{
+ @Override
+ public ArrayList<Hop> rewriteHopDAGs(ArrayList<Hop> roots, ProgramRewriteStatus state)
+ throws HopsException
+ {
+ if( roots == null )
+ return null;
+
+ //perform compressed reblock rewrite only if enabled via config
+ boolean compress = ConfigurationManager.getDMLConfig().getBooleanValue(DMLConfig.COMPRESSED_LINALG);
+ if( compress )
+ for( Hop h : roots )
+ rule_CompressedReblock(h);
+
+ return roots;
+ }
+
+ @Override
+ public Hop rewriteHopDAG(Hop root, ProgramRewriteStatus state)
+ throws HopsException
+ {
+ //do nothing (this rule is not applied to predicate DAGs)
+ return root;
+ }
+
+ /**
+ * Recursively traverses the DAG bottom-up (skipping already visited hops)
+ * and marks persistent reads of matrices with both dims > 1 for compression.
+ *
+ * @param hop root of the (sub-)DAG to process
+ * @throws HopsException
+ */
+ private void rule_CompressedReblock(Hop hop)
+ throws HopsException
+ {
+ // Go to the source(s) of the DAG
+ for (Hop hi : hop.getInput()) {
+ if (hi.getVisited() != Hop.VisitStatus.DONE)
+ rule_CompressedReblock(hi);
+ }
+
+ //only persistent reads of matrices qualify (per rule description)
+ if( hop instanceof DataOp
+ && ((DataOp)hop).getDataOpType()==DataOpTypes.PERSISTENTREAD
+ && hop.getDataType()==org.apache.sysml.parser.Expression.DataType.MATRIX
+ && hop.getDim1() > 1 && hop.getDim2() > 1 )
+ {
+ hop.setRequiresCompression(true);
+ }
+ hop.setVisited(Hop.VisitStatus.DONE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cbc4509b/src/main/java/org/apache/sysml/lops/Compression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/Compression.java b/src/main/java/org/apache/sysml/lops/Compression.java
new file mode 100644
index 0000000..54dc445
--- /dev/null
+++ b/src/main/java/org/apache/sysml/lops/Compression.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.lops;
+
+import org.apache.sysml.lops.LopProperties.ExecLocation;
+import org.apache.sysml.lops.LopProperties.ExecType;
+import org.apache.sysml.lops.compile.JobType;
+import org.apache.sysml.parser.Expression.DataType;
+import org.apache.sysml.parser.Expression.ValueType;
+
+
+/**
+ * Lop that compresses the output of its single input lop. The lop is
+ * placed in the control program and emits an instruction with opcode {@link #OPCODE}.
+ */
+public class Compression extends Lop
+{
+ public static final String OPCODE = "compress";
+
+ /**
+ * Creates a compression lop and registers it as output of the given input lop.
+ *
+ * @param input input lop whose output is compressed
+ * @param dt data type of the output
+ * @param vt value type of the output
+ * @param et execution type of the generated instruction
+ * @throws LopsException
+ */
+ public Compression(Lop input, DataType dt, ValueType vt, ExecType et)
+ throws LopsException
+ {
+ //NOTE(review): reuses Lop.Type.Checkpoint as lop type; confirm no dedicated type is needed
+ super(Lop.Type.Checkpoint, dt, vt);
+ this.addInput(input);
+ input.addOutput(this);
+
+ boolean breaksAlignment = false;
+ boolean aligner = false;
+ boolean definesMRJob = false;
+ lps.addCompatibility(JobType.INVALID);
+ lps.setProperties( inputs, et, ExecLocation.ControlProgram, breaksAlignment, aligner, definesMRJob );
+ }
+
+ @Override
+ public String toString() {
+ return "Compress";
+ }
+
+ /** Generates: exec type, opcode, input operand, output operand (joined by OPERAND_DELIMITOR). */
+ @Override
+ public String getInstructions(String input1, String output)
+ throws LopsException
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append( getExecType() );
+ sb.append( OPERAND_DELIMITOR );
+ sb.append( OPCODE );
+ sb.append( OPERAND_DELIMITOR );
+ sb.append( getInputs().get(0).prepInputOperand(input1));
+ sb.append( OPERAND_DELIMITOR );
+ sb.append( prepOutputOperand(output));
+ return sb.toString();
+ }
+}
\ No newline at end of file