You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by lr...@apache.org on 2015/12/03 19:46:08 UTC
[37/78] [abbrv] [partial] incubator-systemml git commit: Move files
to new package folder structure
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/hops/cost/CostEstimator.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/hops/cost/CostEstimator.java b/src/main/java/com/ibm/bi/dml/hops/cost/CostEstimator.java
deleted file mode 100644
index 1f6d947..0000000
--- a/src/main/java/com/ibm/bi/dml/hops/cost/CostEstimator.java
+++ /dev/null
@@ -1,787 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.hops.cost;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.StringTokenizer;
-import java.util.Map.Entry;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import com.ibm.bi.dml.hops.Hop;
-import com.ibm.bi.dml.hops.HopsException;
-import com.ibm.bi.dml.hops.recompile.Recompiler;
-import com.ibm.bi.dml.lops.Lop;
-import com.ibm.bi.dml.lops.LopsException;
-import com.ibm.bi.dml.parser.DMLProgram;
-import com.ibm.bi.dml.parser.DMLTranslator;
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.DMLUnsupportedOperationException;
-import com.ibm.bi.dml.runtime.controlprogram.ExternalFunctionProgramBlock;
-import com.ibm.bi.dml.runtime.controlprogram.ForProgramBlock;
-import com.ibm.bi.dml.runtime.controlprogram.FunctionProgramBlock;
-import com.ibm.bi.dml.runtime.controlprogram.IfProgramBlock;
-import com.ibm.bi.dml.runtime.controlprogram.LocalVariableMap;
-import com.ibm.bi.dml.runtime.controlprogram.Program;
-import com.ibm.bi.dml.runtime.controlprogram.ProgramBlock;
-import com.ibm.bi.dml.runtime.controlprogram.WhileProgramBlock;
-import com.ibm.bi.dml.runtime.controlprogram.caching.MatrixObject;
-import com.ibm.bi.dml.runtime.instructions.Instruction;
-import com.ibm.bi.dml.runtime.instructions.InstructionUtils;
-import com.ibm.bi.dml.runtime.instructions.MRInstructionParser;
-import com.ibm.bi.dml.runtime.instructions.MRJobInstruction;
-import com.ibm.bi.dml.runtime.instructions.cp.AggregateTernaryCPInstruction;
-import com.ibm.bi.dml.runtime.instructions.cp.AggregateUnaryCPInstruction;
-import com.ibm.bi.dml.runtime.instructions.cp.BinaryCPInstruction;
-import com.ibm.bi.dml.runtime.instructions.cp.CPInstruction;
-import com.ibm.bi.dml.runtime.instructions.cp.Data;
-import com.ibm.bi.dml.runtime.instructions.cp.DataGenCPInstruction;
-import com.ibm.bi.dml.runtime.instructions.cp.FunctionCallCPInstruction;
-import com.ibm.bi.dml.runtime.instructions.cp.MMTSJCPInstruction;
-import com.ibm.bi.dml.runtime.instructions.cp.MultiReturnBuiltinCPInstruction;
-import com.ibm.bi.dml.runtime.instructions.cp.ParameterizedBuiltinCPInstruction;
-import com.ibm.bi.dml.runtime.instructions.cp.StringInitCPInstruction;
-import com.ibm.bi.dml.runtime.instructions.cp.UnaryCPInstruction;
-import com.ibm.bi.dml.runtime.instructions.cp.VariableCPInstruction;
-import com.ibm.bi.dml.runtime.instructions.mr.MRInstruction;
-import com.ibm.bi.dml.runtime.matrix.MatrixCharacteristics;
-import com.ibm.bi.dml.runtime.matrix.MatrixDimensionsMetaData;
-import com.ibm.bi.dml.runtime.matrix.operators.CMOperator;
-import com.ibm.bi.dml.runtime.matrix.operators.CMOperator.AggregateOperationTypes;
-import com.ibm.bi.dml.runtime.util.UtilFunctions;
-
-public abstract class CostEstimator
-{
-
- protected static final Log LOG = LogFactory.getLog(CostEstimator.class.getName());
-
- private static final int DEFAULT_NUMITER = 15;
-
- protected static final VarStats _unknownStats = new VarStats(1,1,-1,-1,-1,false);
- protected static final VarStats _scalarStats = new VarStats(1,1,1,1,1,true);
-
- /**
- *
- * @param rtprog
- * @return
- * @throws DMLRuntimeException
- * @throws DMLUnsupportedOperationException
- */
- public double getTimeEstimate(Program rtprog, LocalVariableMap vars, HashMap<String,VarStats> stats)
- throws DMLRuntimeException, DMLUnsupportedOperationException
- {
- double costs = 0;
-
- //obtain stats from symboltable (e.g., during recompile)
- maintainVariableStatistics(vars, stats);
-
- //get cost estimate
- for( ProgramBlock pb : rtprog.getProgramBlocks() )
- costs += rGetTimeEstimate(pb, stats, new HashSet<String>(), true);
-
- return costs;
- }
-
- /**
- *
- * @param pb
- * @param vars
- * @param stats
- * @return
- * @throws DMLRuntimeException
- * @throws DMLUnsupportedOperationException
- */
- public double getTimeEstimate(ProgramBlock pb, LocalVariableMap vars, HashMap<String,VarStats> stats, boolean recursive)
- throws DMLRuntimeException, DMLUnsupportedOperationException
- {
- //obtain stats from symboltable (e.g., during recompile)
- maintainVariableStatistics(vars, stats);
-
- //get cost estimate
- return rGetTimeEstimate(pb, stats, new HashSet<String>(), recursive);
- }
-
-
- /**
- *
- * @param hops
- * @param vars
- * @return
- * @throws DMLUnsupportedOperationException
- * @throws DMLRuntimeException
- * @throws IOException
- * @throws LopsException
- * @throws HopsException
- */
- public double getTimeEstimate( ArrayList<Hop> hops, LocalVariableMap vars, HashMap<String,VarStats> stats )
- throws DMLRuntimeException, DMLUnsupportedOperationException, HopsException, LopsException, IOException
- {
- double costs = 0;
-
- ArrayList<Instruction> linst = Recompiler.recompileHopsDag(null, hops, vars, null, false, 0);
- ProgramBlock pb = new ProgramBlock(null);
- pb.setInstructions(linst);
-
- //obtain stats from symboltable (e.g., during recompile)
- maintainVariableStatistics(vars, stats);
-
- //get cost estimate
- costs = rGetTimeEstimate(pb, stats, new HashSet<String>(), true);
-
- return costs;
- }
-
- /**
- *
- * @param pb
- * @param vars
- * @param stats
- * @return
- * @throws DMLRuntimeException
- * @throws DMLUnsupportedOperationException
- */
- private double rGetTimeEstimate(ProgramBlock pb, HashMap<String,VarStats> stats, HashSet<String> memoFunc, boolean recursive)
- throws DMLRuntimeException, DMLUnsupportedOperationException
- {
- double ret = 0;
-
- if (pb instanceof WhileProgramBlock)
- {
- WhileProgramBlock tmp = (WhileProgramBlock)pb;
- if( recursive )
- for (ProgramBlock pb2 : tmp.getChildBlocks())
- ret += rGetTimeEstimate(pb2, stats, memoFunc, recursive);
- ret *= DEFAULT_NUMITER;
- }
- else if (pb instanceof IfProgramBlock)
- {
- IfProgramBlock tmp = (IfProgramBlock)pb;
- if( recursive ) {
- for( ProgramBlock pb2 : tmp.getChildBlocksIfBody() )
- ret += rGetTimeEstimate(pb2, stats, memoFunc, recursive);
- if( tmp.getChildBlocksElseBody()!=null )
- for( ProgramBlock pb2 : tmp.getChildBlocksElseBody() ){
- ret += rGetTimeEstimate(pb2, stats, memoFunc, recursive);
- ret /= 2; //weighted sum
- }
- }
- }
- else if (pb instanceof ForProgramBlock) //includes ParFORProgramBlock
- {
- ForProgramBlock tmp = (ForProgramBlock)pb;
- if( recursive )
- for( ProgramBlock pb2 : tmp.getChildBlocks() )
- ret += rGetTimeEstimate(pb2, stats, memoFunc, recursive);
-
- ret *= getNumIterations(stats, tmp.getIterablePredicateVars());
- }
- else if ( pb instanceof FunctionProgramBlock
- && !(pb instanceof ExternalFunctionProgramBlock)) //see generic
- {
- FunctionProgramBlock tmp = (FunctionProgramBlock) pb;
- if( recursive )
- for( ProgramBlock pb2 : tmp.getChildBlocks() )
- ret += rGetTimeEstimate(pb2, stats, memoFunc, recursive);
- }
- else
- {
- ArrayList<Instruction> tmp = pb.getInstructions();
-
- for( Instruction inst : tmp )
- {
- if( inst instanceof CPInstruction ) //CP
- {
- //obtain stats from createvar, cpvar, rmvar, rand
- maintainCPInstVariableStatistics((CPInstruction)inst, stats);
-
- //extract statistics (instruction-specific)
- Object[] o = extractCPInstStatistics(inst, stats);
- VarStats[] vs = (VarStats[]) o[0];
- String[] attr = (String[]) o[1];
-
- //if(LOG.isDebugEnabled())
- // LOG.debug(inst);
-
- //call time estimation for inst
- ret += getCPInstTimeEstimate(inst, vs, attr);
-
- if( inst instanceof FunctionCallCPInstruction ) //functions
- {
- FunctionCallCPInstruction finst = (FunctionCallCPInstruction)inst;
- String fkey = DMLProgram.constructFunctionKey(finst.getNamespace(), finst.getFunctionName());
- //awareness of recursive functions, missing program
- if( !memoFunc.contains(fkey) && pb.getProgram()!=null )
- {
- if(LOG.isDebugEnabled())
- LOG.debug("Begin Function "+fkey);
-
- memoFunc.add(fkey);
- Program prog = pb.getProgram();
- FunctionProgramBlock fpb = prog.getFunctionProgramBlock(
- finst.getNamespace(), finst.getFunctionName());
- ret += rGetTimeEstimate(fpb, stats, memoFunc, recursive);
- memoFunc.remove(fkey);
-
- if(LOG.isDebugEnabled())
- LOG.debug("End Function "+fkey);
- }
- }
- }
- else if(inst instanceof MRJobInstruction) //MR
- {
- //obtain stats for job
- maintainMRJobInstVariableStatistics(inst, stats);
-
- //extract input statistics
- Object[] o = extractMRJobInstStatistics(inst, stats);
- VarStats[] vs = (VarStats[]) o[0];
-
- //if(LOG.isDebugEnabled())
- // LOG.debug(inst);
-
- if(LOG.isDebugEnabled())
- LOG.debug("Begin MRJob type="+((MRJobInstruction)inst).getJobType());
-
- //call time estimation for complex MR inst
- ret += getMRJobInstTimeEstimate(inst, vs, null);
-
- if(LOG.isDebugEnabled())
- LOG.debug("End MRJob");
-
- //cleanup stats for job
- cleanupMRJobVariableStatistics(inst, stats);
- }
- }
- }
-
- return ret;
- }
-
-
- /**
- *
- * @param vars
- * @param stats
- * @throws DMLRuntimeException
- */
- private void maintainVariableStatistics( LocalVariableMap vars, HashMap<String, VarStats> stats )
- throws DMLRuntimeException
- {
- for( String varname : vars.keySet() )
- {
- Data dat = vars.get(varname);
- VarStats vs = null;
- if( dat instanceof MatrixObject ) //matrix
- {
- MatrixObject mo = (MatrixObject) dat;
- MatrixCharacteristics mc = ((MatrixDimensionsMetaData)mo.getMetaData()).getMatrixCharacteristics();
- long rlen = mc.getRows();
- long clen = mc.getCols();
- long brlen = mc.getRowsPerBlock();
- long bclen = mc.getColsPerBlock();
- long nnz = mc.getNonZeros();
- boolean inmem = mo.getStatusAsString().equals("CACHED");
- vs = new VarStats(rlen, clen, brlen, bclen, nnz, inmem);
- }
- else //scalar
- {
- vs = _scalarStats;
- }
-
- stats.put(varname, vs);
- //System.out.println(varname+" "+vs);
- }
- }
-
- /**
- *
- * @param inst
- * @param stats
- */
- private void maintainCPInstVariableStatistics( CPInstruction inst, HashMap<String, VarStats> stats )
- {
- if( inst instanceof VariableCPInstruction )
- {
- String optype = inst.getOpcode();
- String[] parts = InstructionUtils.getInstructionParts(inst.toString());
-
- if( optype.equals("createvar") ) {
- if( parts.length<10 )
- return;
- String varname = parts[1];
- long rlen = Long.parseLong(parts[5]);
- long clen = Long.parseLong(parts[6]);
- long brlen = Long.parseLong(parts[7]);
- long bclen = Long.parseLong(parts[8]);
- long nnz = Long.parseLong(parts[9]);
- VarStats vs = new VarStats(rlen, clen, brlen, bclen, nnz, false);
- stats.put(varname, vs);
-
- //System.out.println(varname+" "+vs);
- }
- else if ( optype.equals("cpvar") ) {
- String varname = parts[1];
- String varname2 = parts[2];
- VarStats vs = stats.get(varname);
- stats.put(varname2, vs);
- }
- else if ( optype.equals("mvvar") ) {
- String varname = parts[1];
- String varname2 = parts[2];
- VarStats vs = stats.remove(varname);
- stats.put(varname2, vs);
- }
- else if( optype.equals("rmvar") ) {
- String varname = parts[1];
- stats.remove(varname);
- }
- }
- else if( inst instanceof DataGenCPInstruction ){
- DataGenCPInstruction randInst = (DataGenCPInstruction) inst;
- String varname = randInst.output.getName();
- long rlen = randInst.getRows();
- long clen = randInst.getCols();
- long brlen = randInst.getRowsInBlock();
- long bclen = randInst.getColsInBlock();
- long nnz = (long) (randInst.getSparsity() * rlen * clen);
- VarStats vs = new VarStats(rlen, clen, brlen, bclen, nnz, true);
- stats.put(varname, vs);
- }
- else if( inst instanceof StringInitCPInstruction ){
- StringInitCPInstruction iinst = (StringInitCPInstruction) inst;
- String varname = iinst.output.getName();
- long rlen = iinst.getRows();
- long clen = iinst.getCols();
- VarStats vs = new VarStats(rlen, clen, DMLTranslator.DMLBlockSize, DMLTranslator.DMLBlockSize, rlen*clen, true);
- stats.put(varname, vs);
- }
- else if( inst instanceof FunctionCallCPInstruction )
- {
- FunctionCallCPInstruction finst = (FunctionCallCPInstruction) inst;
- ArrayList<String> outVars = finst.getBoundOutputParamNames();
- for( String varname : outVars )
- {
- stats.put(varname, _unknownStats);
- //System.out.println(varname+" "+vs);
- }
- }
- }
-
- /**
- *
- * @param inst
- * @param stats
- * @throws DMLRuntimeException
- * @throws DMLUnsupportedOperationException
- */
- private void maintainMRJobInstVariableStatistics( Instruction inst, HashMap<String, VarStats> stats )
- throws DMLRuntimeException, DMLUnsupportedOperationException
- {
- MRJobInstruction jobinst = (MRJobInstruction)inst;
-
- //input sizes (varname, index mapping)
- String[] inVars = jobinst.getInputVars();
- int index = -1;
- for( String varname : inVars )
- {
- VarStats vs = stats.get(varname);
- if( vs==null )
- vs = _unknownStats;
- stats.put(String.valueOf(++index), vs);
- }
-
- //rand output
- String rdInst = jobinst.getIv_randInstructions();
- if( rdInst != null && rdInst.length()>0 )
- {
- StringTokenizer st = new StringTokenizer(rdInst,Lop.INSTRUCTION_DELIMITOR);
- while( st.hasMoreTokens() ) //foreach rand instruction
- {
- String[] parts = InstructionUtils.getInstructionParts(st.nextToken());
- byte outIndex = Byte.parseByte(parts[2]);
- long rlen = parts[3].contains("##")?-1:UtilFunctions.parseToLong(parts[3]);
- long clen = parts[4].contains("##")?-1:UtilFunctions.parseToLong(parts[4]);
- long brlen = Long.parseLong(parts[5]);
- long bclen = Long.parseLong(parts[6]);
- long nnz = (long) (Double.parseDouble(parts[9]) * rlen * clen);
- VarStats vs = new VarStats(rlen, clen, brlen, bclen, nnz, false);
- stats.put(String.valueOf(outIndex), vs);
- }
- }
-
- //compute intermediate result indices
- HashMap<Byte,MatrixCharacteristics> dims = new HashMap<Byte, MatrixCharacteristics>();
- //populate input indices
- for( Entry<String,VarStats> e : stats.entrySet() )
- {
- if(UtilFunctions.isIntegerNumber(e.getKey()))
- {
- byte ix = Byte.parseByte(e.getKey());
- VarStats vs = e.getValue();
- if( vs !=null )
- {
- MatrixCharacteristics mc = new MatrixCharacteristics(vs._rlen, vs._clen, (int)vs._brlen, (int)vs._bclen, (long)vs._nnz);
- dims.put(ix, mc);
- }
- }
- }
- //compute dims for all instructions
- String[] instCat = new String[]{
- jobinst.getIv_randInstructions(),
- jobinst.getIv_recordReaderInstructions(),
- jobinst.getIv_instructionsInMapper(),
- jobinst.getIv_shuffleInstructions(),
- jobinst.getIv_aggInstructions(),
- jobinst.getIv_otherInstructions()};
- for( String linstCat : instCat )
- if( linstCat !=null && linstCat.length()>0 )
- {
- String[] linst = linstCat.split(Instruction.INSTRUCTION_DELIM);
- for( String instStr : linst )
- {
- String instStr2 = replaceInstructionPatch(instStr);
- MRInstruction mrinst = MRInstructionParser.parseSingleInstruction(instStr2);
- MatrixCharacteristics.computeDimension(dims, mrinst);
- }
- }
-
- //create varstats if necessary
- for( Entry<Byte,MatrixCharacteristics> e : dims.entrySet() )
- {
- byte ix = e.getKey();
- if( !stats.containsKey(String.valueOf(ix)) )
- {
- MatrixCharacteristics mc = e.getValue();
- VarStats vs = new VarStats(mc.getRows(), mc.getCols(), mc.getRowsPerBlock(), mc.getColsPerBlock(), mc.getNonZeros(), false);
- stats.put(String.valueOf(ix), vs);
- }
- }
-
- //map result indexes
- String[] outLabels = jobinst.getOutputVars();
- byte[] resultIndexes = jobinst.getIv_resultIndices();
- for( int i=0; i<resultIndexes.length; i++ )
- {
- String varname = outLabels[i];
- VarStats varvs = stats.get(String.valueOf(resultIndexes[i]));
- if( varvs==null )
- {
- varvs = stats.get(outLabels[i]);
- }
- varvs._inmem = false;
- stats.put(varname, varvs);
- }
- }
-
- /**
- *
- * @param inst
- * @return
- */
- protected String replaceInstructionPatch( String inst )
- {
- String ret = inst;
- while( ret.contains("##") ) {
- int index1 = ret.indexOf("##");
- int index2 = ret.indexOf("##", index1+3);
- String replace = ret.substring(index1,index2+2);
- ret = ret.replaceAll(replace, "1");
- }
-
- return ret;
- }
-
- /**
- *
- * @param inst
- * @param stats
- * @return
- */
- private Object[] extractCPInstStatistics( Instruction inst, HashMap<String, VarStats> stats )
- {
- Object[] ret = new Object[2]; //stats, attrs
- VarStats[] vs = new VarStats[3];
- String[] attr = null;
-
- if( inst instanceof UnaryCPInstruction )
- {
- if( inst instanceof DataGenCPInstruction )
- {
- DataGenCPInstruction rinst = (DataGenCPInstruction) inst;
- vs[0] = _unknownStats;
- vs[1] = _unknownStats;
- vs[2] = stats.get( rinst.output.getName() );
-
- //prepare attributes for cost estimation
- int type = 2; //full rand
- if( rinst.getMinValue() == 0.0 && rinst.getMaxValue() == 0.0 )
- type = 0;
- else if( rinst.getSparsity() == 1.0 && rinst.getMinValue() == rinst.getMaxValue() )
- type = 1;
- attr = new String[]{String.valueOf(type)};
- }
- else if( inst instanceof StringInitCPInstruction )
- {
- StringInitCPInstruction rinst = (StringInitCPInstruction) inst;
- vs[0] = _unknownStats;
- vs[1] = _unknownStats;
- vs[2] = stats.get( rinst.output.getName() );
- }
- else //general unary
- {
- UnaryCPInstruction uinst = (UnaryCPInstruction) inst;
- vs[0] = stats.get( uinst.input1.getName() );
- vs[1] = _unknownStats;
- vs[2] = stats.get( uinst.output.getName() );
-
- if( vs[0] == null ) //scalar input, e.g., print
- vs[0] = _scalarStats;
- if( vs[2] == null ) //scalar output
- vs[2] = _scalarStats;
-
- if( inst instanceof MMTSJCPInstruction )
- {
- String type = ((MMTSJCPInstruction)inst).getMMTSJType().toString();
- attr = new String[]{type};
- }
- else if( inst instanceof AggregateUnaryCPInstruction )
- {
- String[] parts = InstructionUtils.getInstructionParts(inst.toString());
- String opcode = parts[0];
- if( opcode.equals("cm") )
- attr = new String[]{parts[parts.length-2]};
- }
- }
- }
- else if( inst instanceof BinaryCPInstruction )
- {
- BinaryCPInstruction binst = (BinaryCPInstruction) inst;
- vs[0] = stats.get( binst.input1.getName() );
- vs[1] = stats.get( binst.input2.getName() );
- vs[2] = stats.get( binst.output.getName() );
-
-
- if( vs[0] == null ) //scalar input,
- vs[0] = _scalarStats;
- if( vs[1] == null ) //scalar input,
- vs[1] = _scalarStats;
- if( vs[2] == null ) //scalar output
- vs[2] = _scalarStats;
- }
- else if( inst instanceof AggregateTernaryCPInstruction )
- {
- AggregateTernaryCPInstruction binst = (AggregateTernaryCPInstruction) inst;
- //of same dimension anyway but missing third input
- vs[0] = stats.get( binst.input1.getName() );
- vs[1] = stats.get( binst.input2.getName() );
- vs[2] = stats.get( binst.output.getName() );
-
- if( vs[0] == null ) //scalar input,
- vs[0] = _scalarStats;
- if( vs[1] == null ) //scalar input,
- vs[1] = _scalarStats;
- if( vs[2] == null ) //scalar output
- vs[2] = _scalarStats;
- }
- else if( inst instanceof ParameterizedBuiltinCPInstruction )
- {
- //ParameterizedBuiltinCPInstruction pinst = (ParameterizedBuiltinCPInstruction) inst;
- String[] parts = InstructionUtils.getInstructionParts(inst.toString());
- String opcode = parts[0];
- if( opcode.equals("groupedagg") )
- {
- HashMap<String,String> paramsMap = ParameterizedBuiltinCPInstruction.constructParameterMap(parts);
- String fn = paramsMap.get("fn");
- String order = paramsMap.get("order");
- AggregateOperationTypes type = CMOperator.getAggOpType(fn, order);
- attr = new String[]{String.valueOf(type.ordinal())};
- }
- else if( opcode.equals("rmempty") )
- {
- HashMap<String,String> paramsMap = ParameterizedBuiltinCPInstruction.constructParameterMap(parts);
- attr = new String[]{String.valueOf(paramsMap.get("margin").equals("rows")?0:1)};
- }
-
- vs[0] = stats.get( parts[1].substring(7).replaceAll("##", "") );
- vs[1] = _unknownStats; //TODO
- vs[2] = stats.get( parts[parts.length-1] );
-
- if( vs[0] == null ) //scalar input
- vs[0] = _scalarStats;
- if( vs[2] == null ) //scalar output
- vs[2] = _scalarStats;
- }
- else if( inst instanceof MultiReturnBuiltinCPInstruction )
- {
- //applies to qr, lu, eigen (cost computation on input1)
- MultiReturnBuiltinCPInstruction minst = (MultiReturnBuiltinCPInstruction) inst;
- vs[0] = stats.get( minst.input1.getName() );
- vs[1] = stats.get( minst.getOutput(0).getName() );
- vs[2] = stats.get( minst.getOutput(1).getName() );
- }
- else if( inst instanceof VariableCPInstruction )
- {
- setUnknownStats(vs);
-
- VariableCPInstruction varinst = (VariableCPInstruction) inst;
- if( varinst.getOpcode().equals("write") ) {
- //special handling write of matrix objects (non existing if scalar)
- if( stats.containsKey( varinst.getInput1().getName() ) )
- vs[0] = stats.get( varinst.getInput1().getName() );
- attr = new String[]{varinst.getInput3().getName()};
- }
- }
- else
- {
- setUnknownStats(vs);
- }
-
- //maintain var status (CP output always inmem)
- vs[2]._inmem = true;
-
- ret[0] = vs;
- ret[1] = attr;
-
- return ret;
- }
-
- private void setUnknownStats(VarStats[] vs) {
- vs[0] = _unknownStats;
- vs[1] = _unknownStats;
- vs[2] = _unknownStats;
- }
-
- /**
- *
- * @param inst
- * @param stats
- * @return
- */
- private Object[] extractMRJobInstStatistics( Instruction inst, HashMap<String, VarStats> stats )
- {
- Object[] ret = new Object[2]; //stats, attrs
- VarStats[] vs = null;
- String[] attr = null;
-
- MRJobInstruction jinst = (MRJobInstruction)inst;
-
- //get number of indices
- byte[] indexes = jinst.getIv_resultIndices();
- byte maxIx = -1;
- for( int i=0; i<indexes.length; i++ )
- if( maxIx < indexes[i] )
- maxIx = indexes[i];
-
- vs = new VarStats[ maxIx+1 ];
-
- //get inputs, intermediates, and outputs
- for( int i=0; i<vs.length; i++ ){
- vs[i] = stats.get(String.valueOf(i));
- if( vs[i]==null )
- {
- vs[i] = _unknownStats;
- }
- }
-
- //result preparation
- ret[0] = vs;
- ret[1] = attr;
-
- return ret;
- }
-
- /**
- *
- * @param inst
- * @param stats
- */
- private void cleanupMRJobVariableStatistics( Instruction inst, HashMap<String, VarStats> stats )
- {
- MRJobInstruction jinst = (MRJobInstruction)inst;
-
- //get number of indices
- byte[] indexes = jinst.getIv_resultIndices();
- byte maxIx = -1;
- for( int i=0; i<indexes.length; i++ )
- if( maxIx < indexes[i] )
- maxIx = indexes[i];
-
- //remove all stats up to max index
- for( int i=0; i<=maxIx; i++ )
- {
- VarStats tmp = stats.remove(String.valueOf(i));
- if( tmp!=null )
- tmp._inmem = false; //all MR job outptus on HDFS
- }
- }
-
- /**
- * TODO use of vars - needed for recompilation
- * TODO w/o exception
- *
- * @param vars
- * @param stats
- * @param pred
- * @return
- */
- private int getNumIterations(HashMap<String,VarStats> stats, String[] pred)
- {
- int N = DEFAULT_NUMITER;
- if( pred != null && pred[1]!=null && pred[2]!=null && pred[3]!=null )
- {
- try
- {
- int from = Integer.parseInt(pred[1]);
- int to = Integer.parseInt(pred[2]);
- int increment = Integer.parseInt(pred[3]);
- N = (int)Math.ceil(((double)(to-from+1))/increment);
- }
- catch(Exception ex){}
- }
- return N;
- }
-
-
- /**
- *
- * @param inst
- * @param vs
- * @param args
- * @return
- * @throws DMLRuntimeException
- * @throws DMLUnsupportedOperationException
- */
- protected abstract double getCPInstTimeEstimate( Instruction inst, VarStats[] vs, String[] args )
- throws DMLRuntimeException, DMLUnsupportedOperationException;
-
- /**
- *
- * @param inst
- * @param vs
- * @param args
- * @return
- * @throws DMLRuntimeException
- * @throws DMLUnsupportedOperationException
- */
- protected abstract double getMRJobInstTimeEstimate( Instruction inst, VarStats[] vs, String[] args )
- throws DMLRuntimeException, DMLUnsupportedOperationException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/hops/cost/CostEstimatorNumMRJobs.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/hops/cost/CostEstimatorNumMRJobs.java b/src/main/java/com/ibm/bi/dml/hops/cost/CostEstimatorNumMRJobs.java
deleted file mode 100644
index 4c9e78f..0000000
--- a/src/main/java/com/ibm/bi/dml/hops/cost/CostEstimatorNumMRJobs.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.hops.cost;
-
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.DMLUnsupportedOperationException;
-import com.ibm.bi.dml.runtime.instructions.Instruction;
-
-/**
- *
- */
-public class CostEstimatorNumMRJobs extends CostEstimator
-{
-
-
- @Override
- protected double getCPInstTimeEstimate( Instruction inst, VarStats[] vs, String[] args )
- throws DMLRuntimeException, DMLUnsupportedOperationException
- {
- return 0;
- }
-
- @Override
- protected double getMRJobInstTimeEstimate( Instruction inst, VarStats[] vs, String[] args )
- throws DMLRuntimeException, DMLUnsupportedOperationException
- {
- return 1;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/hops/cost/CostEstimatorStaticRuntime.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/hops/cost/CostEstimatorStaticRuntime.java b/src/main/java/com/ibm/bi/dml/hops/cost/CostEstimatorStaticRuntime.java
deleted file mode 100644
index 67d9cb2..0000000
--- a/src/main/java/com/ibm/bi/dml/hops/cost/CostEstimatorStaticRuntime.java
+++ /dev/null
@@ -1,1317 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.hops.cost;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-
-import com.ibm.bi.dml.conf.ConfigurationManager;
-import com.ibm.bi.dml.conf.DMLConfig;
-import com.ibm.bi.dml.lops.DataGen;
-import com.ibm.bi.dml.lops.Lop;
-import com.ibm.bi.dml.lops.MapMult;
-import com.ibm.bi.dml.lops.LopProperties.ExecType;
-import com.ibm.bi.dml.lops.MMTSJ.MMTSJType;
-import com.ibm.bi.dml.lops.compile.JobType;
-import com.ibm.bi.dml.parser.DMLTranslator;
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.DMLUnsupportedOperationException;
-import com.ibm.bi.dml.runtime.controlprogram.caching.CacheableData;
-import com.ibm.bi.dml.runtime.controlprogram.caching.LazyWriteBuffer;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
-import com.ibm.bi.dml.runtime.instructions.CPInstructionParser;
-import com.ibm.bi.dml.runtime.instructions.Instruction;
-import com.ibm.bi.dml.runtime.instructions.InstructionUtils;
-import com.ibm.bi.dml.runtime.instructions.MRInstructionParser;
-import com.ibm.bi.dml.runtime.instructions.MRJobInstruction;
-import com.ibm.bi.dml.runtime.instructions.cp.CPInstruction;
-import com.ibm.bi.dml.runtime.instructions.cp.CPInstruction.CPINSTRUCTION_TYPE;
-import com.ibm.bi.dml.runtime.instructions.cp.FunctionCallCPInstruction;
-import com.ibm.bi.dml.runtime.instructions.cp.VariableCPInstruction;
-import com.ibm.bi.dml.runtime.instructions.mr.BinaryMRInstructionBase;
-import com.ibm.bi.dml.runtime.instructions.mr.CM_N_COVInstruction;
-import com.ibm.bi.dml.runtime.instructions.mr.DataGenMRInstruction;
-import com.ibm.bi.dml.runtime.instructions.mr.GroupedAggregateInstruction;
-import com.ibm.bi.dml.runtime.instructions.mr.IDistributedCacheConsumer;
-import com.ibm.bi.dml.runtime.instructions.mr.MMTSJMRInstruction;
-import com.ibm.bi.dml.runtime.instructions.mr.MRInstruction;
-import com.ibm.bi.dml.runtime.instructions.mr.MapMultChainInstruction;
-import com.ibm.bi.dml.runtime.instructions.mr.PickByCountInstruction;
-import com.ibm.bi.dml.runtime.instructions.mr.RemoveEmptyMRInstruction;
-import com.ibm.bi.dml.runtime.instructions.mr.TernaryInstruction;
-import com.ibm.bi.dml.runtime.instructions.mr.UnaryMRInstructionBase;
-import com.ibm.bi.dml.runtime.instructions.mr.MRInstruction.MRINSTRUCTION_TYPE;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
-import com.ibm.bi.dml.runtime.matrix.mapred.DistributedCacheInput;
-import com.ibm.bi.dml.runtime.matrix.operators.CMOperator;
-import com.ibm.bi.dml.runtime.matrix.operators.CMOperator.AggregateOperationTypes;
-import com.ibm.bi.dml.yarn.ropt.MRJobResourceInstruction;
-import com.ibm.bi.dml.yarn.ropt.YarnClusterAnalyzer;
-
-/**
- *
- */
-public class CostEstimatorStaticRuntime extends CostEstimator
-{
-
- //time-conversion
- private static final long DEFAULT_FLOPS = 2L * 1024 * 1024 * 1024; //2GFLOPS
- //private static final long UNKNOWN_TIME = -1;
-
- //floating point operations
- private static final double DEFAULT_NFLOP_NOOP = 10;
- private static final double DEFAULT_NFLOP_UNKNOWN = 1;
- private static final double DEFAULT_NFLOP_CP = 1;
- private static final double DEFAULT_NFLOP_TEXT_IO = 350;
-
- //MR job latency
- private static final double DEFAULT_MR_JOB_LATENCY_LOCAL = 2;
- private static final double DEFAULT_MR_JOB_LATENCY_REMOTE = 20;
- private static final double DEFAULT_MR_TASK_LATENCY_LOCAL = 0.001;
- private static final double DEFAULT_MR_TASK_LATENCY_REMOTE = 1.5;
-
- //IO READ throughput
- private static final double DEFAULT_MBS_FSREAD_BINARYBLOCK_DENSE = 200;
- private static final double DEFAULT_MBS_FSREAD_BINARYBLOCK_SPARSE = 100;
- private static final double DEFAULT_MBS_HDFSREAD_BINARYBLOCK_DENSE = 150;
- private static final double DEFAULT_MBS_HDFSREAD_BINARYBLOCK_SPARSE = 75;
- //IO WRITE throughput
- private static final double DEFAULT_MBS_FSWRITE_BINARYBLOCK_DENSE = 150;
- private static final double DEFAULT_MBS_FSWRITE_BINARYBLOCK_SPARSE = 75;
- private static final double DEFAULT_MBS_HDFSWRITE_BINARYBLOCK_DENSE = 120;
- private static final double DEFAULT_MBS_HDFSWRITE_BINARYBLOCK_SPARSE = 60;
- private static final double DEFAULT_MBS_HDFSWRITE_TEXT_DENSE = 40;
- private static final double DEFAULT_MBS_HDFSWRITE_TEXT_SPARSE = 30;
-
- @Override
- @SuppressWarnings("unused")
- protected double getCPInstTimeEstimate( Instruction inst, VarStats[] vs, String[] args )
- throws DMLRuntimeException, DMLUnsupportedOperationException
- {
- CPInstruction cpinst = (CPInstruction)inst;
-
- //load time into mem
- double ltime = 0;
- if( !vs[0]._inmem ){
- ltime += getHDFSReadTime( vs[0]._rlen, vs[0]._clen, vs[0].getSparsity() );
- //eviction costs
- if( CacheableData.CACHING_WRITE_CACHE_ON_READ &&
- LazyWriteBuffer.getWriteBufferSize()<MatrixBlock.estimateSizeOnDisk(vs[0]._rlen, vs[0]._clen, (long)((vs[0]._nnz<0)? vs[0]._rlen*vs[0]._clen:vs[0]._nnz)) )
- {
- ltime += Math.abs( getFSWriteTime( vs[0]._rlen, vs[0]._clen, vs[0].getSparsity() ));
- }
- vs[0]._inmem = true;
- }
- if( !vs[1]._inmem ){
- ltime += getHDFSReadTime( vs[1]._rlen, vs[1]._clen, vs[1].getSparsity() );
- //eviction costs
- if( CacheableData.CACHING_WRITE_CACHE_ON_READ &&
- LazyWriteBuffer.getWriteBufferSize()<MatrixBlock.estimateSizeOnDisk(vs[1]._rlen, vs[1]._clen, (long)((vs[1]._nnz<0)? vs[1]._rlen*vs[1]._clen:vs[1]._nnz)) )
- {
- ltime += Math.abs( getFSWriteTime( vs[1]._rlen, vs[1]._clen, vs[1].getSparsity()) );
- }
- vs[1]._inmem = true;
- }
- if( LOG.isDebugEnabled() && ltime!=0 ) {
- LOG.debug("Cost["+cpinst.getOpcode()+" - read] = "+ltime);
- }
-
- //exec time CP instruction
- String opcode = (cpinst instanceof FunctionCallCPInstruction) ? InstructionUtils.getOpCode(cpinst.toString()) : cpinst.getOpcode();
- double etime = getInstTimeEstimate(opcode, vs, args, ExecType.CP);
-
- //write time caching
- double wtime = 0;
- //double wtime = getFSWriteTime( vs[2]._rlen, vs[2]._clen, (vs[2]._nnz<0)? 1.0:(double)vs[2]._nnz/vs[2]._rlen/vs[2]._clen );
- if( inst instanceof VariableCPInstruction && ((VariableCPInstruction)inst).getOpcode().equals("write") )
- wtime += getHDFSWriteTime(vs[2]._rlen, vs[2]._clen, vs[2].getSparsity(), ((VariableCPInstruction)inst).getInput3().getName() );
-
- if( LOG.isDebugEnabled() && wtime!=0 ) {
- LOG.debug("Cost["+cpinst.getOpcode()+" - write] = "+wtime);
- }
-
- //total costs
- double costs = ltime + etime + wtime;
-
- //if( LOG.isDebugEnabled() )
- // LOG.debug("Costs CP instruction = "+costs);
-
- return costs;
- }
-
-
- @Override
- protected double getMRJobInstTimeEstimate( Instruction inst, VarStats[] vs, String[] args )
- throws DMLRuntimeException, DMLUnsupportedOperationException
- {
- MRJobInstruction jinst = (MRJobInstruction) inst;
-
- //infrastructure properties
- boolean localJob = InfrastructureAnalyzer.isLocalMode();
- int maxPMap = InfrastructureAnalyzer.getRemoteParallelMapTasks();
- int maxPRed = Math.min( InfrastructureAnalyzer.getRemoteParallelReduceTasks(),
- ConfigurationManager.getConfig().getIntValue(DMLConfig.NUM_REDUCERS) );
- double blocksize = ((double)InfrastructureAnalyzer.getHDFSBlockSize())/(1024*1024);
-
- //correction max number of mappers/reducers on yarn clusters
- if( InfrastructureAnalyzer.isYarnEnabled() ) {
- maxPMap = (int)Math.max( maxPMap, YarnClusterAnalyzer.getNumCores() );
- //artificially reduced by factor 2, in order to prefer map-side processing even if smaller degree of parallelism
- maxPRed = (int)Math.max( maxPRed, YarnClusterAnalyzer.getNumCores()/2 /2 );
- }
-
- //yarn-specific: take degree of parallelism into account
- if( jinst instanceof MRJobResourceInstruction ){
- int maxTasks = (int)((MRJobResourceInstruction)jinst).getMaxMRTasks();
- maxPMap = Math.min(maxPMap, maxTasks);
- maxPRed = Math.min(maxPRed, maxTasks);
- }
-
- //job properties
- boolean mapOnly = jinst.isMapOnly();
- String rdInst = jinst.getIv_randInstructions();
- String rrInst = jinst.getIv_recordReaderInstructions();
- String mapInst = jinst.getIv_instructionsInMapper();
- String shfInst = jinst.getIv_shuffleInstructions();
- String aggInst = jinst.getIv_aggInstructions();
- String otherInst = jinst.getIv_otherInstructions();
- byte[] inIx = getInputIndexes( jinst.getInputVars() );
- byte[] retIx = jinst.getIv_resultIndices();
- byte[] mapOutIx = getMapOutputIndexes(inIx, retIx, rdInst, mapInst, shfInst, aggInst, otherInst);
- int numMap = computeNumMapTasks(vs, inIx, blocksize, maxPMap, jinst.getJobType());
- int numPMap = Math.min(numMap, maxPMap);
- int numEPMap = Math.max(Math.min(numMap, maxPMap/2),1); //effective map dop
- int numRed = computeNumReduceTasks( vs, mapOutIx, jinst.getJobType() );
- int numPRed = Math.min(numRed, maxPRed);
- int numEPRed = Math.max(Math.min(numRed, maxPRed/2),1); //effective reduce dop
-
- LOG.debug("Meta nmap = "+numMap+", nred = "+numRed+"; npmap = "+numPMap+", npred = "+numPRed+"; nepmap = "+numEPMap+", nepred = "+numEPRed);
-
- //step 0: export if inputs in mem
- double exportCosts = 0;
- for( int i=0; i<jinst.getInputVars().length; i++ )
- if( vs[i]._inmem )
- exportCosts += getHDFSWriteTime(vs[i]._rlen, vs[i]._clen, vs[i].getSparsity());
-
- //step 1: MR job / task latency (normalization by effective dop)
- double jobLatencyCosts = localJob ? DEFAULT_MR_JOB_LATENCY_LOCAL : DEFAULT_MR_JOB_LATENCY_REMOTE;
- double taskLatencyCost = (numMap / numEPMap + numEPRed)
- * (localJob ? DEFAULT_MR_TASK_LATENCY_LOCAL : DEFAULT_MR_TASK_LATENCY_REMOTE);
- double latencyCosts = jobLatencyCosts + taskLatencyCost;
-
- //step 2: parallel read of inputs (normalization by effective dop)
- double hdfsReadCosts = 0;
- for( int i=0; i<jinst.getInputVars().length; i++ )
- hdfsReadCosts += getHDFSReadTime(vs[i]._rlen, vs[i]._clen, vs[i].getSparsity());
- hdfsReadCosts /= numEPMap;
-
- //step 3: parallel MR instructions
- String[] mapperInst = new String[]{rdInst, rrInst, mapInst};
- String[] reducerInst = new String[]{shfInst, aggInst, otherInst};
-
- //map instructions compute/distcache read (normalization by effective dop)
- double mapDCReadCost = 0; //read through distributed cache
- double mapCosts = 0; //map compute cost
- double shuffleCosts = 0;
- double reduceCosts = 0; //reduce compute costs
-
- for( String instCat : mapperInst )
- if( instCat != null && instCat.length()>0 ) {
- String[] linst = instCat.split( Lop.INSTRUCTION_DELIMITOR );
- for( String tmp : linst ){
- //map compute costs
- Object[] o = extractMRInstStatistics(tmp, vs);
- String opcode = InstructionUtils.getOpCode(tmp);
- mapCosts += getInstTimeEstimate(opcode, (VarStats[])o[0], (String[])o[1], ExecType.MR);
- //dist cache read costs
- int dcIndex = getDistcacheIndex(tmp);
- if( dcIndex >= 0 ) {
- mapDCReadCost += Math.min(getFSReadTime(vs[dcIndex]._rlen, vs[dcIndex]._clen, vs[dcIndex].getSparsity()),
- getFSReadTime(DistributedCacheInput.PARTITION_SIZE, 1, 1.0)) //32MB partitions
- * numMap; //read in each task
- }
- }
- }
- mapCosts /= numEPMap;
- mapDCReadCost /= numEPMap;
-
- if( !mapOnly )
- {
- //shuffle costs (normalization by effective map/reduce dop)
- for( int i=0; i<mapOutIx.length; i++ )
- {
- shuffleCosts += ( getFSWriteTime(vs[mapOutIx[i]]._rlen, vs[mapOutIx[i]]._clen, vs[mapOutIx[i]].getSparsity()) / numEPMap
- + getFSWriteTime(vs[mapOutIx[i]]._rlen, vs[mapOutIx[i]]._clen, vs[mapOutIx[i]].getSparsity())*4 / numEPRed
- + getFSReadTime(vs[mapOutIx[i]]._rlen, vs[mapOutIx[i]]._clen, vs[mapOutIx[i]].getSparsity()) / numEPRed);
-
- //correction of shuffle costs (necessary because the above shuffle does not consider the number of blocks)
- //TODO this is a workaround - we need to address the number of map output blocks in a more systematic way
- for( String instCat : reducerInst )
- if( instCat != null && instCat.length()>0 ) {
- String[] linst = instCat.split( Lop.INSTRUCTION_DELIMITOR );
- for( String tmp : linst ) {
- if(InstructionUtils.getMRType(tmp)==MRINSTRUCTION_TYPE.Aggregate)
- shuffleCosts += numMap * getFSWriteTime(vs[mapOutIx[i]]._rlen, vs[mapOutIx[i]]._clen, vs[mapOutIx[i]].getSparsity()) / numEPMap
- + numPMap * getFSWriteTime(vs[mapOutIx[i]]._rlen, vs[mapOutIx[i]]._clen, vs[mapOutIx[i]].getSparsity()) / numEPMap
- + numPMap * getFSReadTime(vs[mapOutIx[i]]._rlen, vs[mapOutIx[i]]._clen, vs[mapOutIx[i]].getSparsity()) / numEPRed;
- }
- }
- }
-
- //reduce instructions compute (normalization by effective dop)
- for( String instCat : reducerInst )
- if( instCat != null && instCat.length()>0 ) {
- String[] linst = instCat.split( Lop.INSTRUCTION_DELIMITOR );
- for( String tmp : linst ){
- Object[] o = extractMRInstStatistics(tmp, vs);
- if(InstructionUtils.getMRType(tmp)==MRINSTRUCTION_TYPE.Aggregate)
- o[1] = new String[]{String.valueOf(numMap)};
- String opcode = InstructionUtils.getOpCode(tmp);
- reduceCosts += getInstTimeEstimate(opcode, (VarStats[])o[0], (String[])o[1], ExecType.MR);
- }
- }
- reduceCosts /= numEPRed;
- }
-
- //step 4: parallel write of outputs (normalization by effective dop)
- double hdfsWriteCosts = 0;
- for( int i=0; i<jinst.getOutputVars().length; i++ )
- {
- hdfsWriteCosts += getHDFSWriteTime(vs[retIx[i]]._rlen, vs[retIx[i]]._clen, vs[retIx[i]].getSparsity());
- }
- hdfsWriteCosts /= ((mapOnly)? numEPMap : numEPRed);
-
- //debug output
- if( LOG.isDebugEnabled() ) {
- LOG.debug("Costs Export = "+exportCosts);
- LOG.debug("Costs Latency = "+latencyCosts);
- LOG.debug("Costs HDFS Read = "+hdfsReadCosts);
- LOG.debug("Costs Distcache Read = "+mapDCReadCost);
- LOG.debug("Costs Map Exec = "+mapCosts);
- LOG.debug("Costs Shuffle = "+shuffleCosts);
- LOG.debug("Costs Reduce Exec = "+reduceCosts);
- LOG.debug("Costs HDFS Write = "+hdfsWriteCosts);
- }
-
- //aggregate individual cost factors
- return exportCosts + latencyCosts +
- hdfsReadCosts + mapCosts + mapDCReadCost +
- shuffleCosts +
- reduceCosts + hdfsWriteCosts;
- }
-
- /**
- *
- * @param inst
- * @param stats
- * @return
- * @throws DMLUnsupportedOperationException
- * @throws DMLRuntimeException
- */
- private Object[] extractMRInstStatistics( String inst, VarStats[] stats )
- throws DMLUnsupportedOperationException, DMLRuntimeException
- {
- Object[] ret = new Object[2]; //stats, attrs
- VarStats[] vs = new VarStats[3];
- String[] attr = null;
-
- String[] parts = InstructionUtils.getInstructionParts(inst);
- String opcode = parts[0];
-
-
- if( opcode.equals(DataGen.RAND_OPCODE) )
- {
- vs[0] = _unknownStats;
- vs[1] = _unknownStats;
- vs[2] = stats[Integer.parseInt(parts[2])];
-
- int type = 2;
- //awareness of instruction patching min/max
- if( !parts[7].contains(Lop.VARIABLE_NAME_PLACEHOLDER)
- && !parts[8].contains(Lop.VARIABLE_NAME_PLACEHOLDER) )
- {
- double minValue = Double.parseDouble(parts[7]);
- double maxValue = Double.parseDouble(parts[8]);
- double sparsity = Double.parseDouble(parts[9]);
- if( minValue == 0.0 && maxValue == 0.0 )
- type = 0;
- else if( sparsity == 1.0 && minValue == maxValue )
- type = 1;
- }
- attr = new String[]{String.valueOf(type)};
- }
- if( opcode.equals(DataGen.SEQ_OPCODE) )
- {
- vs[0] = _unknownStats;
- vs[1] = _unknownStats;
- vs[2] = stats[Integer.parseInt(parts[2])];
- }
- else //general case
- {
-
- String inst2 = replaceInstructionPatch( inst );
- MRInstruction mrinst = MRInstructionParser.parseSingleInstruction(inst2);
-
- if( mrinst instanceof UnaryMRInstructionBase )
- {
- UnaryMRInstructionBase uinst = (UnaryMRInstructionBase) mrinst;
- vs[0] = uinst.input>=0 ? stats[ uinst.input ] : _unknownStats;
- vs[1] = _unknownStats;
- vs[2] = stats[ uinst.output ];
-
- if( vs[0] == null ) //scalar input, e.g., print
- vs[0] = _scalarStats;
- if( vs[2] == null ) //scalar output
- vs[2] = _scalarStats;
-
- if( mrinst instanceof MMTSJMRInstruction )
- {
- String type = ((MMTSJMRInstruction)mrinst).getMMTSJType().toString();
- attr = new String[]{type};
- }
- else if( mrinst instanceof CM_N_COVInstruction )
- {
- if( opcode.equals("cm") )
- attr = new String[]{parts[parts.length-2]};
- }
- else if( mrinst instanceof GroupedAggregateInstruction )
- {
- if( opcode.equals("groupedagg") )
- {
- AggregateOperationTypes type = CMOperator.getAggOpType(parts[2], parts[3]);
- attr = new String[]{String.valueOf(type.ordinal())};
- }
-
- }
- }
- else if( mrinst instanceof BinaryMRInstructionBase )
- {
- BinaryMRInstructionBase binst = (BinaryMRInstructionBase) mrinst;
- vs[0] = stats[ binst.input1 ];
- vs[1] = stats[ binst.input2 ];
- vs[2] = stats[ binst.output ];
-
- if( vs[0] == null ) //scalar input,
- vs[0] = _scalarStats;
- if( vs[1] == null ) //scalar input,
- vs[1] = _scalarStats;
- if( vs[2] == null ) //scalar output
- vs[2] = _scalarStats;
-
- if( opcode.equals("rmempty") ) {
- RemoveEmptyMRInstruction rbinst = (RemoveEmptyMRInstruction) mrinst;
- attr = new String[]{rbinst.isRemoveRows()?"0":"1"};
- }
- }
- else if( mrinst instanceof TernaryInstruction )
- {
- TernaryInstruction tinst = (TernaryInstruction) mrinst;
- vs[0] = stats[ tinst.input1 ];
- vs[1] = stats[ tinst.input2 ];
- vs[2] = stats[ tinst.input3 ];
-
- if( vs[0] == null ) //scalar input,
- vs[0] = _scalarStats;
- if( vs[1] == null ) //scalar input,
- vs[1] = _scalarStats;
- if( vs[2] == null ) //scalar input
- vs[2] = _scalarStats;
- }
- else if( mrinst instanceof PickByCountInstruction )
- {
- PickByCountInstruction pinst = (PickByCountInstruction) mrinst;
- vs[0] = stats[ pinst.input1 ];
- vs[2] = stats[ pinst.output ];
- if( vs[0] == null ) //scalar input,
- vs[0] = _scalarStats;
- if( vs[1] == null ) //scalar input,
- vs[1] = _scalarStats;
- if( vs[2] == null ) //scalar input
- vs[2] = _scalarStats;
- }
- else if( mrinst instanceof MapMultChainInstruction)
- {
- MapMultChainInstruction minst = (MapMultChainInstruction) mrinst;
- vs[0] = stats[ minst.getInput1() ];
- vs[1] = stats[ minst.getInput2() ];
- if( minst.getInput3()>=0 )
- vs[2] = stats[ minst.getInput3() ];
-
- if( vs[0] == null ) //scalar input,
- vs[0] = _scalarStats;
- if( vs[1] == null ) //scalar input,
- vs[1] = _scalarStats;
- if( vs[2] == null ) //scalar input
- vs[2] = _scalarStats;
- }
- }
-
- //maintain var status (CP output always inmem)
- vs[2]._inmem = true;
-
- ret[0] = vs;
- ret[1] = attr;
-
- return ret;
- }
-
-
-
- /////////////////////
- // Utilities //
- /////////////////////
-
- /**
- *
- * @param inputVars
- * @return
- */
- private byte[] getInputIndexes(String[] inputVars)
- {
- byte[] inIx = new byte[inputVars.length];
- for( int i=0; i<inIx.length; i++ )
- inIx[i] = (byte)i;
- return inIx;
- }
-
- /**
- *
- * @param inIx
- * @param retIx
- * @param rdInst
- * @param mapInst
- * @param shfInst
- * @param aggInst
- * @param otherInst
- * @return
- * @throws DMLUnsupportedOperationException
- * @throws DMLRuntimeException
- */
- private byte[] getMapOutputIndexes( byte[] inIx, byte[] retIx, String rdInst, String mapInst, String shfInst, String aggInst, String otherInst )
- throws DMLUnsupportedOperationException, DMLRuntimeException
- {
- //note: this is a simplified version of MRJobConfiguration.setUpOutputIndexesForMapper
-
- //map indices
- HashSet<Byte> ixMap = new HashSet<Byte>();
- for( byte ix : inIx )
- ixMap.add(ix);
-
- if( rdInst!=null && rdInst.length()>0 ) {
- rdInst = replaceInstructionPatch(rdInst);
- DataGenMRInstruction[] ins = MRInstructionParser.parseDataGenInstructions(rdInst);
- for( DataGenMRInstruction inst : ins )
- for( byte ix : inst.getAllIndexes() )
- ixMap.add(ix);
- }
-
- if( mapInst!=null && mapInst.length()>0 ) {
- mapInst = replaceInstructionPatch(mapInst);
- MRInstruction[] ins = MRInstructionParser.parseMixedInstructions(mapInst);
- for( MRInstruction inst : ins )
- for( byte ix : inst.getAllIndexes() )
- ixMap.add(ix);
- }
-
- //reduce indices
- HashSet<Byte> ixRed = new HashSet<Byte>();
- for( byte ix : retIx )
- ixRed.add(ix);
-
-
- if( shfInst!=null && shfInst.length()>0 ) {
- shfInst = replaceInstructionPatch(shfInst);
- MRInstruction[] ins = MRInstructionParser.parseMixedInstructions(shfInst);
- for( MRInstruction inst : ins )
- for( byte ix : inst.getAllIndexes() )
- ixRed.add(ix);
- }
-
- if( aggInst!=null && aggInst.length()>0 ) {
- aggInst = replaceInstructionPatch(aggInst);
- MRInstruction[] ins = MRInstructionParser.parseAggregateInstructions(aggInst);
- for( MRInstruction inst : ins )
- for( byte ix : inst.getAllIndexes() )
- ixRed.add(ix);
- }
-
- if( otherInst!=null && otherInst.length()>0 ) {
- otherInst = replaceInstructionPatch(otherInst);
- MRInstruction[] ins = MRInstructionParser.parseMixedInstructions(otherInst);
- for( MRInstruction inst : ins )
- for( byte ix : inst.getAllIndexes() )
- ixRed.add(ix);
- }
-
- //difference
- ixMap.retainAll(ixRed);
-
- //copy result
- byte[] ret = new byte[ixMap.size()];
- int i = 0;
- for( byte ix : ixMap )
- ret[i++] = ix;
-
- return ret;
- }
-
- /**
- *
- * @param vs
- * @param inputIx
- * @param blocksize
- * @param maxPMap
- * @param jobtype
- * @return
- */
- private int computeNumMapTasks( VarStats[] vs, byte[] inputIx, double blocksize, int maxPMap, JobType jobtype )
- {
- //special cases
- if( jobtype == JobType.DATAGEN )
- return maxPMap;
-
- //input size, num blocks
- double mapInputSize = 0;
- int numBlocks = 0;
- for( int i=0; i<inputIx.length; i++ )
- {
- //input size
- mapInputSize += ((double)MatrixBlock.estimateSizeOnDisk((long)vs[inputIx[i]]._rlen, (long)vs[inputIx[i]]._clen, (long)vs[inputIx[i]]._nnz)) / (1024*1024);
-
- //num blocks
- int lret = (int) Math.ceil((double)vs[inputIx[i]]._rlen/vs[inputIx[i]]._brlen)
- *(int) Math.ceil((double)vs[inputIx[i]]._clen/vs[inputIx[i]]._bclen);
- numBlocks = Math.max(lret, numBlocks);
- }
-
- return Math.max(1, Math.min( (int)Math.ceil(mapInputSize/blocksize),numBlocks ));
- }
-
- /**
- *
- * @param vs
- * @param mapOutIx
- * @param jobtype
- * @return
- */
- private int computeNumReduceTasks( VarStats[] vs, byte[] mapOutIx, JobType jobtype )
- {
- int ret = -1;
-
- //TODO for jobtype==JobType.MMCJ common dim
-
- switch( jobtype )
- {
- case REBLOCK:
- case CSV_REBLOCK: {
- for( int i=0; i<mapOutIx.length; i++ )
- {
- int lret = (int) Math.ceil((double)vs[mapOutIx[i]]._rlen/vs[mapOutIx[i]]._brlen)
- *(int) Math.ceil((double)vs[mapOutIx[i]]._clen/vs[mapOutIx[i]]._bclen);
- ret = Math.max(lret, ret);
- }
- break;
- }
-
- default: {
- for( int i=0; i<mapOutIx.length; i++ )
- {
- int lret = (int) Math.ceil((double)vs[mapOutIx[i]]._rlen/DMLTranslator.DMLBlockSize)
- *(int) Math.ceil((double)vs[mapOutIx[i]]._clen/DMLTranslator.DMLBlockSize);
- ret = Math.max(lret, ret);
- }
- break;
- }
- }
-
- return Math.max(1, ret);
- }
-
- /**
- *
- * @param inst
- * @return
- * @throws DMLRuntimeException
- * @throws DMLUnsupportedOperationException
- */
- private int getDistcacheIndex(String inst)
- throws DMLUnsupportedOperationException, DMLRuntimeException
- {
- ArrayList<Byte> indexes = new ArrayList<Byte>();
-
- if( InstructionUtils.isDistributedCacheUsed(inst) ) {
- MRInstruction mrinst = MRInstructionParser.parseSingleInstruction(inst);
- if( mrinst instanceof IDistributedCacheConsumer )
- ((IDistributedCacheConsumer)mrinst).addDistCacheIndex(inst, indexes);
- }
-
- if( !indexes.isEmpty() )
- return indexes.get(0);
- else
- return -1;
- }
-
-
- /////////////////////
- // I/O Costs //
- /////////////////////
-
- /**
- * Returns the estimated read time from HDFS.
- * NOTE: Does not handle unknowns.
- *
- * @param dm
- * @param dn
- * @param ds
- * @return
- */
- private double getHDFSReadTime( long dm, long dn, double ds )
- {
- boolean sparse = MatrixBlock.evalSparseFormatOnDisk(dm, dn, (long)(ds*dm*dn));
- double ret = ((double)MatrixBlock.estimateSizeOnDisk((long)dm, (long)dn, (long)(ds*dm*dn))) / (1024*1024);
-
- if( sparse )
- ret /= DEFAULT_MBS_HDFSREAD_BINARYBLOCK_SPARSE;
- else //dense
- ret /= DEFAULT_MBS_HDFSREAD_BINARYBLOCK_DENSE;
-
- return ret;
- }
-
- /**
- *
- * @param dm
- * @param dn
- * @param ds
- * @return
- */
- private double getHDFSWriteTime( long dm, long dn, double ds )
- {
- boolean sparse = MatrixBlock.evalSparseFormatOnDisk(dm, dn, (long)(ds*dm*dn));
-
- double bytes = (double)MatrixBlock.estimateSizeOnDisk((long)dm, (long)dn, (long)(ds*dm*dn));
- double mbytes = bytes / (1024*1024);
-
- double ret = -1;
- if( sparse )
- ret = mbytes / DEFAULT_MBS_HDFSWRITE_BINARYBLOCK_SPARSE;
- else //dense
- ret = mbytes / DEFAULT_MBS_HDFSWRITE_BINARYBLOCK_DENSE;
-
- //if( LOG.isDebugEnabled() )
- // LOG.debug("Costs[export] = "+ret+"s, "+mbytes+" MB ("+dm+","+dn+","+ds+").");
-
-
- return ret;
- }
-
- private double getHDFSWriteTime( long dm, long dn, double ds, String format )
- {
- boolean sparse = MatrixBlock.evalSparseFormatOnDisk(dm, dn, (long)(ds*dm*dn));
-
- double bytes = (double)MatrixBlock.estimateSizeOnDisk((long)dm, (long)dn, (long)(ds*dm*dn));
- double mbytes = bytes / (1024*1024);
-
- double ret = -1;
-
- if( format.equals("textcell") || format.equals("csv") )
- {
- if( sparse )
- ret = mbytes / DEFAULT_MBS_HDFSWRITE_TEXT_SPARSE;
- else //dense
- ret = mbytes / DEFAULT_MBS_HDFSWRITE_TEXT_DENSE;
- ret *= 2.75; //text commonly 2x-3.5x larger than binary
- }
- else
- {
- if( sparse )
- ret = mbytes / DEFAULT_MBS_HDFSWRITE_BINARYBLOCK_SPARSE;
- else //dense
- ret = mbytes / DEFAULT_MBS_HDFSWRITE_BINARYBLOCK_DENSE;
- }
- //if( LOG.isDebugEnabled() )
- // LOG.debug("Costs[export] = "+ret+"s, "+mbytes+" MB ("+dm+","+dn+","+ds+").");
-
-
- return ret;
- }
-
- /**
- * Returns the estimated read time from local FS.
- * NOTE: Does not handle unknowns.
- *
- * @param dm
- * @param dn
- * @param ds
- * @return
- */
- private double getFSReadTime( long dm, long dn, double ds )
- {
- boolean sparse = MatrixBlock.evalSparseFormatOnDisk(dm, dn, (long)(ds*dm*dn));
-
- double ret = ((double)MatrixBlock.estimateSizeOnDisk((long)dm, (long)dn, (long)(ds*dm*dn))) / (1024*1024);
- if( sparse )
- ret /= DEFAULT_MBS_FSREAD_BINARYBLOCK_SPARSE;
- else //dense
- ret /= DEFAULT_MBS_FSREAD_BINARYBLOCK_DENSE;
-
- return ret;
- }
-
- /**
- *
- * @param dm
- * @param dn
- * @param ds
- * @return
- */
- private double getFSWriteTime( long dm, long dn, double ds )
- {
- boolean sparse = MatrixBlock.evalSparseFormatOnDisk(dm, dn, (long)(ds*dm*dn));
-
- double ret = ((double)MatrixBlock.estimateSizeOnDisk((long)dm, (long)dn, (long)(ds*dm*dn))) / (1024*1024);
-
- if( sparse )
- ret /= DEFAULT_MBS_FSWRITE_BINARYBLOCK_SPARSE;
- else //dense
- ret /= DEFAULT_MBS_FSWRITE_BINARYBLOCK_DENSE;
-
- return ret;
- }
-
-
- /////////////////////
- // Operation Costs //
- /////////////////////
-
- /**
- *
- * @param inst
- * @param vs
- * @param args
- * @return
- * @throws DMLRuntimeException
- * @throws DMLUnsupportedOperationException
- */
- private double getInstTimeEstimate(String opcode, VarStats[] vs, String[] args, ExecType et)
- throws DMLRuntimeException, DMLUnsupportedOperationException
- {
- boolean inMR = (et == ExecType.MR);
- return getInstTimeEstimate(opcode, inMR,
- vs[0]._rlen, vs[0]._clen, (vs[0]._nnz<0)? 1.0:(double)vs[0]._nnz/vs[0]._rlen/vs[0]._clen,
- vs[1]._rlen, vs[1]._clen, (vs[1]._nnz<0)? 1.0:(double)vs[1]._nnz/vs[1]._rlen/vs[1]._clen,
- vs[2]._rlen, vs[2]._clen, (vs[2]._nnz<0)? 1.0:(double)vs[2]._nnz/vs[2]._rlen/vs[2]._clen,
- args);
- }
-
- /**
- * Returns the estimated instruction execution time, w/o data transfer and single-threaded.
- * For scalars input dims must be set to 1 before invocation.
- *
- * NOTE: Does not handle unknowns.
- *
- * @param opcode
- * @param d1m
- * @param d1n
- * @param d1s
- * @param d2m
- * @param d2n
- * @param d2s
- * @param d3m
- * @param d3n
- * @param d3s
- * @return
- * @throws DMLRuntimeException
- * @throws DMLUnsupportedOperationException
- */
- private double getInstTimeEstimate( String opcode, boolean inMR, long d1m, long d1n, double d1s, long d2m, long d2n, double d2s, long d3m, long d3n, double d3s, String[] args ) throws DMLRuntimeException, DMLUnsupportedOperationException
- {
- double nflops = getNFLOP(opcode, inMR, d1m, d1n, d1s, d2m, d2n, d2s, d3m, d3n, d3s, args);
- double time = nflops / DEFAULT_FLOPS;
-
- if( LOG.isDebugEnabled() )
- LOG.debug("Cost["+opcode+"] = "+time+"s, "+nflops+" flops ("+d1m+","+d1n+","+d1s+","+d2m+","+d2n+","+d2s+","+d3m+","+d3n+","+d3s+").");
-
- return time;
- }
-
- /**
- *
- * @param optype
- * @param d1m
- * @param d1n
- * @param d1s
- * @param d2m
- * @param d2n
- * @param d2s
- * @param d3m
- * @param d3n
- * @param d3s
- * @param args
- * @return
- * @throws DMLRuntimeException
- * @throws DMLUnsupportedOperationException
- */
- private double getNFLOP( String optype, boolean inMR, long d1m, long d1n, double d1s, long d2m, long d2n, double d2s, long d3m, long d3n, double d3s, String[] args )
- throws DMLRuntimeException, DMLUnsupportedOperationException
- {
- //operation costs in FLOP on matrix block level (for CP and MR instructions)
- //(excludes IO and parallelism; assumes known dims for all inputs, outputs )
-
- boolean leftSparse = MatrixBlock.evalSparseFormatInMemory(d1m, d1n, (long)(d1s*d1m*d1n));
- boolean rightSparse = MatrixBlock.evalSparseFormatInMemory(d2m, d2n, (long)(d2s*d2m*d2n));
- boolean onlyLeft = (d1m>=0 && d1n>=0 && d2m<0 && d2n<0 );
- boolean allExists = (d1m>=0 && d1n>=0 && d2m>=0 && d2n>=0 && d3m>=0 && d3n>=0 );
-
- //NOTE: all instruction types that are equivalent in CP and MR are only
- //included in CP to prevent redundancy
- CPINSTRUCTION_TYPE cptype = CPInstructionParser.String2CPInstructionType.get(optype);
- if( cptype != null ) //for CP Ops and equivalent MR ops
- {
- //general approach: count of floating point *, /, +, -, ^, builtin ;
- switch(cptype)
- {
-
- case AggregateBinary: //opcodes: ba+*, cov
- if( optype.equals("ba+*") ) { //matrix mult
- //reduction by factor 2 because matrix mult better than
- //average flop count
- if( !leftSparse && !rightSparse )
- return 2 * (d1m * d1n * ((d2n>1)?d1s:1.0) * d2n) /2;
- else if( !leftSparse && rightSparse )
- return 2 * (d1m * d1n * d1s * d2n * d2s) /2;
- else if( leftSparse && !rightSparse )
- return 2 * (d1m * d1n * d1s * d2n) /2;
- else //leftSparse && rightSparse
- return 2 * (d1m * d1n * d1s * d2n * d2s) /2;
- }
- else if( optype.equals("cov") ) {
- //note: output always scalar, d3 used as weights block
- //if( allExists ), same runtime for 2 and 3 inputs
- return 23 * d1m; //(11+3*k+)
- }
-
- return 0;
-
- case MMChain:
- //reduction by factor 2 because matrix mult better than average flop count
- //(mmchain essentially two matrix-vector muliplications)
- if( !leftSparse )
- return (2+2) * (d1m * d1n) /2;
- else
- return (2+2) * (d1m * d1n * d1s) /2;
-
- case AggregateTernary: //opcodes: tak+*
- return 6 * d1m * d1n; //2*1(*) + 4 (k+)
-
- case AggregateUnary: //opcodes: uak+, uark+, uack+, uasqk+, uarsqk+, uacsqk+,
- // uamean, uarmean, uacmean,
- // uamax, uarmax, uarimax, uacmax, uamin, uarmin, uacmin,
- // ua+, uar+, uac+, ua*, uatrace, uaktrace,
- // nrow, ncol, length, cm
-
- if( optype.equals("nrow") || optype.equals("ncol") || optype.equals("length") )
- return DEFAULT_NFLOP_NOOP;
- else if( optype.equals( "cm" ) ) {
- double xcm = 1;
- switch( Integer.parseInt(args[0]) ) {
- case 0: xcm=1; break; //count
- case 1: xcm=8; break; //mean
- case 2: xcm=16; break; //cm2
- case 3: xcm=31; break; //cm3
- case 4: xcm=51; break; //cm4
- case 5: xcm=16; break; //variance
- }
- return (leftSparse) ? xcm * (d1m * d1s + 1) : xcm * d1m;
- }
- else if( optype.equals("uatrace") || optype.equals("uaktrace") )
- return 2 * d1m * d1n;
- else if( optype.equals("ua+") || optype.equals("uar+") || optype.equals("uar+") ){
- //sparse safe operations
- if( !leftSparse ) //dense
- return d1m * d1n;
- else //sparse
- return d1m * d1n * d1s;
- }
- else if( optype.equals("uak+") || optype.equals("uark+") || optype.equals("uark+"))
- return 4 * d1m * d1n; //1*k+
- else if( optype.equals("uasqk+") || optype.equals("uarsqk+") || optype.equals("uacsqk+"))
- return 5 * d1m * d1n; // +1 for multiplication to square term
- else if( optype.equals("uamean") || optype.equals("uarmean") || optype.equals("uacmean"))
- return 7 * d1m * d1n; //1*k+
- else if( optype.equals("uamax") || optype.equals("uarmax") || optype.equals("uacmax")
- || optype.equals("uamin") || optype.equals("uarmin") || optype.equals("uacmin")
- || optype.equals("uarimax") || optype.equals("ua*") )
- return d1m * d1n;
-
- return 0;
-
- case ArithmeticBinary: //opcodes: +, -, *, /, ^ (incl. ^2, *2)
- //note: covers scalar-scalar, scalar-matrix, matrix-matrix
- if( optype.equals("+") || optype.equals("-") //sparse safe
- && ( leftSparse || rightSparse ) )
- return d1m*d1n*d1s + d2m*d2n*d2s;
- else
- return d3m*d3n;
-
- case Ternary: //opcodes: ctable
- if( optype.equals("ctable") ){
- if( leftSparse )
- return d1m * d1n * d1s; //add
- else
- return d1m * d1n;
- }
- return 0;
-
- case BooleanBinary: //opcodes: &&, ||
- return 1; //always scalar-scalar
-
- case BooleanUnary: //opcodes: !
- return 1; //always scalar-scalar
-
- case Builtin: //opcodes: log
- //note: covers scalar-scalar, scalar-matrix, matrix-matrix
- //note: can be unary or binary
- if( allExists ) //binary
- return 3 * d3m * d3n;
- else //unary
- return d3m * d3n;
-
- case BuiltinBinary: //opcodes: max, min, solve
- //note: covers scalar-scalar, scalar-matrix, matrix-matrix
- if( optype.equals("solve") ) //see also MultiReturnBuiltin
- return d1m * d1n * d1n; //for 1kx1k ~ 1GFLOP -> 0.5s
- else //default
- return d3m * d3n;
-
-
- case BuiltinUnary: //opcodes: exp, abs, sin, cos, tan, sqrt, plogp, print, round, sprop, sigmoid
- if( optype.equals("print") ) //scalar only
- return 1;
- else
- {
- double xbu = 1; //default for all ops
- if( optype.equals("plogp") ) xbu = 2;
- else if( optype.equals("round") ) xbu = 4;
-
- if( optype.equals("sin") || optype.equals("tan") || optype.equals("round")
- || optype.equals("abs") || optype.equals("sqrt") || optype.equals("sprop")
- || optype.equals("sigmoid") ) //sparse-safe
- {
- if( leftSparse ) //sparse
- return xbu * d1m * d1n * d1s;
- else //dense
- return xbu * d1m * d1n;
- }
- else
- return xbu * d1m * d1n;
- }
-
- case Reorg: //opcodes: r', rdiag
- case MatrixReshape: //opcodes: rshape
- if( leftSparse )
- return d1m * d1n * d1s;
- else
- return d1m * d1n;
-
- case Append: //opcodes: append
- return DEFAULT_NFLOP_CP *
- (((leftSparse) ? d1m * d1n * d1s : d1m * d1n ) +
- ((rightSparse) ? d2m * d2n * d2s : d2m * d2n ));
-
- case RelationalBinary: //opcodes: ==, !=, <, >, <=, >=
- //note: all relational ops are not sparsesafe
- return d3m * d3n; //covers all combinations of scalar and matrix
-
- case File: //opcodes: rm, mv
- return DEFAULT_NFLOP_NOOP;
-
- case Variable: //opcodes: assignvar, cpvar, rmvar, rmfilevar, assignvarwithfile, attachfiletovar, valuepick, iqsize, read, write, createvar, setfilename, castAsMatrix
- if( optype.equals("write") ){
- boolean text = args[0].equals("textcell") || args[0].equals("csv");
- double xwrite = text ? DEFAULT_NFLOP_TEXT_IO : DEFAULT_NFLOP_CP;
-
- if( !leftSparse )
- return d1m * d1n * xwrite;
- else
- return d1m * d1n * d1s * xwrite;
- }
- else if ( optype.equals("inmem-iqm") )
- //note: assumes uniform distribution
- return 2 * d1m + //sum of weights
- 5 + 0.25d * d1m + //scan to lower quantile
- 8 * 0.5 * d1m; //scan from lower to upper quantile
- else
- return DEFAULT_NFLOP_NOOP;
-
- case Rand: //opcodes: rand, seq
- if( optype.equals(DataGen.RAND_OPCODE) ){
- int nflopRand = 32; //per random number
- switch(Integer.parseInt(args[0])) {
- case 0: return DEFAULT_NFLOP_NOOP; //empty matrix
- case 1: return d3m * d3n * 8; //allocate, arrayfill
- case 2: //full rand
- {
- if( d3s==1.0 )
- return d3m * d3n * nflopRand + d3m * d3n * 8; //DENSE gen (incl allocate)
- else
- return (d3s>=MatrixBlock.SPARSITY_TURN_POINT)?
- 2 * d3m * d3n * nflopRand + d3m * d3n * 8: //DENSE gen (incl allocate)
- 3 * d3m * d3n * d3s * nflopRand + d3m * d3n * d3s * 24; //SPARSE gen (incl allocate)
- }
- }
- }
- else //seq
- return d3m * d3n * DEFAULT_NFLOP_CP;
-
- case StringInit: //sinit
- return d3m * d3n * DEFAULT_NFLOP_CP;
-
- case External: //opcodes: extfunct
- //note: should be invoked independently for multiple outputs
- return d1m * d1n * d1s * DEFAULT_NFLOP_UNKNOWN;
-
- case MultiReturnBuiltin: //opcodes: qr, lu, eigen
- //note: they all have cubic complexity, the scaling factor refers to commons.math
- double xf = 2; //default e.g, qr
- if( optype.equals("eigen") )
- xf = 32;
- else if ( optype.equals("lu") )
- xf = 16;
- return xf * d1m * d1n * d1n; //for 1kx1k ~ 2GFLOP -> 1s
-
- case ParameterizedBuiltin: //opcodes: cdf, invcdf, groupedagg, rmempty
- if( optype.equals("cdf") || optype.equals("invcdf"))
- return DEFAULT_NFLOP_UNKNOWN; //scalar call to commons.math
- else if( optype.equals("groupedagg") ){
- double xga = 1;
- switch( Integer.parseInt(args[0]) ) {
- case 0: xga=4; break; //sum, see uk+
- case 1: xga=1; break; //count, see cm
- case 2: xga=8; break; //mean
- case 3: xga=16; break; //cm2
- case 4: xga=31; break; //cm3
- case 5: xga=51; break; //cm4
- case 6: xga=16; break; //variance
- }
- return 2 * d1m + xga * d1m; //scan for min/max, groupedagg
- }
- else if( optype.equals("rmempty") ){
- switch(Integer.parseInt(args[0])){
- case 0: //remove rows
- return ((leftSparse) ? d1m : d1m * Math.ceil(1.0d/d1s)/2) +
- DEFAULT_NFLOP_CP * d3m * d2m;
- case 1: //remove cols
- return d1n * Math.ceil(1.0d/d1s)/2 +
- DEFAULT_NFLOP_CP * d3m * d2m;
- }
-
- }
- return 0;
-
- case QSort: //opcodes: sort
- if( optype.equals("sort") ){
- //note: mergesort since comparator used
- double sortCosts = 0;
- if( onlyLeft )
- sortCosts = DEFAULT_NFLOP_CP * d1m + d1m;
- else //w/ weights
- sortCosts = DEFAULT_NFLOP_CP * ((leftSparse)?d1m*d1s:d1m);
- return sortCosts + d1m*(int)(Math.log(d1m)/Math.log(2)) + //mergesort
- DEFAULT_NFLOP_CP * d1m;
- }
- return 0;
-
- case MatrixIndexing: //opcodes: rangeReIndex, leftIndex
- if( optype.equals("leftIndex") ){
- return DEFAULT_NFLOP_CP * ((leftSparse)? d1m*d1n*d1s : d1m*d1n)
- + 2 * DEFAULT_NFLOP_CP * ((rightSparse)? d2m*d2n*d2s : d2m*d2n );
- }
- else if( optype.equals("rangeReIndex") ){
- return DEFAULT_NFLOP_CP * ((leftSparse)? d2m*d2n*d2s : d2m*d2n );
- }
- return 0;
-
- case MMTSJ: //opcodes: tsmm
- //diff to ba+* only upper triangular matrix
- //reduction by factor 2 because matrix mult better than
- //average flop count
- if( MMTSJType.valueOf(args[0]).isLeft() ) { //lefttranspose
- if( !rightSparse ) //dense
- return d1m * d1n * d1s * d1n /2;
- else //sparse
- return d1m * d1n * d1s * d1n * d1s /2;
- }
- else if(onlyLeft) { //righttranspose
- if( !leftSparse ) //dense
- return (double)d1m * d1n * d1m /2;
- else //sparse
- return d1m * d1n * d1s //reorg sparse
- + d1m * d1n * d1s * d1n * d1s /2; //core tsmm
- }
- return 0;
-
- case Partition:
- return d1m * d1n * d1s + //partitioning costs
- (inMR ? 0 : //include write cost if in CP
- getHDFSWriteTime(d1m, d1n, d1s)* DEFAULT_FLOPS);
-
- case INVALID:
- return 0;
-
- default:
- throw new DMLRuntimeException("CostEstimator: unsupported instruction type: "+optype);
- }
-
- }
-
- //if not found in CP instructions
- MRINSTRUCTION_TYPE mrtype = MRInstructionParser.String2MRInstructionType.get(optype);
- if ( mrtype != null ) //for specific MR ops
- {
- switch(mrtype)
- {
- case Aggregate: //opcodes: a+, ak+, asqk+, a*, amax, amin, amean
- //TODO should be aggregate unary
- int numMap = Integer.parseInt(args[0]);
- if( optype.equals("ak+") )
- return 4 * numMap * d1m * d1n * d1s;
- else if( optype.equals("asqk+") )
- return 5 * numMap * d1m * d1n * d1s; // +1 for multiplication to square term
- else
- return numMap * d1m * d1n * d1s;
-
- case AggregateBinary: //opcodes: cpmm, rmm, mapmult
- //note: copy from CP costs
- if( optype.equals("cpmm") || optype.equals("rmm")
- || optype.equals(MapMult.OPCODE) ) //matrix mult
- {
- //reduction by factor 2 because matrix mult better than
- //average flop count
- if( !leftSparse && !rightSparse )
- return 2 * (d1m * d1n * ((d2n>1)?d1s:1.0) * d2n) /2;
- else if( !leftSparse && rightSparse )
- return 2 * (d1m * d1n * d1s * d2n * d2s) /2;
- else if( leftSparse && !rightSparse )
- return 2 * (d1m * d1n * d1s * d2n) /2;
- else //leftSparse && rightSparse
- return 2 * (d1m * d1n * d1s * d2n * d2s) /2;
- }
- return 0;
-
- case MapMultChain: //opcodes: mapmultchain
- //assume dense input2 and input3
- return 2 * d1m * d2n * d1n * ((d2n>1)?d1s:1.0) //ba(+*)
- + d1m * d2n //cellwise b(*)
- + d1m * d2n //r(t)
- + 2 * d2n * d1n * d1m * (leftSparse?d1s:1.0) //ba(+*)
- + d2n * d1n; //r(t)
-
- case ArithmeticBinary: //opcodes: s-r, so, max, min,
- // >, >=, <, <=, ==, !=
- //TODO Should be relational
-
- //note: all relational ops are not sparsesafe
- return d3m * d3n; //covers all combinations of scalar and matrix
-
- case CombineUnary: //opcodes: combineunary
- return d1m * d1n * d1s;
-
- case CombineBinary: //opcodes: combinebinary
- return d1m * d1n * d1s
- + d2m * d2n * d2s;
-
- case CombineTernary: //opcodes: combinetertiary
- return d1m * d1n * d1s
- + d2m * d2n * d2s
- + d3m * d3n * d3s;
-
- case Unary: //opcodes: log, slog, pow
- //TODO requires opcode consolidation (builtin, arithmic)
- //note: covers scalar, matrix, matrix-scalar
- return d3m * d3n;
-
- case Ternary: //opcodes: ctabletransform, ctabletransformscalarweight, ctabletransformhistogram, ctabletransformweightedhistogram
- //note: copy from cp
- if( leftSparse )
- return d1m * d1n * d1s; //add
- else
- return d1m * d1n;
-
- case Quaternary:
- //TODO pattern specific and all 4 inputs requires
- return d1m * d1n * d1s *4;
-
- case Reblock: //opcodes: rblk
- return DEFAULT_NFLOP_CP * ((leftSparse)? d1m*d1n*d1s : d1m*d1n);
-
- case Replicate: //opcodes: rblk
- return DEFAULT_NFLOP_CP * ((leftSparse)? d1m*d1n*d1s : d1m*d1n);
-
- case CM_N_COV: //opcodes: mean
- double xcm = 8;
- return (leftSparse) ? xcm * (d1m * d1s + 1) : xcm * d1m;
-
- case GroupedAggregate: //opcodes: groupedagg
- //TODO: need to consolidate categories (ParameterizedBuiltin)
- //copy from CP opertion
- double xga = 1;
- switch( Integer.parseInt(args[0]) ) {
- case 0: xga=4; break; //sum, see uk+
- case 1: xga=1; break; //count, see cm
- case 2: xga=8; break; //mean
- case 3: xga=16; break; //cm2
- case 4: xga=31; break; //cm3
- case 5: xga=51; break; //cm4
- case 6: xga=16; break; //variance
- }
- return 2 * d1m + xga * d1m; //scan for min/max, groupedagg
-
- case PickByCount: //opcodes: valuepick, rangepick
- break;
- //TODO
- //String2MRInstructionType.put( "valuepick" , MRINSTRUCTION_TYPE.PickByCount); // for quantile()
- //String2MRInstructionType.put( "rangepick" , MRINSTRUCTION_TYPE.PickByCount); // for interQuantile()
-
- case RangeReIndex: //opcodes: rangeReIndex, rangeReIndexForLeft
- //TODO: requires category consolidation
- if( optype.equals("rangeReIndex") )
- return DEFAULT_NFLOP_CP * ((leftSparse)? d2m*d2n*d2s : d2m*d2n );
- else //rangeReIndexForLeft
- return DEFAULT_NFLOP_CP * ((leftSparse)? d1m*d1n*d1s : d1m*d1n)
- + DEFAULT_NFLOP_CP * ((rightSparse)? d2m*d2n*d2s : d2m*d2n );
-
- case ZeroOut: //opcodes: zeroOut
- return DEFAULT_NFLOP_CP * ((leftSparse)? d1m*d1n*d1s : d1m*d1n)
- + DEFAULT_NFLOP_CP * ((rightSparse)? d2m*d2n*d2s : d2m*d2n );
-
- default:
- return 0;
- }
- }
- else
- {
- throw new DMLRuntimeException("CostEstimator: unsupported instruction type: "+optype);
- }
-
- //TODO Parameterized Builtin Functions
- //String2CPFileInstructionType.put( "rmempty" , CPINSTRUCTION_TYPE.ParameterizedBuiltin);
-
- return -1; //should never come here.
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/hops/cost/VarStats.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/hops/cost/VarStats.java b/src/main/java/com/ibm/bi/dml/hops/cost/VarStats.java
deleted file mode 100644
index b1d9671..0000000
--- a/src/main/java/com/ibm/bi/dml/hops/cost/VarStats.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package com.ibm.bi.dml.hops.cost;
-
-/**
- *
- *
- */
-public class VarStats
-{
-
-
- long _rlen = -1;
- long _clen = -1;
- long _brlen = -1;
- long _bclen = -1;
- double _nnz = -1;
- boolean _inmem = false;
-
- public VarStats( long rlen, long clen, long brlen, long bclen, long nnz, boolean inmem )
- {
- _rlen = rlen;
- _clen = clen;
- _brlen = brlen;
- _bclen = bclen;
- _nnz = nnz;
- _inmem = inmem;
- }
-
- /**
- *
- * @return
- */
- public double getSparsity()
- {
- return (_nnz<0) ? 1.0 : (double)_nnz/_rlen/_clen;
- }
-
- @Override
- public String toString()
- {
- StringBuilder sb = new StringBuilder();
- sb.append("VarStats: [");
- sb.append("rlen = ");
- sb.append(_rlen);
- sb.append(", clen = ");
- sb.append(_clen);
- sb.append(", nnz = ");
- sb.append(_nnz);
- sb.append(", inmem = ");
- sb.append(_inmem);
- sb.append("]");
-
- return sb.toString();
- }
-
- @Override
- public Object clone()
- {
- return new VarStats(_rlen, _clen, _brlen, _bclen,(long)_nnz, _inmem );
- }
-}