You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by lr...@apache.org on 2015/12/03 19:46:16 UTC
[45/78] [abbrv] [partial] incubator-systemml git commit: Move files
to new package folder structure
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/hops/AggBinaryOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/hops/AggBinaryOp.java b/src/main/java/com/ibm/bi/dml/hops/AggBinaryOp.java
deleted file mode 100644
index 005744f..0000000
--- a/src/main/java/com/ibm/bi/dml/hops/AggBinaryOp.java
+++ /dev/null
@@ -1,1959 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.hops;
-
-import com.ibm.bi.dml.api.DMLScript;
-import com.ibm.bi.dml.api.DMLScript.RUNTIME_PLATFORM;
-import com.ibm.bi.dml.hops.rewrite.HopRewriteUtils;
-import com.ibm.bi.dml.lops.Aggregate;
-import com.ibm.bi.dml.lops.Binary;
-import com.ibm.bi.dml.lops.DataPartition;
-import com.ibm.bi.dml.lops.Group;
-import com.ibm.bi.dml.hops.Hop.MultiThreadedHop;
-import com.ibm.bi.dml.lops.Lop;
-import com.ibm.bi.dml.lops.LopProperties.ExecType;
-import com.ibm.bi.dml.lops.LopsException;
-import com.ibm.bi.dml.lops.MMCJ;
-import com.ibm.bi.dml.lops.MMRJ;
-import com.ibm.bi.dml.lops.MMTSJ;
-import com.ibm.bi.dml.lops.MMCJ.MMCJType;
-import com.ibm.bi.dml.lops.MMTSJ.MMTSJType;
-import com.ibm.bi.dml.lops.MMZip;
-import com.ibm.bi.dml.lops.MapMult;
-import com.ibm.bi.dml.lops.MapMultChain;
-import com.ibm.bi.dml.lops.MapMultChain.ChainType;
-import com.ibm.bi.dml.lops.PMMJ;
-import com.ibm.bi.dml.lops.PMapMult;
-import com.ibm.bi.dml.lops.PartialAggregate.CorrectionLocationType;
-import com.ibm.bi.dml.lops.Transform;
-import com.ibm.bi.dml.lops.Transform.OperationTypes;
-import com.ibm.bi.dml.parser.Expression.DataType;
-import com.ibm.bi.dml.parser.Expression.ValueType;
-import com.ibm.bi.dml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
-import com.ibm.bi.dml.runtime.controlprogram.context.SparkExecutionContext;
-import com.ibm.bi.dml.runtime.matrix.MatrixCharacteristics;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
-import com.ibm.bi.dml.runtime.matrix.mapred.DistributedCacheInput;
-import com.ibm.bi.dml.runtime.matrix.mapred.MMCJMRReducerWithAggregator;
-
-
-/* Aggregate binary (cell operations): Sum (aij + bij)
- * Properties:
- * Inner Symbol: *, -, +, ...
- * Outer Symbol: +, min, max, ...
- * 2 Operands
- *
- * Semantic: generate indices, align, cross-operate, generate indices, align, aggregate
- */
-
-public class AggBinaryOp extends Hop implements MultiThreadedHop
-{
- //scaling factor applied to mapmult memory estimates (1.0 = no adjustment)
- public static final double MAPMULT_MEM_MULTIPLIER = 1.0;
- //global override to force a specific matrix mult method (null = cost-based selection)
- public static MMultMethod FORCED_MMULT_METHOD = null;
-
- //available matrix multiplication physical operators, per execution backend
- public enum MMultMethod {
- CPMM, //cross-product matrix multiplication (mr)
- RMM, //replication matrix multiplication (mr)
- MAPMM_L, //map-side matrix-matrix multiplication using distributed cache (mr/sp)
- MAPMM_R, //map-side matrix-matrix multiplication using distributed cache (mr/sp)
- MAPMM_CHAIN, //map-side matrix-matrix-matrix multiplication using distributed cache, for right input (cp/mr/sp)
- PMAPMM, //partitioned map-side matrix-matrix multiplication (sp)
- PMM, //permutation matrix multiplication using distributed cache, for left input (mr/cp)
- TSMM, //transpose-self matrix multiplication (cp/mr/sp)
- ZIPMM, //zip matrix multiplication (sp)
- MM //in-memory matrix multiplication (cp)
- };
-
- //aggregation granularity for spark operations (none, single output block, many blocks)
- public enum SparkAggType{
- NONE,
- SINGLE_BLOCK,
- MULTI_BLOCK,
- }
-
- //inner (cell-wise) and outer (aggregation) ops; matrix mult is innerOp=MULT, outerOp=SUM
- private OpOp2 innerOp;
- private AggOp outerOp;
-
- //matrix mult method selected during lops construction (null before selection)
- private MMultMethod _method = null;
-
- //hints set by previous to operator selection
- private boolean _hasLeftPMInput = false; //left input is permutation matrix
- private int _maxNumThreads = -1; //-1 for unlimited
-
- private AggBinaryOp() {
- //default constructor for clone
- }
-
- /**
- * Creates an aggregate binary hop aggBin(outOp, innOp) over the two given
- * inputs, wiring bidirectional parent/child references and refreshing
- * unknown dimensions and nnz from the inputs.
- */
- public AggBinaryOp(String l, DataType dt, ValueType vt, OpOp2 innOp,
- AggOp outOp, Hop in1, Hop in2) {
- super(l, dt, vt);
- innerOp = innOp;
- outerOp = outOp;
- getInput().add(0, in1);
- getInput().add(1, in2);
- in1.getParent().add(this);
- in2.getParent().add(this);
-
- //compute unknown dims and nnz
- refreshSizeInformation();
- }
-
- //sets the rewrite hint that the left input is a permutation matrix (enables PMM selection)
- public void setHasLeftPMInput(boolean flag) {
- _hasLeftPMInput = flag;
- }
-
- public boolean hasLeftPMInput(){
- return _hasLeftPMInput;
- }
-
- @Override
- public void setMaxNumThreads( int k ) {
- _maxNumThreads = k;
- }
-
- @Override
- public int getMaxNumThreads() {
- return _maxNumThreads;
- }
-
- //returns the matrix mult method chosen in constructLops (null if not yet constructed)
- public MMultMethod getMMultMethod(){
- return _method;
- }
-
- /**
- * Constructs the lops for this aggregate binary hop (currently matrix
- * multiplication only). Selection proceeds in three steps: (1) execution
- * type (CP vs MR vs Spark), (2) special patterns (tsmm, mmchain), and
- * (3) backend-specific method selection, followed by dispatch to the
- * per-method lops constructors.
- *
- * NOTE: overestimated mem in case of transpose-identity matmult, but 3/2 at worst
- * and existing mem estimate advantageous in terms of consistency hops/lops,
- * and some special cases internally materialize the transpose for better cache locality
- */
- @Override
- public Lop constructLops()
- throws HopsException, LopsException
- {
- //return already created lops
- if( getLops() != null )
- return getLops();
-
- //construct matrix mult lops (currently only supported aggbinary)
- if ( isMatrixMultiply() )
- {
- Hop input1 = getInput().get(0);
- Hop input2 = getInput().get(1);
-
- //matrix mult operation selection part 1 (CP vs MR vs Spark)
- ExecType et = optFindExecType();
-
- //matrix mult operation selection part 2 (specific pattern)
- MMTSJType mmtsj = checkTransposeSelf(); //determine tsmm pattern
- ChainType chain = checkMapMultChain(); //determine mmchain pattern
-
- if( et == ExecType.CP )
- {
- //matrix mult operation selection part 3 (CP type)
- _method = optFindMMultMethodCP ( input1.getDim1(), input1.getDim2(),
- input2.getDim1(), input2.getDim2(), mmtsj, chain, _hasLeftPMInput );
-
- //dispatch CP lops construction
- switch( _method ){
- case TSMM:
- constructCPLopsTSMM( mmtsj );
- break;
- case MAPMM_CHAIN:
- constructCPLopsMMChain( chain );
- break;
- case PMM:
- constructCPLopsPMM();
- break;
- case MM:
- constructCPLopsMM();
- break;
- default:
- throw new HopsException(this.printErrorLocation() + "Invalid Matrix Mult Method (" + _method + ") while constructing CP lops.");
- }
- }
- else if( et == ExecType.SPARK )
- {
- //matrix mult operation selection part 3 (SPARK type)
- //tmmRewrite: left input is a transpose, enabling the zipmm/left-transpose rewrites
- boolean tmmRewrite = input1 instanceof ReorgOp && ((ReorgOp)input1).getOp()==ReOrgOp.TRANSPOSE;
- _method = optFindMMultMethodSpark (
- input1.getDim1(), input1.getDim2(), input1.getRowsInBlock(), input1.getColsInBlock(), input1.getNnz(),
- input2.getDim1(), input2.getDim2(), input2.getRowsInBlock(), input2.getColsInBlock(), input2.getNnz(),
- mmtsj, chain, _hasLeftPMInput, tmmRewrite );
-
- //dispatch SPARK lops construction
- switch( _method )
- {
- case TSMM:
- constructSparkLopsTSMM( mmtsj );
- break;
- case MAPMM_L:
- case MAPMM_R:
- constructSparkLopsMapMM( _method );
- break;
- case MAPMM_CHAIN:
- constructSparkLopsMapMMChain( chain );
- break;
- case PMAPMM:
- constructSparkLopsPMapMM();
- break;
- case CPMM:
- constructSparkLopsCPMM();
- break;
- case RMM:
- constructSparkLopsRMM();
- break;
- case PMM:
- constructSparkLopsPMM();
- break;
- case ZIPMM:
- constructSparkLopsZIPMM();
- break;
-
- default:
- throw new HopsException(this.printErrorLocation() + "Invalid Matrix Mult Method (" + _method + ") while constructing SPARK lops.");
- }
- }
- else if( et == ExecType.MR )
- {
- //matrix mult operation selection part 3 (MR type)
- _method = optFindMMultMethodMR (
- input1.getDim1(), input1.getDim2(), input1.getRowsInBlock(), input1.getColsInBlock(), input1.getNnz(),
- input2.getDim1(), input2.getDim2(), input2.getRowsInBlock(), input2.getColsInBlock(), input2.getNnz(),
- mmtsj, chain, _hasLeftPMInput);
-
- //dispatch MR lops construction
- switch( _method ) {
- case MAPMM_L:
- case MAPMM_R:
- constructMRLopsMapMM( _method );
- break;
- case MAPMM_CHAIN:
- constructMRLopsMapMMChain( chain );
- break;
- case CPMM:
- constructMRLopsCPMM();
- break;
- case RMM:
- constructMRLopsRMM();
- break;
- case TSMM:
- constructMRLopsTSMM( mmtsj );
- break;
- case PMM:
- constructMRLopsPMM();
- break;
- default:
- throw new HopsException(this.printErrorLocation() + "Invalid Matrix Mult Method (" + _method + ") while constructing MR lops.");
- }
- }
- }
- else
- throw new HopsException(this.printErrorLocation() + "Invalid operation in AggBinary Hop, aggBin(" + innerOp + "," + outerOp + ") while constructing lops.");
-
- //add reblock/checkpoint lops if necessary
- constructAndSetLopsDataFlowProperties();
-
- return getLops();
- }
-
- @Override
- public String getOpString() {
- //"ba" prefix (binary aggregate) for consistency with runtime op naming,
- //e.g., ba(+*) for standard matrix multiplication
- StringBuilder sb = new StringBuilder("ba(");
- sb.append(HopsAgg2String.get(outerOp));
- sb.append(HopsOpOp2String.get(innerOp));
- sb.append(")");
- return sb.toString();
- }
-
- /**
- * Debug utility: prints this hop (via super), its inner/outer operations,
- * and recursively its inputs; guarded by the visit status so shared
- * subdags are printed only once. Fix: removed a stray empty statement
- * (lone ';') left after the input loop.
- */
- public void printMe() throws HopsException {
- if (LOG.isDebugEnabled()){
- if (getVisited() != VisitStatus.DONE) {
- super.printMe();
- LOG.debug(" InnerOperation: " + innerOp);
- LOG.debug(" OuterOperation: " + outerOp);
- for (Hop h : getInput()) {
- h.printMe();
- }
- }
- setVisited(VisitStatus.DONE);
- }
- }
-
-
-
- /**
- * Extends the default memory estimate to account for the smaller memory
- * requirements of tsmm-left, which needs X but not its materialized
- * transpose; hence the transpose input's output estimate is subtracted.
- */
- @Override
- public void computeMemEstimate(MemoTable memo)
- {
- //extension of default compute memory estimate in order to
- //account for smaller tsmm memory requirements.
- super.computeMemEstimate(memo);
-
- //tsmm left is guaranteed to require only X but not t(X), while
- //tsmm right might have additional requirements to transpose X if sparse
- //NOTE: as a heuristic this correction is only applied if not a column vector because
- //most other vector operations require memory for at least two vectors (we aim for
- //consistency in order to prevent anomalies in parfor opt leading to small degree of par)
- MMTSJType mmtsj = checkTransposeSelf();
- if( mmtsj.isLeft() && getInput().get(1).dimsKnown() && getInput().get(1).getDim2()>1 ) {
- _memEstimate = _memEstimate - getInput().get(0)._outputMemEstimate;
- }
- }
-
- /**
- * Output memory estimate for the matrix mult result; deliberately assumes
- * a dense output (sparsity 1.0) — the sparsity-aware variant is kept below
- * in commented-out form for reference.
- */
- @Override
- protected double computeOutputMemEstimate( long dim1, long dim2, long nnz )
- {
- //NOTES:
- // * The estimate for transpose-self is the same as for normal matrix multiplications
- // because (1) this decouples the decision of TSMM over default MM and (2) some cases
- // of TSMM internally materialize the transpose for efficiency.
- // * All matrix multiplications internally use dense output representations for efficiency.
- // This is reflected in our conservative memory estimate. However, we additionally need
- // to account for potential final dense/sparse transformations via processing mem estimates.
- double sparsity = 1.0;
- /*
- if( isMatrixMultiply() ) {
- if( nnz < 0 ){
- Hops input1 = getInput().get(0);
- Hops input2 = getInput().get(1);
- if( input1.dimsKnown() && input2.dimsKnown() )
- {
- double sp1 = (input1.getNnz()>0) ? OptimizerUtils.getSparsity(input1.getDim1(), input1.getDim2(), input1.getNnz()) : 1.0;
- double sp2 = (input2.getNnz()>0) ? OptimizerUtils.getSparsity(input2.getDim1(), input2.getDim2(), input2.getNnz()) : 1.0;
- sparsity = OptimizerUtils.getMatMultSparsity(sp1, sp2, input1.getDim1(), input1.getDim2(), input2.getDim2(), true);
- }
- }
- else //sparsity known (e.g., inferred from worst case estimates)
- sparsity = OptimizerUtils.getSparsity(dim1, dim2, nnz);
- }
- */
- //currently always estimated as dense in order to account for dense intermediate without unnecessary overestimation
- double ret = OptimizerUtils.estimateSizeExactSparsity(dim1, dim2, sparsity);
-
- return ret;
- }
-
- @Override
- protected double computeIntermediateMemEstimate( long dim1, long dim2, long nnz )
- {
- //vectors (dim2 < 2) are always dense, hence no transformation buffer needed;
- //otherwise account for a potential final dense-sparse transformation
- //(worst-case sparse representation at the sparsity turn point)
- return ( dim2 >= 2 ) ?
- OptimizerUtils.estimateSizeExactSparsity(dim1, dim2, MatrixBlock.SPARSITY_TURN_POINT) : 0;
- }
-
- /**
- * Infers output characteristics [rows, cols, nnz] of the matrix mult from
- * the memoized input statistics; returns null unless rows of the left and
- * cols of the right input are known. The nnz estimate uses the matmult
- * sparsity propagation over the per-input sparsities.
- */
- @Override
- protected long[] inferOutputCharacteristics( MemoTable memo )
- {
- long[] ret = null;
-
- MatrixCharacteristics[] mc = memo.getAllInputStats(getInput());
- if( mc[0].rowsKnown() && mc[1].colsKnown() ) {
- ret = new long[3];
- ret[0] = mc[0].getRows();
- ret[1] = mc[1].getCols();
- double sp1 = (mc[0].getNonZeros()>0) ? OptimizerUtils.getSparsity(mc[0].getRows(), mc[0].getCols(), mc[0].getNonZeros()) : 1.0;
- double sp2 = (mc[1].getNonZeros()>0) ? OptimizerUtils.getSparsity(mc[1].getRows(), mc[1].getCols(), mc[1].getNonZeros()) : 1.0;
- ret[2] = (long) ( ret[0] * ret[1] * OptimizerUtils.getMatMultSparsity(sp1, sp2, ret[0], mc[0].getCols(), ret[1], true));
- }
-
- return ret;
- }
-
-
- public boolean isMatrixMultiply() {
- return ( this.innerOp == OpOp2.MULT && this.outerOp == AggOp.SUM );
- }
-
- /**
- * Outer product pattern: row vector (1 x n) times column vector (m x 1).
- * Fix: the original condition tested getInput().get(0).getDim1()==1 AND
- * getDim1()>1 on the same input (contradictory, hence always false);
- * the second check is corrected to getDim2()>1 for the row-vector input.
- */
- private boolean isOuterProduct() {
- Hop left = getInput().get(0);
- Hop right = getInput().get(1);
- if ( left.isVector() && right.isVector() ) {
- return left.getDim1() == 1 && left.getDim2() > 1 //row vector
- && right.getDim1() > 1 && right.getDim2() == 1; //column vector
- }
- return false;
- }
-
- @Override
- public boolean allowsAllExecTypes()
- {
- //binary aggregates can be scheduled to all backends (CP, MR, Spark)
- return true;
- }
-
- /**
- * Selects the execution type (CP vs MR/Spark) for this binary aggregate:
- * forced platform wins; otherwise memory-based selection (if enabled) or
- * a dimension-threshold heuristic; followed by a spark-specific refinement
- * and recompile marking for unknown dimensions.
- */
- @Override
- protected ExecType optFindExecType()
- throws HopsException
- {
- checkAndSetForcedPlatform();
-
- ExecType REMOTE = OptimizerUtils.isSparkExecutionMode() ? ExecType.SPARK : ExecType.MR;
-
- if( _etypeForced != null )
- {
- _etype = _etypeForced;
- }
- else
- {
- if ( OptimizerUtils.isMemoryBasedOptLevel() )
- {
- _etype = findExecTypeByMemEstimate();
- }
- // choose CP if the dimensions of both inputs are below Hops.CPThreshold
- // OR if it is vector-vector inner product
- else if ( (getInput().get(0).areDimsBelowThreshold() && getInput().get(1).areDimsBelowThreshold())
- || (getInput().get(0).isVector() && getInput().get(1).isVector() && !isOuterProduct()) )
- {
- _etype = ExecType.CP;
- }
- else
- {
- _etype = REMOTE;
- }
-
- //check for valid CP dimensions and matrix size
- checkAndSetInvalidCPDimsAndSize();
- }
-
- //spark-specific decision refinement (execute binary aggregate w/ left spark input and
- //single parent also in spark because it's likely cheap and reduces data transfer)
- if( _etype == ExecType.CP && _etypeForced != ExecType.CP
- && !(getInput().get(0) instanceof ReorgOp && ((ReorgOp)getInput().get(0)).getOp()==ReOrgOp.TRANSPOSE)
- && !(getInput().get(0) instanceof DataOp) //input is not checkpoint
- && getInput().get(0).getParent().size()==1 //bagg is only parent
- && !getInput().get(0).areDimsBelowThreshold()
- && getInput().get(0).optFindExecType() == ExecType.SPARK
- && getInput().get(0).getOutputMemEstimate()>getOutputMemEstimate() )
- {
- //pull binary aggregate into spark (large spark input, small output)
- _etype = ExecType.SPARK;
- }
-
- //mark for recompile (forever)
- if( OptimizerUtils.ALLOW_DYN_RECOMPILATION && !dimsKnown(true) && _etype==REMOTE ) {
- setRequiresRecompile();
- }
-
- return _etype;
- }
-
- /**
- * TSMM: Determine if XtX pattern applies for this aggbinary and if yes
- * which type.
- *
- * @return LEFT for t(X)%*%X, RIGHT for X%*%t(X), NONE otherwise.
- * NOTE: the two checks are intentionally sequential (not else-if);
- * if both patterns were to match, RIGHT takes precedence.
- */
- public MMTSJType checkTransposeSelf()
- {
- MMTSJType ret = MMTSJType.NONE;
-
- Hop in1 = getInput().get(0);
- Hop in2 = getInput().get(1);
-
- //t(X) %*% X, where the transpose's child is identical (same hop) to in2
- if( in1 instanceof ReorgOp
- && ((ReorgOp)in1).getOp() == ReOrgOp.TRANSPOSE
- && in1.getInput().get(0) == in2 )
- {
- ret = MMTSJType.LEFT;
- }
-
- //X %*% t(X), where the transpose's child is identical (same hop) to in1
- if( in2 instanceof ReorgOp
- && ((ReorgOp)in2).getOp() == ReOrgOp.TRANSPOSE
- && in2.getInput().get(0) == in1 )
- {
- ret = MMTSJType.RIGHT;
- }
-
- return ret;
- }
-
- /**
- * MapMultChain: Determine if XtwXv/XtXv pattern applies for this aggbinary
- * and if yes which type.
- *
- * @return XtwXv for t(X)%*%(w*(X%*%v)), XtXv for t(X)%*%(X%*%v),
- * NONE otherwise. Common-input detection is by hop identity (==).
- */
- public ChainType checkMapMultChain()
- {
- ChainType chainType = ChainType.NONE;
-
- Hop in1 = getInput().get(0);
- Hop in2 = getInput().get(1);
-
- //check for transpose left input (both chain types)
- if( in1 instanceof ReorgOp && ((ReorgOp)in1).getOp() == ReOrgOp.TRANSPOSE )
- {
- Hop X = in1.getInput().get(0);
-
- //check mapmultchain patterns
- //t(X)%*%(w*(X%*%v))
- if( in2 instanceof BinaryOp )
- {
- Hop in3b = in2.getInput().get(1);
- if( in3b instanceof AggBinaryOp )
- {
- Hop in4 = in3b.getInput().get(0);
- if( X == in4 ) //common input
- chainType = ChainType.XtwXv;
- }
- }
- //t(X)%*%(X%*%v)
- else if( in2 instanceof AggBinaryOp )
- {
- Hop in3 = in2.getInput().get(0);
- if( X == in3 ) //common input
- chainType = ChainType.XtXv;
- }
- }
-
- return chainType;
- }
-
- //////////////////////////
- // CP Lops generation
- /////////////////////////
-
- /**
- * Constructs the CP transpose-self matrix mult lop; the non-transpose
- * input (index 1 for tsmm-left, 0 for tsmm-right) is passed directly.
- *
- * @param mmtsj tsmm type (LEFT or RIGHT)
- * @throws HopsException
- * @throws LopsException
- */
- private void constructCPLopsTSMM( MMTSJType mmtsj )
- throws HopsException, LopsException
- {
- int k = OptimizerUtils.getConstrainedNumThreads(_maxNumThreads);
- Lop matmultCP = new MMTSJ(getInput().get(mmtsj.isLeft()?1:0).constructLops(),
- getDataType(), getValueType(), ExecType.CP, mmtsj, k);
-
- matmultCP.getOutputParameters().setDimensions(getDim1(), getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
- setLineNumbers( matmultCP );
- setLops(matmultCP);
- }
-
- /**
- * Constructs the CP mapmultchain lop for t(X)%*%(X%*%v) (XtXv) or
- * t(X)%*%(w*(X%*%v)) (XtwXv), extracting X, v, and optionally w from the
- * matched hop pattern (see checkMapMultChain).
- *
- * @param chain chain type (XtXv or XtwXv)
- * @throws LopsException
- * @throws HopsException
- */
- private void constructCPLopsMMChain( ChainType chain )
- throws LopsException, HopsException
- {
- MapMultChain mapmmchain = null;
- if( chain == ChainType.XtXv ) {
- Hop hX = getInput().get(0).getInput().get(0);
- Hop hv = getInput().get(1).getInput().get(1);
- mapmmchain = new MapMultChain( hX.constructLops(), hv.constructLops(), getDataType(), getValueType(), ExecType.CP);
- }
- else { //if( chainType == ChainType.XtwXv )
- Hop hX = getInput().get(0).getInput().get(0);
- Hop hw = getInput().get(1).getInput().get(0);
- Hop hv = getInput().get(1).getInput().get(1).getInput().get(1);
- mapmmchain = new MapMultChain( hX.constructLops(), hv.constructLops(), hw.constructLops(), getDataType(), getValueType(), ExecType.CP);
- }
-
- //set degree of parallelism
- int k = OptimizerUtils.getConstrainedNumThreads(_maxNumThreads);
- mapmmchain.setNumThreads( k );
-
- //set basic lop properties
- setOutputDimensions(mapmmchain);
- setLineNumbers(mapmmchain);
- setLops(mapmmchain);
- }
-
- /**
- * Constructs the CP permutation matrix mult lop, feeding the number of
- * rows of the permutation input as an auxiliary scalar lop.
- *
- * NOTE: exists for consistency since removeEmtpy might be scheduled to MR
- * but matrix mult on small output might be scheduled to CP. Hence, we
- * need to handle directly passed selection vectors in CP as well.
- *
- * @throws HopsException
- * @throws LopsException
- */
- private void constructCPLopsPMM()
- throws HopsException, LopsException
- {
- Hop pmInput = getInput().get(0);
- Hop rightInput = getInput().get(1);
-
- //auxiliary nrow(pmInput) scalar, removed from the dag again below
- Hop nrow = HopRewriteUtils.createValueHop(pmInput, true); //NROW
- HopRewriteUtils.setOutputBlocksizes(nrow, 0, 0);
- nrow.setForcedExecType(ExecType.CP);
- HopRewriteUtils.copyLineNumbers(this, nrow);
- Lop lnrow = nrow.constructLops();
-
- PMMJ pmm = new PMMJ(pmInput.constructLops(), rightInput.constructLops(), lnrow, getDataType(), getValueType(), false, false, ExecType.CP);
-
- //set degree of parallelism
- int k = OptimizerUtils.getConstrainedNumThreads(_maxNumThreads);
- pmm.setNumThreads(k);
-
- pmm.getOutputParameters().setDimensions(getDim1(), getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
- setLineNumbers(pmm);
-
- setLops(pmm);
-
- //cleanup: detach the temporary nrow hop from the dag
- HopRewriteUtils.removeChildReference(pmInput, nrow);
- }
-
- /**
- * Constructs the CP in-memory matrix mult lop, applying the
- * left-transpose rewrite t(X)%*%y -> t(t(y)%*%X) if beneficial.
- *
- * @throws HopsException
- * @throws LopsException
- */
- private void constructCPLopsMM()
- throws HopsException, LopsException
- {
- Lop matmultCP = null;
- if( isLeftTransposeRewriteApplicable(true, false) ) {
- matmultCP = constructCPLopsMMWithLeftTransposeRewrite();
- }
- else {
- int k = OptimizerUtils.getConstrainedNumThreads(_maxNumThreads);
- matmultCP = new Binary(getInput().get(0).constructLops(),getInput().get(1).constructLops(),
- Binary.OperationTypes.MATMULT, getDataType(), getValueType(), ExecType.CP, k);
- }
-
- setOutputDimensions(matmultCP);
- setLineNumbers( matmultCP );
- setLops(matmultCP);
- }
-
- /**
- * CP left-transpose rewrite: computes t(X)%*%Y as t(t(Y)%*%X), avoiding
- * the materialized transpose of the (large) left input X.
- *
- * @return the final transpose lop (output dimensions set by the caller)
- * @throws HopsException
- * @throws LopsException
- */
- private Lop constructCPLopsMMWithLeftTransposeRewrite()
- throws HopsException, LopsException
- {
- Hop X = getInput().get(0).getInput().get(0); //guaranteed to exists
- Hop Y = getInput().get(1);
-
- //right vector transpose
- Lop tY = new Transform(Y.constructLops(), OperationTypes.Transpose, getDataType(), getValueType(), ExecType.CP);
- tY.getOutputParameters().setDimensions(Y.getDim2(), Y.getDim1(), getRowsInBlock(), getColsInBlock(), Y.getNnz());
- setLineNumbers(tY);
-
- //matrix mult
- int k = OptimizerUtils.getConstrainedNumThreads(_maxNumThreads);
- Lop mult = new Binary(tY, X.constructLops(), Binary.OperationTypes.MATMULT, getDataType(), getValueType(), ExecType.CP, k);
- mult.getOutputParameters().setDimensions(Y.getDim2(), X.getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
- setLineNumbers(mult);
-
- //result transpose (dimensions set outside)
- Lop out = new Transform(mult, OperationTypes.Transpose, getDataType(), getValueType(), ExecType.CP);
-
- return out;
- }
-
- //////////////////////////
- // Spark Lops generation
- /////////////////////////
-
- /**
- * Constructs the Spark transpose-self matrix mult lop over the
- * non-transpose input (index 1 for tsmm-left, 0 for tsmm-right).
- *
- * @param mmtsj tsmm type (LEFT or RIGHT)
- * @throws HopsException
- * @throws LopsException
- */
- private void constructSparkLopsTSMM(MMTSJType mmtsj)
- throws HopsException, LopsException
- {
- Hop input = getInput().get(mmtsj.isLeft()?1:0);
- MMTSJ tsmm = new MMTSJ(input.constructLops(), getDataType(), getValueType(), ExecType.SPARK, mmtsj);
- setOutputDimensions(tsmm);
- setLineNumbers(tsmm);
- setLops(tsmm);
- }
-
- /**
- * Constructs the Spark broadcast-based mapmm lop (left or right variant),
- * applying the left-transpose rewrite if beneficial.
- *
- * @param method MAPMM_L or MAPMM_R (selects broadcast side)
- * @throws LopsException
- * @throws HopsException
- */
- private void constructSparkLopsMapMM(MMultMethod method)
- throws LopsException, HopsException
- {
- Lop mapmult = null;
- if( isLeftTransposeRewriteApplicable(false, false) )
- {
- mapmult = constructSparkLopsMapMMWithLeftTransposeRewrite();
- }
- else
- {
- // If number of columns is smaller than block size then explicit aggregation is not required.
- // i.e., entire matrix multiplication can be performed in the mappers.
- boolean needAgg = requiresAggregation(method);
- SparkAggType aggtype = getSparkMMAggregationType(needAgg);
- _outputEmptyBlocks = !OptimizerUtils.allowsToFilterEmptyBlockOutputs(this);
-
- //core matrix mult
- mapmult = new MapMult( getInput().get(0).constructLops(), getInput().get(1).constructLops(),
- getDataType(), getValueType(), (method==MMultMethod.MAPMM_R), false,
- _outputEmptyBlocks, aggtype);
- }
- setOutputDimensions(mapmult);
- setLineNumbers(mapmult);
- setLops(mapmult);
- }
-
- /**
- * Spark left-transpose rewrite for mapmm: computes t(X)%*%Y as
- * t(t(Y)%*%X), with the small transposes executed in CP and the core
- * mapmm in Spark.
- *
- * @return the final transpose lop (output dimensions set by the caller)
- * @throws HopsException
- * @throws LopsException
- */
- private Lop constructSparkLopsMapMMWithLeftTransposeRewrite()
- throws HopsException, LopsException
- {
- Hop X = getInput().get(0).getInput().get(0); //guaranteed to exists
- Hop Y = getInput().get(1);
-
- //right vector transpose
- Lop tY = new Transform(Y.constructLops(), OperationTypes.Transpose, getDataType(), getValueType(), ExecType.CP);
- tY.getOutputParameters().setDimensions(Y.getDim2(), Y.getDim1(), getRowsInBlock(), getColsInBlock(), Y.getNnz());
- setLineNumbers(tY);
-
- //matrix mult spark
- boolean needAgg = requiresAggregation(MMultMethod.MAPMM_R);
- SparkAggType aggtype = getSparkMMAggregationType(needAgg);
- _outputEmptyBlocks = !OptimizerUtils.allowsToFilterEmptyBlockOutputs(this);
-
- Lop mult = new MapMult( tY, X.constructLops(), getDataType(), getValueType(),
- false, false, _outputEmptyBlocks, aggtype);
- mult.getOutputParameters().setDimensions(Y.getDim2(), X.getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
- setLineNumbers(mult);
-
- //result transpose (dimensions set outside)
- Lop out = new Transform(mult, OperationTypes.Transpose, getDataType(), getValueType(), ExecType.CP);
-
- return out;
- }
-
- /**
- * Constructs the Spark mapmultchain lop for the XtXv/XtwXv patterns,
- * extracting X, v, and optionally w from the matched hop pattern
- * (see checkMapMultChain).
- *
- * @param chain chain type (XtXv or XtwXv)
- * @throws LopsException
- * @throws HopsException
- */
- private void constructSparkLopsMapMMChain(ChainType chain)
- throws LopsException, HopsException
- {
- MapMultChain mapmmchain = null;
- if( chain == ChainType.XtXv ) {
- Hop hX = getInput().get(0).getInput().get(0);
- Hop hv = getInput().get(1).getInput().get(1);
- mapmmchain = new MapMultChain( hX.constructLops(), hv.constructLops(), getDataType(), getValueType(), ExecType.SPARK);
- }
- else { //if( chainType == ChainType.XtwXv )
- Hop hX = getInput().get(0).getInput().get(0);
- Hop hw = getInput().get(1).getInput().get(0);
- Hop hv = getInput().get(1).getInput().get(1).getInput().get(1);
- mapmmchain = new MapMultChain( hX.constructLops(), hv.constructLops(), hw.constructLops(), getDataType(), getValueType(), ExecType.SPARK);
- }
- setOutputDimensions(mapmmchain);
- setLineNumbers(mapmmchain);
- setLops(mapmmchain);
- }
-
- /**
- * Constructs the Spark partitioned mapmm lop over both inputs.
- *
- * @throws LopsException
- * @throws HopsException
- */
- private void constructSparkLopsPMapMM()
- throws LopsException, HopsException
- {
- PMapMult pmapmult = new PMapMult(
- getInput().get(0).constructLops(),
- getInput().get(1).constructLops(),
- getDataType(), getValueType() );
- setOutputDimensions(pmapmult);
- setLineNumbers(pmapmult);
- setLops(pmapmult);
- }
-
- /**
- * Constructs the Spark cross-product matrix mult (cpmm) lop, applying the
- * left-transpose rewrite if beneficial; cpmm always requires aggregation.
- *
- * @throws HopsException
- * @throws LopsException
- */
- private void constructSparkLopsCPMM()
- throws HopsException, LopsException
- {
- if( isLeftTransposeRewriteApplicable(false, false) )
- {
- setLops( constructSparkLopsCPMMWithLeftTransposeRewrite() );
- }
- else
- {
- SparkAggType aggtype = getSparkMMAggregationType(true);
-
- Lop cpmm = new MMCJ(getInput().get(0).constructLops(), getInput().get(1).constructLops(),
- getDataType(), getValueType(), aggtype, ExecType.SPARK);
- setOutputDimensions( cpmm );
- setLineNumbers( cpmm );
- setLops( cpmm );
- }
- }
-
-
- /**
- * Spark left-transpose rewrite for cpmm: computes t(X)%*%Y as
- * t(t(Y)%*%X), with the small transposes executed in CP and the core
- * cpmm in Spark.
- *
- * @return the final transpose lop
- * @throws HopsException
- * @throws LopsException
- */
- private Lop constructSparkLopsCPMMWithLeftTransposeRewrite()
- throws HopsException, LopsException
- {
- SparkAggType aggtype = getSparkMMAggregationType(true);
-
- Hop X = getInput().get(0).getInput().get(0); //guaranteed to exists
- Hop Y = getInput().get(1);
-
- //right vector transpose CP
- Lop tY = new Transform(Y.constructLops(), OperationTypes.Transpose, getDataType(), getValueType(), ExecType.CP);
- tY.getOutputParameters().setDimensions(Y.getDim2(), Y.getDim1(), getRowsInBlock(), getColsInBlock(), Y.getNnz());
- setLineNumbers(tY);
-
- //matrix multiply
- MMCJ mmcj = new MMCJ(tY, X.constructLops(), getDataType(), getValueType(), aggtype, ExecType.SPARK);
- mmcj.getOutputParameters().setDimensions(getDim1(), getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
- setLineNumbers(mmcj);
-
- //result transpose CP
- Lop out = new Transform(mmcj, OperationTypes.Transpose, getDataType(), getValueType(), ExecType.CP);
- out.getOutputParameters().setDimensions(X.getDim2(), Y.getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
-
- return out;
- }
-
- /**
- * Constructs the Spark replication-based matrix mult (rmm) lop over both
- * inputs. (Javadoc fixed: this method takes no chain parameter.)
- *
- * @throws LopsException
- * @throws HopsException
- */
- private void constructSparkLopsRMM()
- throws LopsException, HopsException
- {
- Lop rmm = new MMRJ(getInput().get(0).constructLops(),getInput().get(1).constructLops(),
- getDataType(), getValueType(), ExecType.SPARK);
- setOutputDimensions(rmm);
- setLineNumbers( rmm );
- setLops(rmm);
- }
-
- /**
- * Constructs the Spark permutation matrix mult lop. If the left input is
- * a full permutation matrix (not a vector), it is first condensed to a
- * vector of target row positions via rowMaxIndex(t(pm)) * rowMax(t(pm)).
- *
- * @throws HopsException
- * @throws LopsException
- */
- private void constructSparkLopsPMM()
- throws HopsException, LopsException
- {
- //PMM has two potential modes (a) w/ full permutation matrix input, and
- //(b) w/ already condensed input vector of target row positions.
-
- Hop pmInput = getInput().get(0);
- Hop rightInput = getInput().get(1);
- long brlen = pmInput.getRowsInBlock();
- long bclen = pmInput.getColsInBlock();
-
- //a) full permutation matrix input (potentially without empty block materialized)
- Lop lpmInput = pmInput.constructLops();
- if( pmInput.getDim2() != 1 ) //not a vector
- {
- //compute condensed permutation matrix vector input
- //v = rowMaxIndex(t(pm)) * rowMax(t(pm))
- ReorgOp transpose = HopRewriteUtils.createTranspose(pmInput);
- transpose.setForcedExecType(ExecType.SPARK);
- HopRewriteUtils.copyLineNumbers(this, transpose);
-
- AggUnaryOp agg1 = new AggUnaryOp("tmp2a", DataType.MATRIX, ValueType.DOUBLE, AggOp.MAXINDEX, Direction.Row, transpose);
- HopRewriteUtils.setOutputBlocksizes(agg1, brlen, bclen);
- agg1.refreshSizeInformation();
- agg1.setForcedExecType(ExecType.SPARK);
- HopRewriteUtils.copyLineNumbers(this, agg1);
-
- AggUnaryOp agg2 = new AggUnaryOp("tmp2b", DataType.MATRIX, ValueType.DOUBLE, AggOp.MAX, Direction.Row, transpose);
- HopRewriteUtils.setOutputBlocksizes(agg2, brlen, bclen);
- agg2.refreshSizeInformation();
- agg2.setForcedExecType(ExecType.SPARK);
- HopRewriteUtils.copyLineNumbers(this, agg2);
-
- BinaryOp mult = new BinaryOp("tmp3", DataType.MATRIX, ValueType.DOUBLE, OpOp2.MULT, agg1, agg2);
- HopRewriteUtils.setOutputBlocksizes(mult, brlen, bclen);
- mult.refreshSizeInformation();
- mult.setForcedExecType(ExecType.SPARK);
- //mult.computeMemEstimate(memo); //select exec type
- HopRewriteUtils.copyLineNumbers(this, mult);
-
- lpmInput = mult.constructLops();
-
- //cleanup: detach the temporary transpose subdag
- HopRewriteUtils.removeChildReference(pmInput, transpose);
- }
-
- //b) condensed permutation matrix vector input (target rows)
- //auxiliary nrow(pmInput) scalar, removed from the dag again below
- Hop nrow = HopRewriteUtils.createValueHop(pmInput, true); //NROW
- HopRewriteUtils.setOutputBlocksizes(nrow, 0, 0);
- nrow.setForcedExecType(ExecType.CP);
- HopRewriteUtils.copyLineNumbers(this, nrow);
- Lop lnrow = nrow.constructLops();
-
- _outputEmptyBlocks = !OptimizerUtils.allowsToFilterEmptyBlockOutputs(this);
- PMMJ pmm = new PMMJ(lpmInput, rightInput.constructLops(), lnrow, getDataType(), getValueType(), false, _outputEmptyBlocks, ExecType.SPARK);
- setOutputDimensions(pmm);
- setLineNumbers(pmm);
- setLops(pmm);
-
- HopRewriteUtils.removeChildReference(pmInput, nrow);
- }
-
- /**
- * Constructs the Spark zip matrix mult lop for t(X)%*%y, joining X and y
- * on their original indexes and partitioning.
- *
- * @throws HopsException
- * @throws LopsException
- */
- private void constructSparkLopsZIPMM()
- throws HopsException, LopsException
- {
- //zipmm applies to t(X)%*%y if ncol(X)<=blocksize and it prevents
- //unnecessary reshuffling by keeping the original indexes (and partitioning)
- //joining the datasets, and internally doing the necessary transpose operations
-
- Hop left = getInput().get(0).getInput().get(0); //x out of t(X)
- Hop right = getInput().get(1); //y
-
- //determine left-transpose rewrite beneficial
- //(transpose the smaller side: rewrite iff left is at least as large as right)
- boolean tRewrite = (left.getDim1()*left.getDim2() >= right.getDim1()*right.getDim2());
-
- Lop zipmm = new MMZip(left.constructLops(), right.constructLops(), getDataType(), getValueType(), tRewrite, ExecType.SPARK);
- setOutputDimensions(zipmm);
- setLineNumbers( zipmm );
- setLops(zipmm);
- }
-
- //////////////////////////
- // MR Lops generation
- /////////////////////////
-
-
- /**
- *
- * @param method
- * @throws HopsException
- * @throws LopsException
- */
- private void constructMRLopsMapMM(MMultMethod method)
- throws HopsException, LopsException
- {
- if( method == MMultMethod.MAPMM_R && isLeftTransposeRewriteApplicable(false, true) )
- {
- setLops( constructMRLopsMapMMWithLeftTransposeRewrite() );
- }
- else //GENERAL CASE
- {
- // If number of columns is smaller than block size then explicit aggregation is not required.
- // i.e., entire matrix multiplication can be performed in the mappers.
- boolean needAgg = requiresAggregation(method);
- boolean needPart = requiresPartitioning(method, false);
- _outputEmptyBlocks = !OptimizerUtils.allowsToFilterEmptyBlockOutputs(this);
-
- //pre partitioning
- Lop leftInput = getInput().get(0).constructLops();
- Lop rightInput = getInput().get(1).constructLops();
- if( needPart ) {
- if( (method==MMultMethod.MAPMM_L) ) //left in distributed cache
- {
- Hop input = getInput().get(0);
- ExecType etPart = (OptimizerUtils.estimateSizeExactSparsity(input.getDim1(), input.getDim2(), OptimizerUtils.getSparsity(input.getDim1(), input.getDim2(), input.getNnz()))
- < OptimizerUtils.getLocalMemBudget()) ? ExecType.CP : ExecType.MR; //operator selection
- leftInput = new DataPartition(input.constructLops(), DataType.MATRIX, ValueType.DOUBLE, etPart, PDataPartitionFormat.COLUMN_BLOCK_WISE_N);
- leftInput.getOutputParameters().setDimensions(input.getDim1(), input.getDim2(), getRowsInBlock(), getColsInBlock(), input.getNnz());
- setLineNumbers(leftInput);
- }
- else //right side in distributed cache
- {
- Hop input = getInput().get(1);
- ExecType etPart = (OptimizerUtils.estimateSizeExactSparsity(input.getDim1(), input.getDim2(), OptimizerUtils.getSparsity(input.getDim1(), input.getDim2(), input.getNnz()))
- < OptimizerUtils.getLocalMemBudget()) ? ExecType.CP : ExecType.MR; //operator selection
- rightInput = new DataPartition(input.constructLops(), DataType.MATRIX, ValueType.DOUBLE, etPart, PDataPartitionFormat.ROW_BLOCK_WISE_N);
- rightInput.getOutputParameters().setDimensions(input.getDim1(), input.getDim2(), getRowsInBlock(), getColsInBlock(), input.getNnz());
- setLineNumbers(rightInput);
- }
- }
-
- //core matrix mult
- MapMult mapmult = new MapMult( leftInput, rightInput, getDataType(), getValueType(),
- (method==MMultMethod.MAPMM_R), needPart, _outputEmptyBlocks);
- mapmult.getOutputParameters().setDimensions(getDim1(), getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
- setLineNumbers(mapmult);
-
- //post aggregation
- if (needAgg) {
- Group grp = new Group(mapmult, Group.OperationTypes.Sort, getDataType(), getValueType());
- Aggregate agg1 = new Aggregate(grp, HopsAgg2Lops.get(outerOp), getDataType(), getValueType(), ExecType.MR);
-
- grp.getOutputParameters().setDimensions(getDim1(), getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
- agg1.getOutputParameters().setDimensions(getDim1(), getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
- setLineNumbers(agg1);
-
- // aggregation uses kahanSum but the inputs do not have correction values
- agg1.setupCorrectionLocation(CorrectionLocationType.NONE);
-
- setLops(agg1);
- }
- else {
- setLops(mapmult);
- }
- }
- }
-
-
- /**
- *
- * @return
- * @throws HopsException
- * @throws LopsException
- */
- private Lop constructMRLopsMapMMWithLeftTransposeRewrite()
- throws HopsException, LopsException
- {
- Hop X = getInput().get(0).getInput().get(0); //guaranteed to exists
- Hop Y = getInput().get(1);
-
- //right vector transpose CP
- Lop tY = new Transform(Y.constructLops(), OperationTypes.Transpose, getDataType(), getValueType(), ExecType.CP);
- tY.getOutputParameters().setDimensions(Y.getDim2(), Y.getDim1(), getRowsInBlock(), getColsInBlock(), Y.getNnz());
- setLineNumbers(tY);
-
- //matrix mult
-
- // If number of columns is smaller than block size then explicit aggregation is not required.
- // i.e., entire matrix multiplication can be performed in the mappers.
- boolean needAgg = ( X.getDim1() <= 0 || X.getDim1() > X.getRowsInBlock() );
- boolean needPart = requiresPartitioning(MMultMethod.MAPMM_R, true); //R disregarding transpose rewrite
-
- //pre partitioning
- Lop dcinput = null;
- if( needPart ) {
- ExecType etPart = (OptimizerUtils.estimateSizeExactSparsity(Y.getDim2(), Y.getDim1(), OptimizerUtils.getSparsity(Y.getDim2(), Y.getDim1(), Y.getNnz()))
- < OptimizerUtils.getLocalMemBudget()) ? ExecType.CP : ExecType.MR; //operator selection
- dcinput = new DataPartition(tY, DataType.MATRIX, ValueType.DOUBLE, etPart, PDataPartitionFormat.COLUMN_BLOCK_WISE_N);
- dcinput.getOutputParameters().setDimensions(Y.getDim2(), Y.getDim1(), getRowsInBlock(), getColsInBlock(), Y.getNnz());
- setLineNumbers(dcinput);
- }
- else
- dcinput = tY;
-
- MapMult mapmult = new MapMult(dcinput, X.constructLops(), getDataType(), getValueType(), false, needPart, false);
- mapmult.getOutputParameters().setDimensions(Y.getDim2(), X.getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
- setLineNumbers(mapmult);
-
- //post aggregation
- Lop mult = null;
- if( needAgg ) {
- Group grp = new Group(mapmult, Group.OperationTypes.Sort, getDataType(), getValueType());
- grp.getOutputParameters().setDimensions(Y.getDim2(), X.getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
- setLineNumbers(grp);
-
- Aggregate agg1 = new Aggregate(grp, HopsAgg2Lops.get(outerOp), getDataType(), getValueType(), ExecType.MR);
- agg1.getOutputParameters().setDimensions(Y.getDim2(), X.getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
- setLineNumbers(agg1);
- agg1.setupCorrectionLocation(CorrectionLocationType.NONE);
- mult = agg1;
- }
- else
- mult = mapmult;
-
- //result transpose CP
- Lop out = new Transform(mult, OperationTypes.Transpose, getDataType(), getValueType(), ExecType.CP);
- out.getOutputParameters().setDimensions(X.getDim2(), Y.getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
-
- return out;
- }
-
-
- /**
- *
- * @param chainType
- * @throws HopsException
- * @throws LopsException
- */
- private void constructMRLopsMapMMChain( ChainType chainType )
- throws HopsException, LopsException
- {
- Lop mapmult = null;
-
- if( chainType == ChainType.XtXv )
- {
- //v never needs partitioning because always single block
- Hop hX = getInput().get(0).getInput().get(0);
- Hop hv = getInput().get(1).getInput().get(1);
-
- //core matrix mult
- mapmult = new MapMultChain( hX.constructLops(), hv.constructLops(), getDataType(), getValueType(), ExecType.MR);
- mapmult.getOutputParameters().setDimensions(getDim1(), getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
- setLineNumbers(mapmult);
- }
- else //if( chainType == ChainType.XtwXv )
- {
- //v never needs partitioning because always single block
- Hop hX = getInput().get(0).getInput().get(0);
- Hop hw = getInput().get(1).getInput().get(0);
- Hop hv = getInput().get(1).getInput().get(1).getInput().get(1);
-
- double mestW = OptimizerUtils.estimateSize(hw.getDim1(), hw.getDim2());
- boolean needPart = !hw.dimsKnown() || hw.getDim1() * hw.getDim2() > DistributedCacheInput.PARTITION_SIZE;
- Lop X = hX.constructLops(), v = hv.constructLops(), w = null;
- if( needPart ){ //requires partitioning
- w = new DataPartition(hw.constructLops(), DataType.MATRIX, ValueType.DOUBLE, (mestW>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N);
- w.getOutputParameters().setDimensions(hw.getDim1(), hw.getDim2(), getRowsInBlock(), getColsInBlock(), hw.getNnz());
- setLineNumbers(w);
- }
- else
- w = hw.constructLops();
-
- //core matrix mult
- mapmult = new MapMultChain( X, v, w, getDataType(), getValueType(), ExecType.MR);
- mapmult.getOutputParameters().setDimensions(getDim1(), getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
- setLineNumbers(mapmult);
- }
-
- //post aggregation
- Group grp = new Group(mapmult, Group.OperationTypes.Sort, getDataType(), getValueType());
- grp.getOutputParameters().setDimensions(getDim1(), getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
- Aggregate agg1 = new Aggregate(grp, HopsAgg2Lops.get(outerOp), getDataType(), getValueType(), ExecType.MR);
- agg1.getOutputParameters().setDimensions(getDim1(), getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
- agg1.setupCorrectionLocation(CorrectionLocationType.NONE); // aggregation uses kahanSum
- setLineNumbers(agg1);
-
- setLops(agg1);
- }
-
- /**
- *
- * @throws HopsException
- * @throws LopsException
- */
- private void constructMRLopsCPMM()
- throws HopsException, LopsException
- {
- if( isLeftTransposeRewriteApplicable(false, false) )
- {
- setLops( constructMRLopsCPMMWithLeftTransposeRewrite() );
- }
- else //general case
- {
- Hop X = getInput().get(0);
- Hop Y = getInput().get(1);
-
- MMCJType type = getMMCJAggregationType(X, Y);
- MMCJ mmcj = new MMCJ(X.constructLops(), Y.constructLops(), getDataType(), getValueType(), type, ExecType.MR);
- setOutputDimensions(mmcj);
- setLineNumbers(mmcj);
-
- Group grp = new Group(mmcj, Group.OperationTypes.Sort, getDataType(), getValueType());
- setOutputDimensions(grp);
- setLineNumbers(grp);
-
- Aggregate agg1 = new Aggregate(grp, HopsAgg2Lops.get(outerOp), getDataType(), getValueType(), ExecType.MR);
- setOutputDimensions(agg1);
- setLineNumbers(agg1);
-
- // aggregation uses kahanSum but the inputs do not have correction values
- agg1.setupCorrectionLocation(CorrectionLocationType.NONE);
-
- setLops(agg1);
- }
- }
-
- /**
- *
- * @return
- * @throws HopsException
- * @throws LopsException
- */
- private Lop constructMRLopsCPMMWithLeftTransposeRewrite()
- throws HopsException, LopsException
- {
- Hop X = getInput().get(0).getInput().get(0); //guaranteed to exists
- Hop Y = getInput().get(1);
-
- //right vector transpose CP
- Lop tY = new Transform(Y.constructLops(), OperationTypes.Transpose, getDataType(), getValueType(), ExecType.CP);
- tY.getOutputParameters().setDimensions(Y.getDim2(), Y.getDim1(), getRowsInBlock(), getColsInBlock(), Y.getNnz());
- setLineNumbers(tY);
-
- //matrix multiply
- MMCJType type = getMMCJAggregationType(X, Y);
- MMCJ mmcj = new MMCJ(tY, X.constructLops(), getDataType(), getValueType(), type, ExecType.MR);
- setOutputDimensions(mmcj);
- setLineNumbers(mmcj);
-
- Group grp = new Group(mmcj, Group.OperationTypes.Sort, getDataType(), getValueType());
- setOutputDimensions(grp);
- setLineNumbers(grp);
-
- Aggregate agg1 = new Aggregate(grp, HopsAgg2Lops.get(outerOp), getDataType(), getValueType(), ExecType.MR);
- setOutputDimensions(agg1);
- setLineNumbers(agg1);
-
- // aggregation uses kahanSum but the inputs do not have correction values
- agg1.setupCorrectionLocation(CorrectionLocationType.NONE);
-
-
- //result transpose CP
- Lop out = new Transform(agg1, OperationTypes.Transpose, getDataType(), getValueType(), ExecType.CP);
- out.getOutputParameters().setDimensions(X.getDim2(), Y.getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
-
- return out;
- }
-
- /**
- *
- * @throws HopsException
- * @throws LopsException
- */
- private void constructMRLopsRMM()
- throws HopsException, LopsException
- {
- MMRJ rmm = new MMRJ(getInput().get(0).constructLops(), getInput().get(1).constructLops(),
- getDataType(), getValueType(), ExecType.MR);
-
- setOutputDimensions(rmm);
- setLineNumbers(rmm);
- setLops(rmm);
- }
-
- /**
- *
- * @param mmtsj
- * @throws HopsException
- * @throws LopsException
- */
- private void constructMRLopsTSMM(MMTSJType mmtsj)
- throws HopsException, LopsException
- {
- Hop input = getInput().get(mmtsj.isLeft()?1:0);
-
- MMTSJ tsmm = new MMTSJ(input.constructLops(), getDataType(), getValueType(), ExecType.MR, mmtsj);
- tsmm.getOutputParameters().setDimensions(getDim1(), getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
- setLineNumbers(tsmm);
-
- Aggregate agg1 = new Aggregate(tsmm, HopsAgg2Lops.get(outerOp), getDataType(), getValueType(), ExecType.MR);
- agg1.getOutputParameters().setDimensions(getDim1(), getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
- agg1.setupCorrectionLocation(CorrectionLocationType.NONE); // aggregation uses kahanSum but the inputs do not have correction values
- setLineNumbers(agg1);
-
- setLops(agg1);
- }
-
- /**
- *
- * @throws HopsException
- * @throws LopsException
- */
- private void constructMRLopsPMM()
- throws HopsException, LopsException
- {
- //PMM has two potential modes (a) w/ full permutation matrix input, and
- //(b) w/ already condensed input vector of target row positions.
-
- Hop pmInput = getInput().get(0);
- Hop rightInput = getInput().get(1);
- long brlen = pmInput.getRowsInBlock();
- long bclen = pmInput.getColsInBlock();
-
- //a) full permutation matrix input (potentially without empty block materialized)
- Lop lpmInput = pmInput.constructLops();
- if( pmInput.getDim2() != 1 ) //not a vector
- {
- //compute condensed permutation matrix vector input
- //v = rowMaxIndex(t(pm)) * rowMax(t(pm))
- ReorgOp transpose = HopRewriteUtils.createTranspose(pmInput);
- transpose.setForcedExecType(ExecType.MR);
- HopRewriteUtils.copyLineNumbers(this, transpose);
-
- AggUnaryOp agg1 = new AggUnaryOp("tmp2a", DataType.MATRIX, ValueType.DOUBLE, AggOp.MAXINDEX, Direction.Row, transpose);
- HopRewriteUtils.setOutputBlocksizes(agg1, brlen, bclen);
- agg1.refreshSizeInformation();
- agg1.setForcedExecType(ExecType.MR);
- HopRewriteUtils.copyLineNumbers(this, agg1);
-
- AggUnaryOp agg2 = new AggUnaryOp("tmp2b", DataType.MATRIX, ValueType.DOUBLE, AggOp.MAX, Direction.Row, transpose);
- HopRewriteUtils.setOutputBlocksizes(agg2, brlen, bclen);
- agg2.refreshSizeInformation();
- agg2.setForcedExecType(ExecType.MR);
- HopRewriteUtils.copyLineNumbers(this, agg2);
-
- BinaryOp mult = new BinaryOp("tmp3", DataType.MATRIX, ValueType.DOUBLE, OpOp2.MULT, agg1, agg2);
- HopRewriteUtils.setOutputBlocksizes(mult, brlen, bclen);
- mult.refreshSizeInformation();
- mult.setForcedExecType(ExecType.MR);
- //mult.computeMemEstimate(memo); //select exec type
- HopRewriteUtils.copyLineNumbers(this, mult);
-
- lpmInput = mult.constructLops();
-
- HopRewriteUtils.removeChildReference(pmInput, transpose);
- }
-
- //b) condensed permutation matrix vector input (target rows)
- Hop nrow = HopRewriteUtils.createValueHop(pmInput, true); //NROW
- HopRewriteUtils.setOutputBlocksizes(nrow, 0, 0);
- nrow.setForcedExecType(ExecType.CP);
- HopRewriteUtils.copyLineNumbers(this, nrow);
- Lop lnrow = nrow.constructLops();
-
- boolean needPart = !pmInput.dimsKnown() || pmInput.getDim1() > DistributedCacheInput.PARTITION_SIZE;
- double mestPM = OptimizerUtils.estimateSize(pmInput.getDim1(), 1);
- if( needPart ){ //requires partitioning
- lpmInput = new DataPartition(lpmInput, DataType.MATRIX, ValueType.DOUBLE, (mestPM>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N);
- lpmInput.getOutputParameters().setDimensions(pmInput.getDim1(), 1, getRowsInBlock(), getColsInBlock(), pmInput.getDim1());
- setLineNumbers(lpmInput);
- }
-
- _outputEmptyBlocks = !OptimizerUtils.allowsToFilterEmptyBlockOutputs(this);
- PMMJ pmm = new PMMJ(lpmInput, rightInput.constructLops(), lnrow, getDataType(), getValueType(), needPart, _outputEmptyBlocks, ExecType.MR);
- pmm.getOutputParameters().setDimensions(getDim1(), getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
- setLineNumbers(pmm);
-
- Aggregate aggregate = new Aggregate(pmm, HopsAgg2Lops.get(outerOp), getDataType(), getValueType(), ExecType.MR);
- aggregate.getOutputParameters().setDimensions(getDim1(), getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
- aggregate.setupCorrectionLocation(CorrectionLocationType.NONE); // aggregation uses kahanSum but the inputs do not have correction values
- setLineNumbers(aggregate);
-
- setLops(aggregate);
-
- HopRewriteUtils.removeChildReference(pmInput, nrow);
- }
-
-
-
- /**
- * Determines if the rewrite t(X)%*%Y -> t(t(Y)%*%X) is applicable
- * and cost effective. Whenever X is a wide matrix and Y is a vector
- * this has huge impact, because the transpose of X would dominate
- * the entire operation costs.
- *
- * @return
- */
- private boolean isLeftTransposeRewriteApplicable(boolean CP, boolean checkMemMR)
- {
- //check for forced MR or Spark execution modes, which prevent the introduction of
- //additional CP operations and hence the rewrite application
- if( DMLScript.rtplatform == RUNTIME_PLATFORM.HADOOP //not hybrid_mr
- || DMLScript.rtplatform == RUNTIME_PLATFORM.SPARK ) //not hybrid_spark
- {
- return false;
- }
-
- boolean ret = false;
- Hop h1 = getInput().get(0);
- Hop h2 = getInput().get(1);
-
- //check for known dimensions and cost for t(X) vs t(v) + t(tvX)
- //(for both CP/MR, we explicitly check that new transposes fit in memory,
- //even a ba in CP does not imply that both transposes can be executed in CP)
- if( CP ) //in-memory ba
- {
- if( h1 instanceof ReorgOp && ((ReorgOp)h1).getOp()==ReOrgOp.TRANSPOSE )
- {
- long m = h1.getDim1();
- long cd = h1.getDim2();
- long n = h2.getDim2();
-
- //check for known dimensions (necessary condition for subsequent checks)
- ret = (m>0 && cd>0 && n>0);
-
- //check operation memory with changed transpose (this is important if we have
- //e.g., t(X) %*% v, where X is sparse and tX fits in memory but X does not
- double memX = h1.getInput().get(0).getOutputMemEstimate();
- double memtv = OptimizerUtils.estimateSizeExactSparsity(n, cd, 1.0);
- double memtXv = OptimizerUtils.estimateSizeExactSparsity(n, m, 1.0);
- double newMemEstimate = memtv + memX + memtXv;
- ret &= ( newMemEstimate < OptimizerUtils.getLocalMemBudget() );
-
- //check for cost benefit of t(X) vs t(v) + t(tvX) and memory of additional transpose ops
- ret &= ( m*cd > (cd*n + m*n) &&
- 2 * OptimizerUtils.estimateSizeExactSparsity(cd, n, 1.0) < OptimizerUtils.getLocalMemBudget() &&
- 2 * OptimizerUtils.estimateSizeExactSparsity(m, n, 1.0) < OptimizerUtils.getLocalMemBudget() );
-
- //update operation memory estimate (e.g., for parfor optimizer)
- if( ret )
- _memEstimate = newMemEstimate;
- }
- }
- else //MR
- {
- if( h1 instanceof ReorgOp && ((ReorgOp)h1).getOp()==ReOrgOp.TRANSPOSE )
- {
- long m = h1.getDim1();
- long cd = h1.getDim2();
- long n = h2.getDim2();
-
-
- //note: output size constraint for mapmult already checked by optfindmmultmethod
- if( m>0 && cd>0 && n>0 && (m*cd > (cd*n + m*n)) &&
- 2 * OptimizerUtils.estimateSizeExactSparsity(cd, n, 1.0) < OptimizerUtils.getLocalMemBudget() &&
- 2 * OptimizerUtils.estimateSizeExactSparsity(m, n, 1.0) < OptimizerUtils.getLocalMemBudget() &&
- (!checkMemMR || OptimizerUtils.estimateSizeExactSparsity(cd, n, 1.0) < OptimizerUtils.getRemoteMemBudgetMap(true)) )
- {
- ret = true;
- }
- }
- }
-
- return ret;
- }
-
- /**
- *
- * @param X
- * @param Y
- * @return
- */
- private MMCJType getMMCJAggregationType(Hop X, Hop Y)
- {
- //choose quickpath (no aggregation) if the aggregation buffer likely has to spill and the smaller block fits
- //into the minimal cache size and hence is guaranteed not to require spilling
- double sizeX = OptimizerUtils.estimateSize(X.getDim1(), Math.min(X.getDim2(), X.getColsInBlock()));
- double sizeY = OptimizerUtils.estimateSize(Math.min(Y.getDim1(), Y.getRowsInBlock()), Y.getDim2());
-
- return (dimsKnown() && 2*OptimizerUtils.estimateSize(getDim1(), getDim2())>OptimizerUtils.getRemoteMemBudgetReduce()
- && ( sizeX < MMCJMRReducerWithAggregator.MIN_CACHE_SIZE
- || sizeY < MMCJMRReducerWithAggregator.MIN_CACHE_SIZE) )
- ? MMCJType.NO_AGG : MMCJType.AGG ;
- }
-
- /**
- *
- * @param agg
- * @return
- */
- private SparkAggType getSparkMMAggregationType( boolean agg )
- {
- if( !agg )
- return SparkAggType.NONE;
-
- if( dimsKnown() && getDim1()<=getRowsInBlock() && getDim2()<=getColsInBlock() )
- return SparkAggType.SINGLE_BLOCK;
- else
- return SparkAggType.MULTI_BLOCK;
- }
-
- /**
- *
- * @param method
- * @return
- */
- private boolean requiresAggregation(MMultMethod method)
- {
- //worst-case assumption (for plan correctness)
- boolean ret = true;
-
- //right side cached (no agg if left has just one column block)
- if( method == MMultMethod.MAPMM_R && getInput().get(0).getDim2() > 0 //known num columns
- && getInput().get(0).getDim2() <= getInput().get(0).getColsInBlock() )
- {
- ret = false;
- }
-
- //left side cached (no agg if right has just one row block)
- if( method == MMultMethod.MAPMM_L && getInput().get(1).getDim1() > 0 //known num rows
- && getInput().get(1).getDim1() <= getInput().get(1).getRowsInBlock() )
- {
- ret = false;
- }
-
- return ret;
- }
-
- /**
- *
- * @param method
- * @return
- */
- private boolean requiresPartitioning(MMultMethod method, boolean rewrite)
- {
- boolean ret = true; //worst-case
- Hop input1 = getInput().get(0);
- Hop input2 = getInput().get(1);
-
- //right side cached
- if( method == MMultMethod.MAPMM_R && input2.dimsKnown() ) { //known input size
- ret = (input2.getDim1()*input2.getDim2() > DistributedCacheInput.PARTITION_SIZE);
-
- //conservative: do not apply partitioning if this forces t(X) into separate job
- //if( !rewrite && input1 instanceof ReorgOp && ((ReorgOp)input1).getOp()==ReOrgOp.TRANSPOSE )
- // ret = false;
- }
-
- //left side cached (no agg if right has just one row block)
- if( method == MMultMethod.MAPMM_L && input1.dimsKnown() ) { //known input size
- ret = (input1.getDim1()*input1.getDim2() > DistributedCacheInput.PARTITION_SIZE);
-
- //conservative: do not apply partitioning if this forces t(X) into separate job
- //if( !rewrite && input2 instanceof ReorgOp && ((ReorgOp)input2).getOp()==ReOrgOp.TRANSPOSE )
- // ret = false;
- }
-
- return ret;
- }
-
- /**
- * Estimates the memory footprint of MapMult operation depending on which input is put into distributed cache.
- * This function is called by <code>optFindMMultMethod()</code> to decide the execution strategy, as well as by
- * piggybacking to decide the number of Map-side instructions to put into a single GMR job.
- */
- public static double getMapmmMemEstimate(long m1_rows, long m1_cols, long m1_rpb, long m1_cpb, long m1_nnz,
- long m2_rows, long m2_cols, long m2_rpb, long m2_cpb, long m2_nnz, int cachedInputIndex, boolean pmm)
- {
- // If the size of one input is small, choose a method that uses distributed cache
- // NOTE: be aware of output size because one input block might generate many output blocks
- double m1SizeP = OptimizerUtils.estimatePartitionedSizeExactSparsity(m1_rows, m1_cols, m1_rpb, m1_cpb, m1_nnz); //m1 partitioned
- double m2SizeP = OptimizerUtils.estimatePartitionedSizeExactSparsity(m2_rows, m2_cols, m2_rpb, m2_cpb, m2_nnz); //m2 partitioned
-
- double m1BlockSize = OptimizerUtils.estimateSize(Math.min(m1_rows, m1_rpb), Math.min(m1_cols, m1_cpb));
- double m2BlockSize = OptimizerUtils.estimateSize(Math.min(m2_rows, m2_rpb), Math.min(m2_cols, m2_cpb));
- double m3m1OutSize = OptimizerUtils.estimateSize(Math.min(m1_rows, m1_rpb), m2_cols); //output per m1 block if m2 in cache
- double m3m2OutSize = OptimizerUtils.estimateSize(m1_rows, Math.min(m2_cols, m2_cpb)); //output per m2 block if m1 in cache
-
- double footprint = 0;
- if( pmm )
- {
- //permutation matrix multiply
- //(one input block -> at most two output blocks)
- footprint = m1SizeP + 3*m2BlockSize; //in+2*out
- }
- else
- {
- //generic matrix multiply
- if ( cachedInputIndex == 1 ) {
- // left input (m1) is in cache
- footprint = m1SizeP+m2BlockSize+m3m2OutSize;
- }
- else {
- // right input (m2) is in cache
- footprint = m1BlockSize+m2SizeP+m3m1OutSize;
- }
- }
-
- return footprint;
- }
-
- /**
- * Optimization that chooses between two methods to perform matrix multiplication on map-reduce.
- *
- * More details on the cost-model used: refer ICDE 2011 paper.
- */
- private static MMultMethod optFindMMultMethodMR ( long m1_rows, long m1_cols, long m1_rpb, long m1_cpb, long m1_nnz,
- long m2_rows, long m2_cols, long m2_rpb, long m2_cpb, long m2_nnz,
- MMTSJType mmtsj, ChainType chainType, boolean leftPMInput )
- {
- double memBudget = MAPMULT_MEM_MULTIPLIER * OptimizerUtils.getRemoteMemBudgetMap(true);
-
- // Step 0: check for forced mmultmethod
- if( FORCED_MMULT_METHOD !=null )
- return FORCED_MMULT_METHOD;
-
- // Step 1: check TSMM
- // If transpose self pattern and result is single block:
- // use specialized TSMM method (always better than generic jobs)
- if( ( mmtsj == MMTSJType.LEFT && m2_cols>=0 && m2_cols <= m2_cpb )
- || ( mmtsj == MMTSJType.RIGHT && m1_rows>=0 && m1_rows <= m1_rpb ) )
- {
- return MMultMethod.TSMM;
- }
-
- // Step 2: check MapMultChain
- // If mapmultchain pattern and result is a single block:
- // use specialized mapmult method
- if( OptimizerUtils.ALLOW_SUM_PRODUCT_REWRITES )
- {
- //matmultchain if dim2(X)<=blocksize and all vectors fit in mappers
- //(X: m1_cols x m1_rows, v: m1_rows x m2_cols, w: m1_cols x m2_cols)
- //NOTE: generalization possibe: m2_cols>=0 && m2_cols<=m2_cpb
- if( chainType!=ChainType.NONE && m1_rows>=0 && m1_rows<= m1_rpb && m2_cols==1 )
- {
- if( chainType==ChainType.XtXv && m1_rows>=0 && m2_cols>=0
- && OptimizerUtils.estimateSize(m1_rows, m2_cols ) < memBudget )
- {
- return MMultMethod.MAPMM_CHAIN;
- }
- else if( chainType==ChainType.XtwXv && m1_rows>=0 && m2_cols>=0 && m1_cols>=0
- && OptimizerUtils.estimateSize(m1_rows, m2_cols )
- + OptimizerUtils.estimateSize(m1_cols, m2_cols) < memBudget )
- {
- return MMultMethod.MAPMM_CHAIN;
- }
- }
- }
-
- // Step 3: check for PMM (permutation matrix needs to fit into mapper memory)
- // (needs to be checked before mapmult for consistency with removeEmpty compilation
- double footprintPM1 = getMapmmMemEstimate(m1_rows, 1, m1_rpb, m1_cpb, m1_nnz, m2_rows, m2_cols, m2_rpb, m2_cpb, m2_nnz, 1, true);
- double footprintPM2 = getMapmmMemEstimate(m2_rows, 1, m1_rpb, m1_cpb, m1_nnz, m2_rows, m2_cols, m2_rpb, m2_cpb, m2_nnz, 1, true);
- if( (footprintPM1 < memBudget && m1_rows>=0 || footprintPM2 < memBudget && m2_rows>=0 )
- && leftPMInput )
- {
- return MMultMethod.PMM;
- }
-
- // Step 4: check MapMult
- // If the size of one input is small, choose a method that uses distributed cache
- // (with awareness of output size because one input block might generate many output blocks)
- //memory estimates for local partitioning (mb -> partitioned mb)
- double m1SizeP = OptimizerUtils.estimatePartitionedSizeExactSparsity(m1_rows, m1_cols, m1_rpb, m1_cpb, m1_nnz); //m1 partitioned
- double m2SizeP = OptimizerUtils.estimatePartitionedSizeExactSparsity(m2_rows, m2_cols, m2_rpb, m2_cpb, m2_nnz); //m2 partitioned
-
- //memory estimates for remote execution (broadcast and outputs)
- double footprint1 = getMapmmMemEstimate(m1_rows, m1_cols, m1_rpb, m1_cpb, m1_nnz, m2_rows, m2_cols, m2_rpb, m2_cpb, m2_nnz, 1, false);
- double footprint2 = getMapmmMemEstimate(m1_rows, m1_cols, m1_rpb, m1_cpb, m1_nnz, m2_rows, m2_cols, m2_rpb, m2_cpb, m2_nnz, 2, false);
-
- if ( (footprint1 < memBudget && m1_rows>=0 && m1_cols>=0)
- || (footprint2 < memBudget && m2_rows>=0 && m2_cols>=0) )
- {
- //apply map mult if one side fits in remote task memory
- //(if so pick smaller input for distributed cache)
- if( m1SizeP < m2SizeP && m1_rows>=0 && m1_cols>=0)
- return MMultMethod.MAPMM_L;
- else
- return MMultMethod.MAPMM_R;
- }
-
- // Step 5: check for unknowns
- // If the dimensions are unknown at compilation time, simply assume
- // the worst-case scenario and produce the most robust plan -- which is CPMM
- if ( m1_rows == -1 || m1_cols == -1 || m2_rows == -1 || m2_cols == -1 )
- return MMultMethod.CPMM;
-
- // Step 6: Decide CPMM vs RMM based on io costs
-
- //estimate shuffle costs weighted by parallelism
- double rmm_costs = getRMMCostEstimate(m1_rows, m1_cols, m1_rpb, m1_cpb, m2_rows, m2_cols, m2_rpb, m2_cpb);
- double cpmm_costs = getCPMMCostEstimate(m1_rows, m1_cols, m1_rpb, m1_cpb, m2_rows, m2_cols, m2_rpb, m2_cpb);
-
- //final mmult method decision
- if ( cpmm_costs < rmm_costs )
- return MMultMethod.CPMM;
- else
- return MMultMethod.RMM;
- }
-
- /**
- *
- * @param m1_rows
- * @param m1_cols
- * @param m2_rows
- * @param m2_cols
- * @param mmtsj
- * @param chainType
- * @return
- */
- private static MMultMethod optFindMMultMethodCP( long m1_rows, long m1_cols, long m2_rows, long m2_cols, MMTSJType mmtsj, ChainType chainType, boolean leftPM )
- {
- //step 1: check for TSMM pattern
- if( mmtsj != MMTSJType.NONE )
- return MMultMethod.TSMM;
-
- //step 2: check for MMChain pattern
- if( chainType != ChainType.NONE && OptimizerUtils.ALLOW_SUM_PRODUCT_REWRITES && m2_cols==1 )
- return MMultMethod.MAPMM_CHAIN;
-
- //step 3: check for PMM
- if( leftPM && m1_cols==1 && m2_rows!=1 )
- return MMultMethod.PMM;
-
- //step 4: general purpose MM
- return MMultMethod.MM;
- }
-
- /**
- *
- * @param m1_rows
- * @param m1_cols
- * @param m1_rpb
- * @param m1_cpb
- * @param m2_rows
- * @param m2_cols
- * @param m2_rpb
- * @param m2_cpb
- * @param mmtsj
- * @param chainType
- * @return
- */
- private MMultMethod optFindMMultMethodSpark( long m1_rows, long m1_cols, long m1_rpb, long m1_cpb, long m1_nnz,
- long m2_rows, long m2_cols, long m2_rpb, long m2_cpb, long m2_nnz,
- MMTSJType mmtsj, ChainType chainType, boolean leftPMInput, boolean tmmRewrite )
- {
- //Notes: Any broadcast needs to fit twice in local memory because we partition the input in cp,
- //and needs to fit once in executor broadcast memory. The 2GB broadcast constraint is no longer
- //required because the max_int byte buffer constraint has been fixed in Spark 1.4
- double memBudgetExec = MAPMULT_MEM_MULTIPLIER * SparkExecutionContext.getBroadcastMemoryBudget();
- double memBudgetLocal = OptimizerUtils.getLocalMemBudget();
-
- //reset spark broadcast memory information (for concurrent parfor jobs, awareness of additional
- //cp memory requirements on spark rdd operations with broadcasts)
- _spBroadcastMemEstimate = 0;
-
- // Step 0: check for forced mmultmethod
- if( FORCED_MMULT_METHOD !=null )
- return FORCED_MMULT_METHOD;
-
- // Step 1: check TSMM
- // If transpose self pattern and result is single block:
- // use specialized TSMM method (always better than generic jobs)
- if( ( mmtsj == MMTSJType.LEFT && m2_cols>=0 && m2_cols <= m2_cpb )
- || ( mmtsj == MMTSJType.RIGHT && m1_rows>=0 && m1_rows <= m1_rpb ) )
- {
- return MMultMethod.TSMM;
- }
-
- // Step 2: check MapMMChain
- // If mapmultchain pattern and result is a single block:
- // use specialized mapmult method
- if( OptimizerUtils.ALLOW_SUM_PRODUCT_REWRITES )
- {
- //matmultchain if dim2(X)<=blocksize and all vectors fit in mappers
- //(X: m1_cols x m1_rows, v: m1_rows x m2_cols, w: m1_cols x m2_cols)
- //NOTE: generalization possibe: m2_cols>=0 && m2_cols<=m2_cpb
- if( chainType!=ChainType.NONE && m1_rows >=0 && m1_rows <= m1_rpb && m2_cols==1 )
- {
- if( chainType==ChainType.XtXv && m1_rows>=0 && m2_cols>=0
- && OptimizerUtils.estimateSize(m1_rows, m2_cols ) < memBudgetExec )
- {
- return MMultMethod.MAPMM_CHAIN;
- }
- else if( chainType==ChainType.XtwXv && m1_rows>=0 && m2_cols>=0 && m1_cols>=0
- && OptimizerUtils.estimateSize(m1_rows, m2_cols)
- + OptimizerUtils.estimateSize(m1_cols, m2_cols) < memBudgetExec
- && 2*(OptimizerUtils.estimateSize(m1_rows, m2_cols)
- + OptimizerUtils.estimateSize(m1_cols, m2_cols)) < memBudgetLocal )
- {
- _spBroadcastMemEstimate = 2*(OptimizerUtils.estimateSize(m1_rows, m2_cols)
- + OptimizerUtils.estimateSize(m1_cols, m2_cols));
- return MMultMethod.MAPMM_CHAIN;
- }
- }
- }
-
- // Step 3: check for PMM (permutation matrix needs to fit into mapper memory)
- // (needs to be checked before mapmult for consistency with removeEmpty compilation
- double footprintPM1 = getMapmmMemEstimate(m1_rows, 1, m1_rpb, m1_cpb, m1_nnz, m2_rows, m2_cols, m2_rpb, m2_cpb, m2_nnz, 1, true);
- double footprintPM2 = getMapmmMemEstimate(m2_rows, 1, m1_rpb, m1_cpb, m1_nnz, m2_rows, m2_cols, m2_rpb, m2_cpb, m2_nnz, 1, true);
- if( (footprintPM1 < memBudgetExec && m1_rows>=0 || footprintPM2 < memBudgetExec && m2_rows>=0)
- && 2*OptimizerUtils.estimateSize(m1_rows, 1) < memBudgetLocal
- && leftPMInput )
- {
- _spBroadcastMemEstimate = 2*OptimizerUtils.estimateSize(m1_rows, 1);
- return MMultMethod.PMM;
- }
-
- // Step 4: check MapMM
- // If the size of one input is small, choose a method that uses broadcast variables to prevent shuffle
-
- //memory estimates for local partitioning (mb -> partitioned mb)
- double m1Size = OptimizerUtils.estimateSizeExactSparsity(m1_rows, m1_cols, m1_nnz); //m1 single block
- double m2Size = OptimizerUtils.estimateSizeExactSparsity(m2_rows, m2_cols, m2_nnz); //m2 single block
- double m1SizeP = OptimizerUtils.estimatePartitionedSizeExactSparsity(m1_rows, m1_cols, m1_rpb, m1_cpb, m1_nnz); //m1 partitioned
- double m2SizeP = OptimizerUtils.estimatePartitionedSizeExactSparsity(m2_rows, m2_cols, m2_rpb, m2_cpb, m2_nnz); //m2 partitioned
-
- //memory estimates for remote execution (broadcast and outputs)
- double footprint1 = getMapmmMemEstimate(m1_rows, m1_cols, m1_rpb, m1_cpb, m1_nnz, m2_rows, m2_cols, m2_rpb, m2_cpb, m2_nnz, 1, false);
- double footprint2 = getMapmmMemEstimate(m1_rows, m1_cols, m1_rpb, m1_cpb, m1_nnz, m2_rows, m2_cols, m2_rpb, m2_cpb, m2_nnz, 2, false);
-
- if ( (footprint1 < memBudgetExec && m1Size+m1SizeP < memBudgetLocal && m1_rows>=0 && m1_cols>=0)
- || (footprint2 < memBudgetExec && m2Size+m2SizeP < memBudgetLocal && m2_rows>=0 && m2_cols>=0) )
- {
- //apply map mult if one side fits in remote task memory
- //(if so pick smaller input for distributed cache)
- if( m1SizeP < m2SizeP && m1_rows>=0 && m1_cols>=0) {
- _spBroadcastMemEstimate = m1Size+m1SizeP;
- return MMultMethod.MAPMM_L;
- }
- else {
- _spBroadcastMemEstimate = m2Size+m2SizeP;
- return MMultMethod.MAPMM_R;
- }
- }
-
- // Step 5: check for unknowns
- // If the dimensions are unknown at compilation time, simply assume
- // the worst-case scenario and produce the most robust plan -- which is CPMM
- if ( m1_rows == -1 || m1_cols == -1 || m2_rows == -1 || m2_cols == -1 )
- return MMultMethod.CPMM;
-
- // Step 6: check for ZIPMM
- // If t(X)%*%y -> t(t(y)%*%X) rewrite and ncol(X)<blocksize
- if( tmmRewrite && m1_rows >= 0 && m1_rows <= m1_rpb //blocksize constraint left
- && m2_cols >= 0 && m2_cols <= m2_cpb ) //blocksize constraint right
- {
- return MMultMethod.ZIPMM;
- }
-
- // Step 7: Decide CPMM vs RMM based on io costs
- //estimate shuffle costs weighted by parallelism
- //TODO currently we reuse the mr estimates, these need to be fine-tune for our spark operators
- double rmm_costs = getRMMCostEstimate(m1_rows, m1_cols, m1_rpb, m1_cpb, m2_rows, m2_cols, m2_rpb, m2_cpb);
- double cpmm_costs = getCPMMCostEstimate(m1_rows, m1_cols, m1_rpb, m1_cpb, m2_rows, m2_cols, m2_rpb, m2_cpb);
-
- //final mmult method decision
- if ( cpmm_costs < rmm_costs )
- return MMultMethod.CPMM;
- else
- return MMultMethod.RMM;
- }
-
- /**
- *
- * @param m1_rows
- * @param m1_cols
- * @param m1_rpb
- * @param m1_cpb
- * @param m2_rows
- * @param m2_cols
- * @param m2_rpb
- * @param m2_cpb
- * @return
- */
- private static double getRMMCostEstimate( long m1_rows, long m1_cols, long m1_rpb, long m1_cpb,
- long m2_rows, long m2_cols, long m2_rpb, long m2_cpb )
- {
- long m1_nrb = (long) Math.ceil((double)m1_rows/m1_rpb); // number of row blocks in m1
- long m2_ncb = (long) Math.ceil((double)m2_cols/m2_cpb); // number of column blocks in m2
-
- // TODO: we must factor in the "sparsity"
- double m1_size = m1_rows * m1_cols;
- double m2_size = m2_rows * m2_cols;
- double result_size = m1_rows * m2_cols;
-
- int numReducersRMM = OptimizerUtils.getNumReducers(true);
-
- // Estimate the cost of RMM
- // RMM phase 1
- double rmm_shuffle = (m2_ncb*m1_size) + (m1_nrb*m2_size);
- double rmm_io = m1_size + m2_size + result_size;
- double rmm_nred = Math.min( m1_nrb * m2_ncb, //max used reducers
- numReducersRMM); //available reducers
- // RMM total costs
- double rmm_costs = (rmm_shuffle + rmm_io) / rmm_nred;
-
- // return total costs
- return rmm_costs;
- }
-
- /**
- *
- * @param m1_rows
- * @param m1_cols
- * @param m1_rpb
- * @param m1_cpb
- * @param m2_rows
- * @param m2_cols
- * @param m2_rpb
- * @param m2_cpb
- * @return
- */
- private static double getCPMMCostEstimate( long m1_rows, long m1_cols, long m1_rpb, long m1_cpb,
- long m2_rows, long m2_cols, long m2_rpb, long m2_cpb )
- {
- long m1_nrb = (long) Math.ceil((double)m1_rows/m1_rpb); // number of row blocks in m1
- long m1_ncb = (long) Math.ceil((double)m1_cols/m1_cpb); // number of column blocks in m1
- long m2_ncb = (long) Math.ceil((double)m2_cols/m2_cpb); // number of column blocks in m2
-
- // TODO: we must factor in the "sparsity"
- double m1_size = m1_rows * m1_cols;
- double m2_size = m2_rows * m2_cols;
- double result_size = m1_rows * m2_cols;
-
- int numReducersCPMM = OptimizerUtils.getNumReducers(false);
-
- // Estimate the cost of CPMM
- // CPMM phase 1
- double cpmm_shuffle1 = m1_size + m2_size;
- double cpmm_nred1 = Math.min( m1_ncb, //max used reducers
- numReducersCPMM); //available reducers
- double cpmm_io1 = m1_size + m2_size + cpmm_nred1 * result_size;
- // CPMM phase 2
- double cpmm_shuffle2 = cpmm_nred1 * result_size;
- double cpmm_io2 = cpmm_nred1 * result_size + result_size;
- double cpmm_nred2 = Math.min( m1_nrb * m2_ncb, //max used reducers
- numReducersCPMM); //available reducers
- // CPMM total costs
- double cpmm_costs = (cpmm_shuffle1+cpmm_io1)/cpmm_nred1 //cpmm phase1
- +(cpmm_shuffle2+cpmm_io2)/cpmm_nred2; //cpmm phase2
-
- //return total costs
- return cpmm_costs;
- }
-
- @Override
- public void refreshSizeInformation()
- {
- Hop input1 = getInput().get(0);
- Hop input2 = getInput().get(1);
-
- if( isMatrixMultiply() )
- {
- setDim1(input1.getDim1());
- setDim2(input2.getDim2());
- }
- }
-
- @Override
- public Object clone() throws CloneNotSupportedException
- {
- AggBinaryOp ret = new AggBinaryOp();
-
- //copy generic attributes
- ret.clone(this, false);
-
- //copy specific attributes
- ret.innerOp = innerOp;
- ret.outerOp = outerOp;
- ret._hasLeftPMInput = _hasLeftPMInput;
- ret._maxNumThreads = _maxNumThreads;
-
- return ret;
- }
-
- @Override
- public boolean compare( Hop that )
- {
- if( !(that instanceof AggBinaryOp) )
- return false;
-
- AggBinaryOp that2 = (AggBinaryOp)that;
- return ( innerOp == that2.innerOp
- && outerOp == that2.outerOp
- && getInput().get(0) == that2.getInput().get(0)
- && getInput().get(1) == that2.getInput().get(1)
- && _hasLeftPMInput == that2._hasLeftPMInput
- && _maxNumThreads == that2._maxNumThreads);
- }
-}